Go Concurrency Starter Pack
Prehľad
Stručný úvod
Jazyk Go ponúka mnoho nástrojov na správu súbežnosti. V tomto článku by sme vám chceli predstaviť niektoré z nich a súvisiace techniky.
Goroutine?
Goroutine je nový typ modelu súbežnosti podporovaný v jazyku Go. Zvyčajne program získava OS vlákna (OS threads) od operačného systému na vykonávanie viacerých úloh súbežne a paralelne vykonáva úlohy v počte zodpovedajúcom počtu jadier. Pre menšie jednotky súbežnosti sa v užívateľskom priestore (userland) vytvárajú zelené vlákna (green threads), ktoré vykonávajú úlohy striedavo v rámci jedného OS vlákna. Avšak v prípade goroutín boli tieto zelené vlákna vytvorené menšie a efektívnejšie. Tieto goroutiny spotrebúvajú menej pamäte ako vlákna a môžu byť vytvorené a prepínané rýchlejšie ako vlákna.
Na použitie goroutín stačí jednoducho použiť kľúčové slovo go. To umožňuje intuitívne spúšťať synchrónny kód asynchrónne počas písania programu.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Tento kód jednoducho zmení synchrónny kód, ktorý po 1 sekunde vypíše Hello, World!, na asynchrónny tok. Aj keď je tento príklad jednoduchý, zmena zložitejšieho synchrónneho kódu na asynchrónny kód výrazne zlepšuje čitateľnosť, prehľadnosť a zrozumiteľnosť kódu v porovnaní s existujúcimi metódami ako async await alebo promise.
V mnohých prípadoch však môže dôjsť k vytvoreniu nekvalitného goroutine kódu, ak sa nepochopí tok jednoduchého asynchrónneho volania synchrónneho kódu a tok typu fork & join (podobný toku rozdeľ a panuj). V tomto článku predstavíme niekoľko metód a techník na predchádzanie takýmto situáciám.
Správa súbežnosti
context
Môže sa zdať prekvapujúce, že prvou technikou správy je context. Avšak v jazyku Go context presahuje jednoduchú funkciu zrušenia a hrá vynikajúcu úlohu pri správe celého stromu úloh. Pre tých, ktorí to nevedia, stručne vysvetlím tento balík.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Vyššie uvedený kód používa context na výpis správy Context is done! po 1 sekunde. context umožňuje skontrolovať stav zrušenia pomocou metódy Done() a poskytuje rôzne metódy zrušenia, ako sú WithCancel, WithTimeout, WithDeadline, WithValue.
Pripravme si jednoduchý príklad. Predpokladajme, že píšete kód, ktorý na získanie údajov používa vzor aggregator na získanie user, post a comment. Ak sa všetky požiadavky musia uskutočniť do 2 sekúnd, môžete to napísať takto:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Vyššie uvedený kód vypíše Timeout!, ak sa všetky dáta nezískajú do 2 sekúnd, a All data is fetched!, ak sa všetky dáta získajú. Týmto spôsobom, ak použijete context, môžete jednoducho spravovať zrušenie a časové limity aj v kóde, kde beží viacero goroutín.
Rôzne súvisiace funkcie a metódy pre context sú dostupné na godoc context. Dúfam, že sa naučíte tie jednoduché a budete ich môcť pohodlne používať.
channel
unbuffered channel
channel je nástroj na komunikáciu medzi goroutinami. channel je možné vytvoriť pomocou make(chan T). V tomto prípade T je typ dát, ktoré bude channel prenášať. channel môže odosielať a prijímať dáta pomocou <- a môže byť zatvorený pomocou close.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Vyššie uvedený kód používa channel na výpis hodnôt 1 a 2. Tento kód len ukazuje odosielanie a prijímanie hodnôt cez channel. Avšak channel ponúka viac funkcií. Najprv sa pozrime na buffered channel a unbuffered channel. Predtým, ako začneme, príklad uvedený vyššie je unbuffered channel, kde odosielanie dát do kanála a prijímanie dát z kanála musia prebiehať súčasne. Ak sa tieto akcie neuskutočnia súčasne, môže dôjsť k deadlocku.
buffered channel
Čo ak vyššie uvedený kód nebol len jednoduchý výstup, ale dva procesy vykonávajúce náročné úlohy? Ak by druhý proces pri čítaní a spracovaní uviazol na dlhší čas, prvý proces by sa tiež zastavil na rovnaký čas. Na zabránenie takejto situácii môžeme použiť buffered channel.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Vyššie uvedený kód je kód, ktorý používa buffered channel na výstup 1 a 2. V tomto kóde sa buffered channel používa na zabezpečenie toho, aby odosielanie dát do channel a prijímanie dát z channel nemuselo prebiehať súčasne. Ak sa do kanála pridá vyrovnávacia pamäť, vytvorí sa priestor rovnakej dĺžky, čo môže zabrániť oneskoreniu úloh spôsobenému vplyvom následných úloh.
select
Pri práci s viacerými kanálmi môžete pomocou syntaxe select jednoducho implementovať štruktúru fan-in.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Vyššie uvedený kód vytvára tri kanály, ktoré pravidelne prenášajú 1, 2 a 3, a používa select na prijímanie a výpis hodnôt z kanálov. Týmto spôsobom, ak použijete select, môžete prijímať dáta z viacerých kanálov súčasne a spracovať ich hneď, ako prídu.
for range
channel dokáže ľahko prijímať dáta pomocou for range. Ak sa for range použije pre kanál, bude sa vykonávať vždy, keď sa do kanála pridajú dáta, a ukončí slučku, keď sa kanál uzavrie.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Vyššie uvedený kód je kód, ktorý používa channel na výstup 1 a 2. V tomto kóde sa for range používa na prijímanie a výstup dát vždy, keď sa do kanála pridajú dáta. A slučka sa ukončí, keď sa kanál uzavrie.
Ako už bolo niekoľkokrát uvedené, túto syntax možno použiť aj ako jednoduchý synchronizačný mechanizmus.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Vyššie uvedený kód vypíše "Hello, World!" po jednej sekunde. V tomto kóde sa channel použil na zmenu synchrónneho kódu na asynchrónny. Týmto spôsobom, ak sa použije channel, je možné ľahko zmeniť synchrónny kód na asynchrónny a nastaviť miesta join.
etc
- Ak odošlete alebo prijmete dáta z nil channelu, môže dôjsť k deadlocku v nekonečnej slučke.
- Ak odošlete dáta po zatvorení channelu, dôjde k panike.
- Channel netreba explicitne zatvárať, GC ho automaticky uzavrie.
mutex
spinlock
spinlock je synchronizačná metóda, ktorá sa opakovane pokúša získať zámok v cykle. V jazyku Go možno spinlock jednoducho implementovať pomocou ukazovateľov.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Vyššie uvedený kód implementuje balík spinlock. V tomto kóde je SpinLock implementovaný pomocou balíka sync/atomic. Metóda Lock sa pokúša získať zámok pomocou atomic.CompareAndSwapUintptr a metóda Unlock uvoľní zámok pomocou atomic.StoreUintptr. Tento prístup sa neustále pokúša získať zámok, a preto nepretržite využíva CPU, kým sa zámok nezíska, čo môže viesť k nekonečnej slučke. Preto je spinlock vhodný na jednoduchú synchronizáciu alebo na použitie len na krátky čas.
sync.Mutex
mutex je nástroj na synchronizáciu medzi goroutinami. mutex poskytovaný balíkom sync ponúka metódy ako Lock, Unlock, RLock, RUnlock. mutex je možné vytvoriť pomocou sync.Mutex a na zámok pre čítanie/zápis možno použiť aj sync.RWMutex.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
Vo vyššie uvedenom kóde sa dve goroutiny takmer súčasne pokúšajú pristupovať k rovnakej premennej count. V takomto prípade, ak sa kód, ktorý pristupuje k premennej count, označí ako kritická sekcia pomocou mutex, zabráni sa súbežnému prístupu k premennej count. Potom tento kód vždy vypíše 2 bez ohľadu na to, koľkokrát sa spustí.
sync.RWMutex
sync.RWMutex je mutex, ktorý umožňuje oddelené použitie zámkov na čítanie a zápis. Metódy RLock a RUnlock možno použiť na nastavenie a uvoľnenie zámku na čítanie.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Vyššie uvedený kód implementuje ConcurrentMap pomocou sync.RWMutex. V tomto kóde metóda Get nastavuje zámok na čítanie a metóda Set nastavuje zámok na zápis, čo umožňuje bezpečný prístup a úpravu mapy data. Dôvodom potreby zámku na čítanie je, že v prípadoch, keď existuje mnoho jednoduchých operácií čítania, je možné použiť iba zámok na čítanie bez zámku na zápis, čo umožňuje viacerým goroutinám vykonávať operácie čítania súčasne. Týmto spôsobom, ak nie je potrebné nastavovať zámok na zápis, pretože nedochádza k zmene stavu, možno použiť iba zámok na čítanie, čím sa zvýši výkon.
fakelock
fakelock je jednoduchý trik, ktorý implementuje sync.Locker. Táto štruktúra poskytuje rovnaké metódy ako sync.Mutex, ale v skutočnosti nevykonáva žiadnu operáciu.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Vyššie uvedený kód implementuje balík fakelock. Tento balík implementuje sync.Locker a poskytuje metódy Lock a Unlock, ale v skutočnosti nevykonáva žiadne operácie. Ak bude príležitosť, vysvetlím, prečo je takýto kód potrebný.
waitgroup
sync.WaitGroup
sync.WaitGroup je nástroj, ktorý čaká, kým sa všetky úlohy goroutín nedokončia. Poskytuje metódy Add, Done, Wait, pričom metóda Add pridáva počet goroutín a metóda Done signalizuje, že úloha goroutiny je dokončená. A metóda Wait čaká, kým sa nedokončia všetky úlohy goroutín.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Vyššie uvedený kód používa sync.WaitGroup na sčítanie hodnôt do premennej c zo 100 súbežných goroutín. V tomto kóde sa sync.WaitGroup používa na počkanie, kým sa všetky goroutiny nedokončia, a potom sa vypíše sčítaná hodnota premennej c. Aj keď pre jednoduché fork & join niekoľkých úloh stačí použiť iba kanály, pre veľké množstvo úloh fork & join je sync.WaitGroup tiež dobrou voľbou.
with slice
Ak sa waitgroup použije so slice, môže byť vynikajúcim nástrojom na správu súbežných operácií bez zámkov.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Vyššie uvedený kód používa iba waitgroup na generovanie 10 náhodných celých čísel súčasne v každej goroutine a ich uloženie na priradený index. V tomto kóde sa waitgroup používa na počkanie, kým sa všetky goroutiny nedokončia, a potom sa vypíše Done. Týmto spôsobom, ak sa použije waitgroup, môžu viaceré goroutiny vykonávať úlohy súčasne a ukladať dáta bez zámku, kým sa všetky goroutiny nedokončia, a po dokončení úloh môžu byť hromadne spracované.
golang.org/x/sync/errgroup.ErrGroup
errgroup je balík, ktorý rozširuje sync.WaitGroup. Na rozdiel od sync.WaitGroup, errgroup zruší všetky goroutiny a vráti chybu, ak sa počas vykonávania ktorejkoľvek goroutiny vyskytne chyba.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Vyššie uvedený kód používa errgroup na vytvorenie 10 goroutín a zámerne vyvoláva chybu v piatej goroutine, aby demonštroval prípad chyby. Avšak v skutočnom použití sa errgroup používa na vytváranie goroutín a na vykonávanie rôznych následných spracovaní v prípade, že sa v každej goroutine vyskytne chyba.
once
Toto je nástroj na vykonanie kódu, ktorý sa má spustiť iba raz. Súvisiaci kód môžete spustiť pomocou nasledujúcich konštruktorov.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc jednoducho zabezpečí, že daná funkcia sa vykoná iba raz v celom rozsahu.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Vyššie uvedený kód používa sync.OnceFunc na výpis správy "Hello, World!". V tomto kóde sa sync.OnceFunc používa na vytvorenie funkcie once a aj keď sa funkcia once volá viackrát, "Hello, World!" sa vypíše iba raz.
OnceValue
OnceValue nielenže zabezpečí, že daná funkcia sa vykoná iba raz, ale aj uloží návratovú hodnotu tejto funkcie a vráti uloženú hodnotu pri ďalšom volaní.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Vyššie uvedený kód používa sync.OnceValue na inkrementovanie premennej c o 1. V tomto kóde sa sync.OnceValue používa na vytvorenie funkcie once a aj keď sa funkcia once volá viackrát, premenná c sa zvýši iba raz a vráti 1.
OnceValues
OnceValues funguje rovnako ako OnceValue, ale môže vracať viacero hodnôt.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Vyššie uvedený kód používa sync.OnceValues na inkrementovanie premennej c o 1. V tomto kóde sa sync.OnceValues používa na vytvorenie funkcie once a aj keď sa funkcia once volá viackrát, premenná c sa zvýši iba raz a vráti 1.
atomic
Balík atomic poskytuje atómové operácie. Balík atomic poskytuje metódy ako Add, CompareAndSwap, Load, Store, Swap, ale v poslednej dobe sa odporúča používať typy ako Int64, Uint64, Pointer.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Toto je príklad, ktorý bol použitý predtým. Je to kód, ktorý atómovo zvyšuje premennú c pomocou typu atomic.Int64. Pomocou metód Add a Load možno atómovo zvýšiť premennú a prečítať ju. Okrem toho možno pomocou metódy Store uložiť hodnotu, pomocou metódy Swap vymeniť hodnotu a pomocou metódy CompareAndSwap porovnať hodnotu a vymeniť ju, ak je vhodná.
cond
sync.Cond
Balík cond poskytuje podmienené premenné. Balík cond je možné vytvoriť pomocou sync.Cond a poskytuje metódy Wait, Signal, Broadcast.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Vyššie uvedený kód používa sync.Cond na čakanie, kým sa premenná ready nestane true. V tomto kóde sa sync.Cond používa na čakanie, kým sa premenná ready nestane true, a potom sa vypíše "Ready!". Týmto spôsobom, ak použijete sync.Cond, môžete nechať viacero goroutín čakať, kým sa súčasne nesplnia určité podmienky.
Pomocou tohto sa dá implementovať jednoduchá queue.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Takýmto využitím sync.Cond je možné efektívne čakať a obnoviť operáciu, keď je podmienka splnená, namiesto toho, aby sa používal spin-lock, ktorý spotrebúva veľa CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
Balík semaphore poskytuje semafor. Balík semaphore možno vytvoriť pomocou golang.org/x/sync/semaphore.Semaphore a poskytuje metódy Acquire, Release, TryAcquire.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Vyššie uvedený kód používa semaphore na vytvorenie semafora a demonštruje získanie semafora pomocou metódy Acquire a jeho uvoľnenie pomocou metódy Release. V tomto kóde sa ukázal spôsob získania a uvoľnenia semafora pomocou semaphore.
Záver
Myslím, že toto sú základné informácie. Dúfam, že na základe obsahu tohto článku pochopíte, ako spravovať súbežnosť pomocou goroutín, a budete ich môcť aj reálne použiť. Dúfam, že tento článok vám bol užitočný. Ďakujem.