Go Concurrency Starter Pack
Přehled
Krátký úvod
Jazyk Go nabízí mnoho nástrojů pro správu souběžnosti. V tomto článku bychom vám rádi představili některé z nich a také některé triky.
Gorutina?
Gorutina je nový typ modelu souběžnosti podporovaný jazykem Go. Obecně platí, že program získává OS vlákna od operačního systému, aby mohl současně provádět více úloh, a paralelně vykonává úlohy podle počtu jader. Pro provádění souběžnosti v menších jednotkách se v uživatelském prostoru vytvářejí zelená vlákna, která vykonávají úlohy v rámci jednoho OS vlákna. Nicméně gorutiny v jazyce Go jsou menší a efektivnější formou těchto zelených vláken. Tyto gorutiny spotřebovávají méně paměti než vlákna a mohou být vytvářeny a nahrazovány rychleji než vlákna.
Pro použití gorutiny stačí jednoduše použít klíčové slovo go. To umožňuje intuitivně spouštět synchronní kód jako asynchronní kód během psaní programu.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Tento kód jednoduše změní synchronní kód, který po 1 sekundě odpočinku vytiskne Hello, World!, na asynchronní tok. I když je tento příklad jednoduchý, pokud změníte složitější synchronní kód na asynchronní, čitelnost, viditelnost a srozumitelnost kódu se zlepší ve srovnání s existujícími metodami jako async/await nebo promises.
Často se však stává, že bez pochopení toku, který jednoduše volá synchronní kód asynchronně, a toku typu fork & join (podobného toku typu rozděl a panuj), může vzniknout nekvalitní kód gorutin. V tomto článku představíme několik metod a technik, jak se s takovými případy vypořádat.
Správa souběžnosti
context
Může se zdát překvapivé, že prvním zmiňovaným mechanismem správy je context. Nicméně v jazyce Go context přesahuje pouhou funkci zrušení a hraje vynikající roli při správě celého stromu úloh. Pro ty, kteří tento balíček neznají, jej stručně vysvětlím.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Výše uvedený kód používá context k zobrazení zprávy Context is done! po 1 sekundě. context umožňuje ověřit stav zrušení pomocí metody Done() a poskytuje různé metody zrušení, jako jsou WithCancel, WithTimeout, WithDeadline a WithValue.
Vytvořme si jednoduchý příklad. Předpokládejme, že píšete kód pro získání dat pomocí vzoru aggregator pro získání user, post a comment. Pokud všechny požadavky musí být dokončeny do 2 sekund, můžete to napsat následovně.
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Výše uvedený kód vypíše Timeout!, pokud se nepodaří získat všechna data do 2 sekund, a All data is fetched!, pokud se všechna data získají. Tímto způsobem lze pomocí context snadno spravovat zrušení a časové limity i v kódu, kde běží více gorutin.
Různé funkce a metody související s contextem jsou k dispozici na godoc context. Doufáme, že se naučíte ty základní a budete je moci pohodlně používat.
channel
unbuffered channel
channel je nástroj pro komunikaci mezi gorutinami. channel lze vytvořit pomocí make(chan T). V tomto případě T je typ dat, která bude channel přenášet. channel může odesílat a přijímat data pomocí <- a channel lze uzavřít pomocí close.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Výše uvedený kód používá channel k výpisu 1 a 2. Tento kód pouze ukazuje odesílání a přijímání hodnot v channelu. channel však nabízí více funkcí. Nejprve se podívejme na buffered channel a unbuffered channel. Než začneme, příklad výše je unbuffered channel, kde musí být odeslání dat do kanálu a přijetí dat z kanálu provedeno současně. Pokud k tomu nedojde současně, může dojít k deadlocku.
buffered channel
Co když výše uvedený kód místo jednoduchého výpisu provádí dva procesy, které vykonávají náročné operace? Pokud by se druhý proces při čtení a zpracování zasekl na delší dobu, první proces by se také na tuto dobu zastavil. Abychom předešli takové situaci, můžeme použít buffered channel.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Výše uvedený kód používá buffered channel k výpisu 1 a 2. V tomto kódu jsme použili buffered channel, aby odesílání dat do channelu a přijímání dat z něj nemusely probíhat současně. Když má kanál buffer, vytvoří se volný prostor o délce tohoto bufferu, což může zabránit zpoždění úloh způsobenému vlivem následných úloh.
select
Při práci s více kanály lze pomocí příkazu select snadno implementovat strukturu fan-in.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Výše uvedený kód vytváří tři kanály, které pravidelně odesílají 1, 2 a 3, a poté pomocí select přijímá hodnoty z kanálů a vypisuje je. Tímto způsobem, použitím select, můžete přijímat data z více kanálů současně a zpracovávat je, jakmile jsou přijata.
for range
channel může snadno přijímat data pomocí for range. Když se for range použije na kanál, bude se provádět pokaždé, když se do kanálu přidají data, a smyčka se ukončí, když se kanál uzavře.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Výše uvedený kód používá channel k výpisu 1 a 2. V tomto kódu se for range používá k přijímání a výpisu dat pokaždé, když jsou přidána do kanálu. A smyčka se ukončí, když je kanál uzavřen.
Jak jsem již několikrát napsal, tato syntaxe může být použita i pro jednoduchou synchronizaci.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Výše uvedený kód po 1 sekundě odpočinku vypíše Hello, World!. V tomto kódu se channel používá ke změně synchronního kódu na asynchronní. Tímto způsobem lze pomocí channel snadno změnit synchronní kód na asynchronní a nastavit bod join.
etc
- Odeslání nebo přijetí dat z nil channelu může vést k nekonečné smyčce a deadlocku.
- Odeslání dat do channelu po jeho uzavření způsobí panic.
- Channel nemusí být explicitně uzavřen, GC jej uzavře při garbage collection.
mutex
spinlock
spinlock je synchronizační metoda, která se neustále pokouší získat zámek v cyklu. V jazyce Go lze spinlock snadno implementovat pomocí ukazatelů.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Výše uvedený kód implementuje balíček spinlock. V tomto kódu je SpinLock implementován pomocí balíčku sync/atomic. Metoda Lock se pokouší získat zámek pomocí atomic.CompareAndSwapUintptr a metoda Unlock uvolní zámek pomocí atomic.StoreUintptr. Tato metoda se neustále pokouší získat zámek, a proto neustále využívá CPU, dokud zámek nezíská, což může vést k nekonečné smyčce. Proto se spinlock doporučuje používat pouze pro jednoduchou synchronizaci nebo pro krátkodobé použití.
sync.Mutex
mutex je nástroj pro synchronizaci mezi gorutinami. mutex poskytovaný balíčkem sync nabízí metody jako Lock, Unlock, RLock, RUnlock. mutex lze vytvořit pomocí sync.Mutex a pro zámek čtení/zápisu lze použít sync.RWMutex.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
Ve výše uvedeném kódu přistupují dvě gorutiny téměř současně ke stejné proměnné count. V takovém případě, pokud se kód přistupující k proměnné count označí jako kritická sekce pomocí mutexu, lze zabránit souběžnému přístupu k proměnné count. Pak tento kód vždy vypíše 2, bez ohledu na to, kolikrát se spustí.
sync.RWMutex
sync.RWMutex je mutex, který umožňuje oddělené použití zámků pro čtení a zápis. Metody RLock a RUnlock se používají k nastavení a uvolnění zámku pro čtení.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Výše uvedený kód implementuje ConcurrentMap pomocí sync.RWMutex. V tomto kódu metoda Get nastavuje zámek pro čtení a metoda Set nastavuje zámek pro zápis, což umožňuje bezpečný přístup a úpravu mapy data. Důvodem potřeby zámku pro čtení je, že v případě mnoha jednoduchých operací čtení lze povolit více gorutinám souběžně provádět operace čtení bez nastavení zámku pro zápis. Tímto způsobem, pokud není nutné měnit stav a není nutné nastavovat zámek pro zápis, lze nastavením pouze zámku pro čtení zlepšit výkon.
fakelock
fakelock je jednoduchý trik, který implementuje sync.Locker. Tato struktura poskytuje stejné metody jako sync.Mutex, ale ve skutečnosti neprovádí žádnou operaci.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Výše uvedený kód implementuje balíček fakelock. Tento balíček implementuje sync.Locker a poskytuje metody Lock a Unlock, ale ve skutečnosti neprovádí žádnou operaci. Proč je takový kód potřeba, vysvětlím, pokud se naskytne příležitost.
waitgroup
sync.WaitGroup
sync.WaitGroup je nástroj, který čeká, dokud nejsou dokončeny všechny operace gorutin. Poskytuje metody Add, Done, Wait. Metodou Add se přidá počet gorutin a metodou Done se signalizuje dokončení operace gorutiny. Metoda Wait čeká, dokud nejsou dokončeny všechny operace gorutin.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Výše uvedený kód používá sync.WaitGroup k tomu, aby 100 gorutin současně přičítalo hodnotu k proměnné c. V tomto kódu se sync.WaitGroup používá k čekání na dokončení všech gorutin a poté se vypíše hodnota přičtená k proměnné c. I když pro fork & join několika úloh stačí pouze kanál, pro fork & join velkého počtu úloh je sync.WaitGroup dobrou volbou.
with slice
Ve spojení se slice se waitgroup může stát vynikajícím nástrojem pro správu souběžného provádění úloh bez zámků.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Výše uvedený kód používá pouze waitgroup k tomu, aby každá gorutina současně generovala 10 náhodných celých čísel a ukládala je do přiděleného indexu. V tomto kódu se waitgroup používá k čekání na dokončení všech gorutin a poté se vypíše Done. Tímto způsobem, použitím waitgroup, může více gorutin současně provádět úlohy, ukládat data bez zámků, dokud nejsou dokončeny všechny gorutiny, a poté hromadně provádět post-processing.
golang.org/x/sync/errgroup.ErrGroup
errgroup je rozšířený balíček sync.WaitGroup. Na rozdíl od sync.WaitGroup, errgroup zruší všechny gorutiny a vrátí chybu, pokud dojde k chybě v kterékoli z operací gorutin.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Výše uvedený kód používá errgroup k vytvoření 10 gorutin a způsobuje chybu v 5. gorutině. Úmyslně jsme způsobili chybu v páté gorutině, abychom ukázali případ, kdy k chybě dojde. Nicméně v praxi se errgroup používá k vytváření gorutin a k provádění různých následných zpracování v případě chyb v jednotlivých gorutinách.
once
Nástroj pro spuštění kódu, který se má spustit pouze jednou. Související kód lze spustit pomocí následujících konstruktorů.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc jednoduše zajišťuje, že se daná funkce spustí pouze jednou v celém průběhu.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Výše uvedený kód používá sync.OnceFunc k výpisu Hello, World!. V tomto kódu se sync.OnceFunc používá k vytvoření funkce once a i když se funkce once volá vícekrát, Hello, World! se vypíše pouze jednou.
OnceValue
OnceValue nejenže zajišťuje, že se daná funkce spustí pouze jednou v celém průběhu, ale také ukládá návratovou hodnotu této funkce a vrací uloženou hodnotu při opětovném volání.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Výše uvedený kód používá sync.OnceValue k inkrementaci proměnné c o 1. V tomto kódu se sync.OnceValue používá k vytvoření funkce once a i když se funkce once volá vícekrát, proměnná c se inkrementuje pouze jednou a vrátí 1.
OnceValues
OnceValues funguje stejně jako OnceValue, ale může vracet více hodnot.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Výše uvedený kód používá sync.OnceValues k inkrementaci proměnné c o 1. V tomto kódu se sync.OnceValues používá k vytvoření funkce once a i když se funkce once volá vícekrát, proměnná c se inkrementuje pouze jednou a vrátí 1.
atomic
Balíček atomic poskytuje atomické operace. Balíček atomic poskytuje metody jako Add, CompareAndSwap, Load, Store, Swap, ale v poslední době se doporučuje používat typy jako Int64, Uint64, Pointer.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Toto je příklad, který byl použit dříve. Jde o kód, který atomicky inkrementuje proměnnou c pomocí typu atomic.Int64. Pomocí metod Add a Load lze atomicky inkrementovat proměnnou a číst ji. Dále lze pomocí metody Store ukládat hodnoty, pomocí metody Swap vyměňovat hodnoty a pomocí metody CompareAndSwap porovnat hodnoty a vyměnit je, pokud jsou vhodné.
cond
sync.Cond
Balíček cond poskytuje podmínkové proměnné. Balíček cond lze vytvořit pomocí sync.Cond a poskytuje metody Wait, Signal, Broadcast.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Výše uvedený kód používá sync.Cond k čekání, dokud proměnná ready nebude true. V tomto kódu se sync.Cond používá k čekání, dokud proměnná ready nebude true, a poté se vypíše Ready!. Tímto způsobem lze pomocí sync.Cond zajistit, aby více gorutin čekalo, dokud není splněna určitá podmínka.
Toho lze využít k implementaci jednoduché queue.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Využitím sync.Cond lze efektivně čekat a po splnění podmínky znovu spustit operaci, namísto použití spin-locku, který spotřebovává mnoho CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
Balíček semaphore poskytuje semafory. Balíček semaphore lze vytvořit pomocí golang.org/x/sync/semaphore.Semaphore a poskytuje metody Acquire, Release, TryAcquire.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Výše uvedený kód ukazuje použití semaforu, jeho vytvoření a získání pomocí metody Acquire a uvolnění pomocí metody Release. Tento kód demonstruje, jak získat a uvolnit semafor pomocí semaphore.
Závěr
Tímto bychom se mohli spokojit se základními informacemi. Doufáme, že na základě obsahu tohoto článku pochopíte, jak spravovat souběžnost pomocí gorutin, a budete je moci skutečně používat. Doufáme, že vám tento článek pomohl. Děkujeme.