Go Concurrency Starter Pack
Prezentare generală
Scurtă introducere
Limbajul Go oferă numeroase instrumente pentru gestionarea concurenței. În acest articol, vom prezenta câteva dintre acestea și diverse trucuri.
goroutine?
goroutine este un nou tip de model de concurență susținut de limbajul Go. În mod obișnuit, un program primește thread-uri ale sistemului de operare (OS threads) de la sistemul de operare pentru a executa simultan mai multe sarcini, efectuând operații în paralel, în funcție de numărul de nuclee. Pentru a realiza o concurență la o scară mai mică, se creează green threads în userland, permițând mai multor green threads să ruleze în cadrul unui singur OS thread. Cu toate acestea, goroutine-urile au făcut acest tip de green threads mai mici și mai eficiente. Aceste goroutine utilizează mai puțină memorie decât thread-urile și pot fi create și înlocuite mai rapid decât thread-urile.
Pentru a utiliza goroutine-uri, este suficient să folosiți cuvântul cheie go. Acest lucru permite executarea intuitivă a codului sincron ca și cod asincron în timpul scrierii programului.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Acest cod transformă un cod sincron simplu, care afișează Hello, World! după o secundă de pauză, într-un flux asincron. Deși exemplul actual este simplu, atunci când un cod puțin mai complex este transformat din cod sincron în cod asincron, lizibilitatea, vizibilitatea și înțelegerea codului devin superioare metodelor existente, cum ar fi async/await sau promise-uri.
Totuși, în multe cazuri, fără o înțelegere a fluxului de apelare asincronă a codului sincron și a fluxurilor de tip fork & join (asemănătoare cu abordarea divide et impera), se pot produce coduri goroutine de calitate inferioară. În acest articol, vom prezenta câteva metode și tehnici pentru a preveni astfel de situații.
Gestionarea concurenței
context
Poate fi surprinzător că context apare ca prima tehnică de gestionare. Cu toate acestea, în limbajul Go, context joacă un rol excelent în gestionarea întregului arbore de sarcini, dincolo de simpla funcționalitate de anulare. Voi explica pe scurt acest pachet pentru cei care nu sunt familiarizați.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Codul de mai sus utilizează context pentru a afișa Context is done! după o secundă. context poate verifica starea de anulare prin metoda Done() și oferă diverse metode de anulare prin WithCancel, WithTimeout, WithDeadline, WithValue etc.
Să creăm un exemplu simplu. Să presupunem că scrieți cod pentru a prelua datele user, post și comment utilizând pattern-ul aggregator pentru a obține anumite date. Dacă toate cererile trebuie să fie finalizate în decurs de 2 secunde, puteți scrie codul astfel:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Codul de mai sus afișează Timeout! dacă toate datele nu sunt preluate în 2 secunde și All data is fetched! dacă toate datele sunt preluate. Prin utilizarea context în acest mod, anularea și timeout-urile pot fi gestionate cu ușurință chiar și în codul în care rulează mai multe goroutine-uri.
Diverse funcții și metode legate de context pot fi găsite la godoc context. Sperăm că veți putea învăța și utiliza cu ușurință conceptele simple.
channel
unbuffered channel
channel este un instrument pentru comunicarea între goroutine-uri. Un channel poate fi creat cu make(chan T). Aici, T este tipul de date pe care channel-ul îl va transmite. channel-ul poate trimite și primi date cu <- și poate fi închis cu close.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Codul de mai sus afișează 1 și 2 utilizând un channel. Acest cod demonstrează doar trimiterea și primirea de valori într-un channel. Cu toate acestea, channel-ul oferă mai multe funcționalități decât atât. Să începem prin a explora buffered channel și unbuffered channel. Înainte de a începe, exemplul de mai sus este un unbuffered channel, ceea ce înseamnă că acțiunea de a trimite date către canal și acțiunea de a primi date trebuie să aibă loc simultan. Dacă aceste acțiuni nu se întâmplă simultan, poate apărea un deadlock.
buffered channel
Ce se întâmplă dacă codul de mai sus nu este o simplă afișare, ci două procese care efectuează sarcini intensive? Dacă al doilea proces este blocat pentru o perioadă lungă de timp în timp ce citește și procesează, primul proces va fi și el blocat în acel interval. Putem utiliza un buffered channel pentru a preveni o astfel de situație.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Codul de mai sus afișează 1 și 2 utilizând un buffered channel. În acest cod, am utilizat un buffered channel pentru a permite ca acțiunea de a trimite date către canal și acțiunea de a primi date să nu aibă loc simultan. Prin adăugarea unui buffer la canal, se creează o marjă de manevră de lungimea respectivă, ceea ce poate preveni întârzierile de lucru cauzate de influența sarcinilor ulterioare.
select
Atunci când se gestionează mai multe canale, instrucțiunea select poate fi utilizată pentru a implementa cu ușurință o structură fan-in.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Codul de mai sus creează 3 canale care transmit periodic 1, 2 și 3 și utilizează select pentru a primi și afișa valori din canale. Prin utilizarea select în acest mod, se pot primi date simultan din mai multe canale și se pot procesa valorile pe măsură ce acestea sunt primite.
for range
Un channel poate primi cu ușurință date utilizând for range. Când for range este utilizat cu un canal, acesta va funcționa de fiecare dată când datele sunt adăugate la canal și va ieși din buclă atunci când canalul este închis.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Codul de mai sus afișează 1 și 2 utilizând un channel. În acest cod, for range este utilizat pentru a primi și afișa date de fiecare dată când datele sunt adăugate la canal. Apoi, bucla se termină când canalul este închis.
Așa cum am menționat de câteva ori mai sus, această sintaxă poate fi utilizată și ca un simplu mijloc de sincronizare.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Codul de mai sus afișează "Hello, World!" după o pauză de 1 secundă. În acest cod, channel-ul este utilizat pentru a transforma codul sincron în cod asincron. Prin utilizarea channel-ului în acest mod, codul sincron poate fi transformat cu ușurință în cod asincron, iar punctele de join pot fi stabilite.
etc
- Dacă trimiteți sau primiți date de la un nil channel, puteți intra într-o buclă infinită, rezultând un deadlock.
- Dacă trimiteți date către un canal după ce a fost închis, va apărea o panică.
- Chiar dacă un canal nu este închis explicit, GC-ul îl va închide în timpul colectării gunoiului.
mutex
spinlock
spinlock este o metodă de sincronizare care încearcă în mod repetat să obțină un lock într-o buclă. În limbajul Go, un spinlock poate fi implementat cu ușurință utilizând pointeri.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Codul de mai sus implementează pachetul spinlock. În acest cod, SpinLock este implementat utilizând pachetul sync/atomic. Metoda Lock încearcă să obțină un lock utilizând atomic.CompareAndSwapUintptr, iar metoda Unlock eliberează lock-ul utilizând atomic.StoreUintptr. Această metodă încearcă în mod continuu să obțină lock-ul, consumând continuu CPU până la obținerea lock-ului, ceea ce poate duce la o buclă infinită. Prin urmare, spinlock ar trebui utilizat pentru sincronizări simple sau pentru perioade scurte de timp.
sync.Mutex
mutex este un instrument pentru sincronizarea între goroutine-uri. mutex-ul furnizat de pachetul sync oferă metode precum Lock, Unlock, RLock, RUnlock. Un mutex poate fi creat ca sync.Mutex, iar sync.RWMutex poate fi utilizat pentru lock-uri de citire/scriere.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
În codul de mai sus, două goroutine-uri accesează aproape simultan aceeași variabilă count. Prin utilizarea unui mutex pentru a transforma codul care accesează variabila count într-o secțiune critică, accesul concurent la variabila count poate fi prevenit. Astfel, acest cod va afișa întotdeauna 2, indiferent de câte ori este rulat.
sync.RWMutex
sync.RWMutex este un mutex care permite utilizarea separată a lock-urilor de citire și scriere. Metoda RLock și RUnlock pot fi utilizate pentru a aplica și a elibera lock-uri de citire.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Codul de mai sus implementează ConcurrentMap utilizând sync.RWMutex. În acest cod, metoda Get aplică un lock de citire, iar metoda Set aplică un lock de scriere, permițând accesul și modificarea sigură a hărții data. Necesitatea unui lock de citire apare atunci când există multe operații simple de citire; în acest caz, nu se aplică un lock de scriere, ci doar un lock de citire, permițând mai multor goroutine-uri să efectueze operații de citire simultan. Prin aceasta, performanța poate fi îmbunătățită prin aplicarea doar a unui lock de citire atunci când nu este necesară aplicarea unui lock de scriere, deoarece nu există modificări ale stării.
fakelock
fakelock este un truc simplu care implementează sync.Locker. Această structură oferă aceleași metode ca sync.Mutex, dar nu efectuează nicio operație reală.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Codul de mai sus implementează pachetul fakelock. Acest pachet implementează sync.Locker, oferind metode Lock și Unlock, dar nu efectuează nicio operație reală. Voi explica de ce este necesar un astfel de cod când voi avea ocazia.
waitgroup
sync.WaitGroup
sync.WaitGroup este un instrument care așteaptă finalizarea tuturor sarcinilor goroutine-urilor. Acesta oferă metodele Add, Done, Wait. Metoda Add adaugă numărul de goroutine-uri, Done semnalează finalizarea sarcinii unui goroutine, iar Wait așteaptă până la finalizarea tuturor sarcinilor goroutine-urilor.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Codul de mai sus utilizează sync.WaitGroup pentru a incrementa variabila c de către 100 de goroutine-uri simultan. În acest cod, sync.WaitGroup este utilizat pentru a aștepta până la finalizarea tuturor goroutine-urilor, apoi afișează valoarea adăugată la variabila c. Deși utilizarea canalelor este suficientă pentru a fork & join câteva sarcini, sync.WaitGroup este o opțiune bună pentru a fork & join un număr mare de sarcini.
with slice
Atunci când este utilizat cu un slice, waitgroup poate deveni un instrument excelent pentru gestionarea sarcinilor concurente fără lock-uri.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Codul de mai sus utilizează doar waitgroup pentru ca fiecare goroutine să genereze simultan 10 numere întregi aleatorii și să le stocheze la indexul alocat. În acest cod, waitgroup este utilizat pentru a aștepta până la finalizarea tuturor goroutine-urilor, apoi afișează "Done". Prin utilizarea waitgroup în acest mod, mai multe goroutine-uri pot efectua sarcini simultan, pot stoca date fără lock-uri până la finalizarea tuturor goroutine-urilor și pot efectua post-procesare în bloc după finalizarea sarcinii.
golang.org/x/sync/errgroup.ErrGroup
errgroup este un pachet care extinde sync.WaitGroup. Spre deosebire de sync.WaitGroup, errgroup anulează toate goroutine-urile și returnează o eroare dacă apare o eroare în oricare dintre sarcinile goroutine-urilor.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Codul de mai sus utilizează errgroup pentru a crea 10 goroutine-uri și a genera o eroare în cel de-al cincilea goroutine. Am generat intenționat o eroare în al cincilea goroutine pentru a demonstra cazurile în care apar erori. Cu toate acestea, în utilizarea reală, errgroup poate fi utilizat pentru a crea goroutine-uri și a efectua diverse post-procesări în cazul în care apar erori în fiecare goroutine.
once
Un instrument pentru a executa cod care trebuie rulat o singură dată. Codul relevant poate fi executat prin constructorul de mai jos.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc asigură pur și simplu că funcția respectivă poate fi executată o singură dată pe parcursul întregii execuții.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Codul de mai sus afișează "Hello, World!" utilizând sync.OnceFunc. În acest cod, sync.OnceFunc este utilizat pentru a crea funcția once, iar chiar dacă funcția once este apelată de mai multe ori, "Hello, World!" va fi afișat o singură dată.
OnceValue
OnceValue nu doar execută funcția respectivă o singură dată, ci stochează și valoarea returnată a funcției și o returnează la apelările ulterioare.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Codul de mai sus utilizează sync.OnceValue pentru a incrementa variabila c cu 1. În acest cod, sync.OnceValue este utilizat pentru a crea funcția once, iar chiar dacă funcția once este apelată de mai multe ori, variabila c va returna 1, fiind incrementată o singură dată.
OnceValues
OnceValues funcționează la fel ca OnceValue, dar poate returna mai multe valori.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Codul de mai sus utilizează sync.OnceValues pentru a incrementa variabila c cu 1. În acest cod, sync.OnceValues este utilizat pentru a crea funcția once, iar chiar dacă funcția once este apelată de mai multe ori, variabila c va returna 1, fiind incrementată o singură dată.
atomic
Pachetul atomic oferă operații atomice. Pachetul atomic oferă metode precum Add, CompareAndSwap, Load, Store, Swap, dar recent se recomandă utilizarea tipurilor precum Int64, Uint64, Pointer.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Acesta este un exemplu utilizat anterior. Este un cod care incrementează atomic variabila c utilizând tipul atomic.Int64. Cu metodele Add și Load, variabila poate fi incrementată atomic și valoarea ei poate fi citită. De asemenea, cu metoda Store se poate salva o valoare, cu metoda Swap se poate schimba o valoare, iar cu metoda CompareAndSwap se poate compara o valoare și, dacă este potrivită, se poate schimba.
cond
sync.Cond
Pachetul cond oferă variabile de condiție. Pachetul cond poate fi creat ca sync.Cond și oferă metodele Wait, Signal, Broadcast.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Codul de mai sus așteaptă ca variabila ready să devină true utilizând sync.Cond. În acest cod, sync.Cond este utilizat pentru a aștepta până când variabila ready devine true, apoi afișează "Ready!". Prin utilizarea sync.Cond în acest mod, mai multe goroutine-uri pot fi făcute să aștepte simultan până când o anumită condiție este îndeplinită.
Acest lucru poate fi utilizat pentru a implementa o coadă simplă.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Prin utilizarea sync.Cond în acest fel, se poate aștepta eficient și se poate relua operația atunci când condiția este îndeplinită, în loc să se utilizeze spin-lock care consumă mult CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
Pachetul semaphore oferă semafoare. Pachetul semaphore poate fi creat ca golang.org/x/sync/semaphore.Semaphore și oferă metodele Acquire, Release, TryAcquire.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Codul de mai sus creează un semafor utilizând semaphore și demonstrează obținerea și eliberarea semaforului cu metodele Acquire și Release. Acest cod a prezentat modul de obținere și eliberare a unui semafor utilizând semaphore.
Concluzie
Cred că elementele de bază sunt suficiente până aici. Sper că, pe baza conținutului acestui articol, veți înțelege cum să gestionați concurența utilizând goroutine-uri și că veți putea să le utilizați în practică. Sper că acest articol v-a fost de ajutor. Vă mulțumesc.