GoSuda

Go souběžnost starter pack

By snowmerak
views ...

Přehled

Stručný úvod

Jazyk Go nabízí mnoho nástrojů pro správu souběžnosti. V tomto článku představíme některé z nich a související techniky.

Gorutiny?

Gorutina je nový typ modelu souběžnosti podporovaný jazykem Go. Obvykle program využívá OS vlákna k provádění více úloh současně, přičemž počet paralelních úloh je omezen počtem jader procesoru. Pro menší jednotky souběžnosti se v uživatelském prostoru vytvářejí zelená vlákna, která se střídají v rámci jednoho OS vlákna. Gorutiny však představují menší a efektivnější variantu zelených vláken. Gorutiny používají méně paměti než vlákna a lze je rychleji vytvářet a přepínat.

K použití gorutiny stačí použít klíčové slovo go. To umožňuje intuitivní transformaci synchronního kódu na asynchronní kód během psaní programu.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Tento kód jednoduše změní synchronní kód, který po jedné sekundě čekání vypíše Hello, World!, na asynchronní tok. I když je tento příklad jednoduchý, při změně složitějšího kódu ze synchronního na asynchronní se čitelnost, viditelnost a srozumitelnost kódu výrazně zlepšují ve srovnání s metodami jako async/await nebo promise.

V mnoha případech však vytvoření špatného kódu gorutin pramení z nepochopení jednoduchého asynchronního volání synchronního kódu a toků typu fork & join (podobných rozděl a panuj). V tomto článku představíme několik metod a technik, jak se těmto případům vyhnout.

Správa souběžnosti

context

Zařazení context jako první techniky správy může být překvapivé. V Go však context přesahuje jednoduchou funkci zrušení a hraje klíčovou roli ve správě celého stromu úloh. Pro ty, kteří jej neznají, stručně popíšeme tento balíček.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Výše uvedený kód používá context k výpisu Context is done! po jedné sekundě. context umožňuje ověřit zrušení pomocí metody Done() a nabízí různé metody zrušení, jako jsou WithCancel, WithTimeout, WithDeadline a WithValue.

Vytvořme jednoduchý příklad. Předpokládejme, že píšete kód, který používá vzor aggregator k načtení dat user, post a comment. A všechny požadavky musí být splněny do 2 sekund. Můžete to napsat následovně:

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Tento kód vypíše Timeout!, pokud se všechna data nenačtou do 2 sekund, a All data is fetched!, pokud se všechna data načtou. Tímto způsobem umožňuje context snadnou správu zrušení a časových limitů i v kódu s mnoha gorutinami.

Různé funkce a metody související s kontextem naleznete na stránce godoc context. Doporučujeme si je prostudovat a naučit se je používat.

channel

unbuffered channel

channel je nástroj pro komunikaci mezi gorutinami. channel se vytvoří pomocí make(chan T), kde T je typ dat, která má channel přenášet. channel umožňuje výměnu dat pomocí <- a uzavření pomocí close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Tento kód používá channel k výpisu 1 a 2. Tento kód ukazuje pouze základní zasílání a přijímání dat přes channel. channel však nabízí mnohem více funkcí. Nejprve se podívejme na buffered channel a unbuffered channel. Příklad výše je unbuffered channel, kde odesílání a přijímání dat musí proběhnout současně. Pokud k tomu nedojde, může dojít k zablokování (deadlock).

buffered channel

Co kdyby výše uvedený kód nebyl jen jednoduchý výpis, ale dva náročné procesy? Pokud by druhý proces trval dlouho, první proces by se také na stejnou dobu zastavil. Abychom tomu zabránili, můžeme použít buffered channel.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Tento kód používá buffered channel k výpisu 1 a 2. buffered channel umožňuje, aby odesílání a přijímání dat probíhalo nezávisle na sobě. Díky bufferu v kanálu vzniká rezerva, která zabraňuje zpoždění práce způsobenému následnými úlohami.

select

Při práci s více kanály umožňuje syntaxe select snadné vytvoření struktury fan-in.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Tento kód vytvoří tři kanály, které periodicky odesílají 1, 2 a 3, a používá select k přijímání a výpisu hodnot z kanálů. Tímto způsobem umožňuje select přijímat data z více kanálů současně a zpracovávat je ihned po přijetí.

for range

channel umožňuje snadné přijímání dat pomocí for range. Použití for range s kanálem spustí akci při každém přidání dat do kanálu a ukončí smyčku po uzavření kanálu.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Tento kód používá channel k výpisu 1 a 2. for range přijímá a vypisuje data pokaždé, když se přidají do kanálu, a po uzavření kanálu smyčku ukončí.

Jak je uvedeno výše, tato syntaxe může být použita jako jednoduchý synchronizační mechanismus.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Tento kód po jedné sekundě čekání vypíše Hello, World!. channel se používá k transformaci synchronního kódu na asynchronní a k nastavení bodu join.

etc

  1. Odeslání nebo přijetí dat do/z nulového kanálu může vést k nekonečné smyčce a zablokování.
  2. Odeslání dat po uzavření kanálu způsobí paniku.
  3. I bez explicitního uzavření kanálu jej garbage collector automaticky uzavře.

mutex

spinlock

spinlock je synchronizační metoda, která opakovaně zkouší získat zámek v cyklu. V Go lze spinlock snadno implementovat pomocí ukazatelů.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Tento kód implementuje balíček spinlock. Používá balíček sync/atomic k implementaci SpinLock. Metoda Lock používá atomic.CompareAndSwapUintptr k pokusu o získání zámku a metoda Unlock používá atomic.StoreUintptr k uvolnění zámku. Tato metoda neustále zkouší získat zámek, takže neustále spotřebovává CPU a může vést k nekonečné smyčce. Proto je spinlock vhodné používat pouze pro jednoduchou synchronizaci nebo na krátkou dobu.

sync.Mutex

mutex je nástroj pro synchronizaci mezi gorutinami. mutex poskytovaný balíčkem sync nabízí metody Lock, Unlock, RLock a RUnlock. mutex se vytvoří pomocí sync.Mutex a lze také použít čtení/zápis zámku s sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

V tomto kódu téměř současně přistupují dvě gorutiny ke stejné proměnné count. Použití mutex k vytvoření kritické sekce pro přístup k proměnné count zabrání souběžnému přístupu k proměnné count. Tento kód proto vždy vypíše 2, bez ohledu na počet opakování.

sync.RWMutex

sync.RWMutex umožňuje rozlišovat mezi zámkem pro čtení a zápisem. Metody RLock a RUnlock umožňují nastavení a uvolnění zámku pro čtení.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Tento kód implementuje ConcurrentMap pomocí sync.RWMutex. Metoda Get používá zámek pro čtení a metoda Set zámek pro zápis, aby bylo možné bezpečně přistupovat a upravovat mapu data. Zámek pro čtení je nezbytný, protože v případě velkého množství operací čtení je možné umožnit více gorutinám současně provádět operace čtení bez nutnosti nastavovat zámek pro zápis. Tím se zvyšuje výkon v případech, kdy není nutné nastavovat zámek pro zápis, protože se nemění stav.

fakelock

fakelock je jednoduchý trik implementující sync.Locker. Tento datový typ poskytuje stejné metody jako sync.Mutex, ale ve skutečnosti nic nedělá.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Tento kód implementuje balíček fakelock. Implementuje sync.Locker a poskytuje metody Lock a Unlock, ale ve skutečnosti nic nedělá. Důvod pro existenci tohoto kódu bude vysvětlen později.

waitgroup

sync.WaitGroup

sync.WaitGroup je nástroj pro čekání na dokončení všech úloh gorutin. Poskytuje metody Add, Done a Wait. Metoda Add přidá počet gorutin, metoda Done oznamuje dokončení úlohy gorutiny a metoda Wait čeká na dokončení všech gorutin.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    pro i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Výše uvedený kód používá sync.WaitGroup k souběžnému přičítání hodnoty do proměnné c 100 gorutinami. Kód čeká pomocí sync.WaitGroup na dokončení všech gorutin a poté vypíše součet přičtených hodnot do proměnné c. Pro jednoduché operace typu "fork & join" s několika úlohami by postačoval pouze kanál, avšak pro větší množství úloh je použití sync.WaitGroup dobrou volbou.

with slice

Používá-li se se slice, waitgroup se stává vynikajícím nástrojem pro správu souběžných operací bez nutnosti zámků.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Výše uvedený kód používá pouze waitgroup k tomu, aby každá gorutina souběžně generovala 10 náhodných celých čísel a uložila je na přidělený index. Kód čeká pomocí waitgroup na dokončení všech gorutin a poté vypíše "Done". Tímto způsobem umožňuje waitgroup souběžné provádění úloh v několika gorutinách, ukládání dat bez zámků a hromadné dokončení následných operací po ukončení všech úloh.

golang.org/x/sync/errgroup.ErrGroup

errgroup je rozšíření balíčku sync.WaitGroup. Na rozdíl od sync.WaitGroup errgroup zruší všechny gorutiny a vrátí chybu, pokud dojde k chybě v jedné z úloh gorutin.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Výše uvedený kód používá errgroup k vytvoření 10 gorutin a vyvolání chyby v páté gorutině. Úmyslně jsme vyvolali chybu v páté gorutině, abychom demonstrovali chování při výskytu chyby. V praktickém použití by se errgroup používal k vytvoření gorutin a k provádění různých následných operací v případě výskytu chyby v jednotlivých gorutinách.

once

Nástroj pro provedení kódu pouze jednou. Souvisejícím kódem lze spustit pomocí níže uvedených konstruktorů.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc jednoduše zajistí, že daná funkce bude spuštěna pouze jednou během celého průběhu.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Výše uvedený kód používá sync.OnceFunc k výpisu "Hello, World!". I když je funkce once volána vícekrát, "Hello, World!" se vypíše pouze jednou.

OnceValue

OnceValue nejen zajistí, že daná funkce bude spuštěna pouze jednou, ale také uloží návratovou hodnotu funkce a vrátí ji při dalším volání.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Výše uvedený kód používá sync.OnceValue k inkrementaci proměnné c. I když je funkce once volána vícekrát, proměnná c se inkrementuje pouze jednou a funkce vrátí hodnotu 1.

OnceValues

OnceValues funguje stejně jako OnceValue, ale vrací více hodnot.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Výše uvedený kód používá sync.OnceValues k inkrementaci proměnné c. I když je funkce once volána vícekrát, proměnná c se inkrementuje pouze jednou a funkce vrátí hodnotu 1.

atomic

Balíček atomic poskytuje atomové operace. Balíček atomic poskytuje metody Add, CompareAndSwap, Load, Store, Swap, ale v současnosti se doporučuje používat typy Int64, Uint64, Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Toto je předchozí příklad. Kód používá typ atomic.Int64 k atomové inkrementaci proměnné c. Metody Add a Load umožňují atomovou inkrementaci a čtení proměnné. Metoda Store ukládá hodnotu, metoda Swap ji vymění a metoda CompareAndSwap ji vymění pouze pokud odpovídá dané podmínce.

cond

sync.Cond

Balíček cond poskytuje podmíněné proměnné. Balíček cond lze vytvořit pomocí sync.Cond a poskytuje metody Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Výše uvedený kód používá sync.Cond k čekání na to, až se proměnná ready stane true. Počkejte, až se proměnná ready stane true, a poté vypište "Ready!". sync.Cond umožňuje několika gorutinám čekat na splnění dané podmínky.

Toto lze využít k implementaci jednoduché fronty.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Použití sync.Cond umožňuje efektivní čekání namísto spin-lock, který by zatěžoval procesor, a obnovení činnosti po splnění podmínky.

semaphore

golang.org/x/sync/semaphore.Semaphore

Balíček semaphore poskytuje semafory. Balíček semaphore lze vytvořit pomocí golang.org/x/sync/semaphore.Semaphore a poskytuje metody Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Výše uvedený kód používá semafor k vytvoření semaforu, jeho získání pomocí metody Acquire a uvolnění pomocí metody Release. Kód ukazuje, jak získat a uvolnit semafor.

Závěr

To by mělo stačit pro základní informace. Doufáme, že vám tento článek pomohl pochopit a naučit se používat gorutiny pro správu souběžnosti. Doufáme, že vám tento článek pomohl. Děkujeme.