GoSuda

Pachet introductiv pentru concurența Go

By snowmerak
views ...

Prezentare generală

Scurtă introducere

Limbajul Go are multe instrumente pentru gestionarea concurenței. În acest articol, vom prezenta câteva dintre ele și trucuri.

Goroutine?

Goroutine este un nou model de concurență suportat de limbajul Go. În mod normal, un program primește thread-uri OS de la sistemul de operare pentru a efectua mai multe operații în același timp, și efectuează operații în paralel, câte nuclee sunt. Și pentru a efectua concurență pe unități mai mici, se generează thread-uri verzi în spațiul utilizatorului, astfel încât mai multe thread-uri verzi să ruleze într-un singur thread OS și să efectueze operații. Dar, în cazul goroutine, aceste tipuri de thread-uri verzi au fost făcute mai mici și mai eficiente. Aceste goroutine utilizează mai puțină memorie decât thread-urile și pot fi create și înlocuite mai rapid decât thread-urile.

Pentru a utiliza goroutine, trebuie doar să utilizați cuvântul cheie go. Acest lucru vă permite să executați în mod intuitiv codul sincron în cod asincron în procesul de scriere a programului.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Acest cod modifică un cod sincron care așteaptă 1 secundă și apoi afișează Hello, World! într-un flux asincron. Exemplul de acum este simplu, dar dacă modificați un cod puțin mai complex din cod sincron în cod asincron, lizibilitatea, vizibilitatea și înțelegerea codului sunt mai bune decât metodele existente, cum ar fi async await sau promise.

Cu toate acestea, în multe cazuri, codul goroutine nu este bun dacă nu se înțelege doar fluxul de apelare asincronă a codului sincron și fluxul precum fork & join (un flux similar cu divide et impera). În acest articol, vom prezenta câteva metode și tehnici care pot fi pregătite pentru astfel de cazuri.

Gestionarea concurenței

context

Poate fi surprinzător că context apare ca prima tehnică de gestionare. Dar în limbajul Go, context joacă un rol excelent nu numai în funcția simplă de anulare, ci și în gestionarea întregului arbore de sarcini. Dacă nu știți, vom explica pe scurt pachetul.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Codul de mai sus este un cod care afișează Context is done! după 1 secundă folosind context. context poate verifica dacă a fost anulat prin metoda Done() și oferă diverse metode de anulare prin metode precum WithCancel, WithTimeout, WithDeadline, WithValue.

Să facem un exemplu simplu. Să presupunem că scrieți cod pentru a obține user, post, comment folosind modelul aggregator pentru a obține anumite date. Și dacă toate solicitările trebuie făcute în 2 secunde, le puteți scrie după cum urmează.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Codul de mai sus afișează Timeout! dacă nu poate prelua toate datele în 2 secunde și afișează All data is fetched! dacă preia toate datele. Dacă utilizați context în acest mod, puteți gestiona cu ușurință anulările și expirările chiar și în codul în care rulează mai multe goroutine.

Diverse funcții și metode legate de context sunt disponibile la godoc context. Sper că le veți învăța pe cele simple și veți fi capabili să le folosiți cu ușurință.

channel

unbuffered channel

channel este un instrument pentru comunicarea între goroutine. channel poate fi creat cu make(chan T). În acest caz, T este tipul de date pe care acest channel îl va transmite. channel poate trimite și primi date cu <- și poate fi închis cu close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Codul de mai sus este un cod care afișează 1 și 2 folosind channel. Acest cod arată doar simpla trimitere și primire de valori în channel. Dar channel oferă mai multe funcții. Mai întâi, vom afla despre buffered channel și unbuffered channel. Înainte de a începe, exemplul de mai sus este un unbuffered channel, iar acțiunea de a trimite date la canal și acțiunea de a primi date trebuie să aibă loc în același timp. Dacă această acțiune nu are loc în același timp, poate apărea o blocare.

buffered channel

Ce se întâmplă dacă codul de mai sus nu este o ieșire simplă, ci două procese care efectuează operații grele? Dacă al doilea proces se blochează pentru o perioadă lungă de timp în timp ce citește și efectuează procesarea, primul proces se va opri, de asemenea, pentru acel timp. Putem folosi un buffered channel pentru a preveni această situație.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Codul de mai sus este un cod care afișează 1 și 2 folosind un buffered channel. În acest cod, am folosit un buffered channel pentru a face ca acțiunea de a trimite date către channel și acțiunea de a primi date să nu mai trebuiască să aibă loc în același timp. Dacă adăugați un buffer la canal în acest fel, apare o rezervă pentru lungimea respectivă, iar acest lucru poate preveni întârzierea lucrărilor cauzată de influența lucrărilor ulterioare.

select

Când tratați mai multe canale, puteți implementa cu ușurință o structură fan-in utilizând sintaxa select.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Codul de mai sus este un cod care creează 3 canale care transmit periodic 1, 2 și 3 și utilizează select pentru a primi valori de la canale și a le afișa. Dacă utilizați select în acest mod, puteți primi date simultan de la mai multe canale și le puteți procesa pe măsură ce primiți valori de la canale.

for range

channel poate primi cu ușurință date folosind for range. Dacă for range este folosit pe un canal, acesta funcționează de fiecare dată când datele sunt adăugate la acel canal și încheie bucla când canalul este închis.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Codul de mai sus este un cod care afișează 1 și 2 folosind channel. În acest cod, datele sunt primite și afișate de fiecare dată când datele sunt adăugate la canal folosind for range. Și încheie bucla când canalul este închis.

După cum am scris de câteva ori mai sus, această sintaxă poate fi folosită ca un mijloc simplu de sincronizare.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Codul de mai sus este un cod care așteaptă 1 secundă și apoi afișează Hello, World!. În acest cod, am folosit un channel pentru a schimba codul sincron în cod asincron. Dacă utilizați channel în acest mod, puteți schimba cu ușurință codul sincron în cod asincron și puteți seta punctele join.

etc

  1. Dacă trimiteți sau primiți date pe un canal nul, puteți cădea într-o buclă infinită și poate apărea o blocare.
  2. Dacă trimiteți date după închiderea unui canal, apare o panică.
  3. Chiar dacă nu închideți în mod necesar un canal, GC va colecta și închide canalul.

mutex

spinlock

spinlock este o metodă de sincronizare care încearcă continuu să obțină un lock prin rularea unei bucle. În limbajul Go, puteți implementa cu ușurință un spinlock folosind pointeri.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Codul de mai sus este codul care implementează pachetul spinlock. În acest cod, SpinLock a fost implementat folosind pachetul sync/atomic. Metoda Lock încearcă să obțină un lock folosind atomic.CompareAndSwapUintptr, iar metoda Unlock eliberează lock-ul folosind atomic.StoreUintptr. Deoarece această metodă încearcă să obțină un lock fără pauză, aceasta va continua să folosească CPU-ul până când obține lock-ul, ceea ce poate duce la o buclă infinită. Prin urmare, este recomandat să utilizați spinlock pentru o sincronizare simplă sau numai pentru perioade scurte de timp.

sync.Mutex

mutex este un instrument pentru sincronizarea dintre goroutine. mutex oferit de pachetul sync oferă metode precum Lock, Unlock, RLock, RUnlock. mutex poate fi creat cu sync.Mutex, iar sync.RWMutex poate fi folosit pentru lock-uri de citire/scriere.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

În codul de mai sus, două goroutine accesează aceeași variabilă count aproape simultan. În acest caz, dacă folosim un mutex pentru a crea codul care accesează variabila count ca o regiune critică, accesul concurent la variabila count poate fi prevenit. Atunci acest cod va afișa 2 în mod identic, indiferent de câte ori rulează.

sync.RWMutex

sync.RWMutex este un mutex care poate fi folosit pentru a separa lock-urile de citire și lock-urile de scriere. Puteți obține și elibera un lock de citire folosind metodele RLock și RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Codul de mai sus este un cod care implementează ConcurrentMap folosind sync.RWMutex. În acest cod, un lock de citire este obținut în metoda Get, iar un lock de scriere este obținut în metoda Set pentru a accesa și modifica în siguranță harta data. Motivul pentru care este necesar un lock de citire este că, în cazul în care există multe operații simple de citire, este posibil să se permită mai multor goroutine să efectueze simultan operații de citire obținând doar lock-uri de citire, fără obținerea lock-urilor de scriere. Prin aceasta, se poate îmbunătăți performanța prin obținerea doar a unui lock de citire, în cazurile în care nu este necesar un lock de scriere, deoarece nu există o modificare a stării.

fakelock

fakelock este un truc simplu care implementează sync.Locker. Această structură oferă aceleași metode ca și sync.Mutex, dar nu efectuează nicio operație reală.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Codul de mai sus este codul care implementează pachetul fakelock. Acest pachet oferă metode Lock și Unlock prin implementarea sync.Locker, dar nu efectuează nicio operație reală. Voi descrie de ce este necesar acest cod când voi avea ocazia.

waitgroup

sync.WaitGroup

sync.WaitGroup este un instrument care așteaptă până când toate goroutine-urile și-au terminat operațiile. Oferă metode Add, Done, Wait, iar numărul de goroutine este adăugat cu metoda Add, iar metoda Done anunță că operația goroutine-ului este finalizată. Și metoda Wait așteaptă până când operațiile tuturor goroutine-urilor sunt finalizate.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11    pentru i := 0; i < 100 ; i++ {
12        wg.Add(1)
13        go func() {
14            defer wg.Done()
15            c.Add(1)
16        }()
17    }
18
19    wg.Wait()
20    println(c.Load())
21}

Acest cod utilizează sync.WaitGroup pentru a avea 100 de goroutine-uri adăugând simultan valori la variabila c. În acest cod, sync.WaitGroup este utilizat pentru a aștepta până când toate goroutine-urile se termină, apoi afișează valoarea adăugată la variabila c. Simplu, dacă trebuie doar să faceți un fork & join pentru câteva operații, utilizarea doar a canalelor este suficientă, dar dacă trebuie să faceți un fork & join pentru un volum mare de operații, utilizarea sync.WaitGroup este o opțiune bună.

cu slice

Dacă este utilizat cu un slice, waitgroup poate fi un instrument excelent pentru gestionarea operațiunilor simultane fără blocări.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Codul de mai sus este un cod care folosește doar waitgroup pentru a genera simultan 10 numere întregi aleatorii de către fiecare goroutine și le stochează la indexul alocat. În acest cod, waitgroup este utilizat pentru a aștepta până când toate goroutine-urile se termină, apoi afișează Done. Folosind waitgroup în acest fel, mai multe goroutine-uri pot efectua operații simultan, pot stoca date fără blocări până când toate goroutine-urile se termină și pot efectua post-procesarea în bloc după ce operațiile se termină.

golang.org/x/sync/errgroup.ErrGroup

errgroup este un pachet care extinde sync.WaitGroup. Spre deosebire de sync.WaitGroup, dacă apare o eroare în operația unui goroutine, errgroup anulează toate goroutine-urile și returnează o eroare.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Codul de mai sus este un cod care creează 10 goroutine-uri folosind errgroup și generează o eroare în cel de-al 5-lea goroutine. Am demonstrat cazul în care apare o eroare generând în mod intenționat o eroare în cel de-al cincilea goroutine. Cu toate acestea, în practică, este mai bine să utilizați errgroup pentru a crea goroutine-uri și să efectuați diverse post-procesări pentru cazurile în care apar erori în fiecare goroutine.

once

Este un instrument pentru executarea codului care trebuie executat o singură dată. Codul asociat poate fi executat prin intermediul constructorului de mai jos.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc pur și simplu permite ca funcția respectivă să fie executată o singură dată în întregime.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Codul de mai sus este un cod care afișează Hello, World! folosind sync.OnceFunc. În acest cod, o funcție once este creată folosind sync.OnceFunc, și chiar dacă funcția once este apelată de mai multe ori, Hello, World! este afișat o singură dată.

OnceValue

OnceValue nu doar permite ca funcția respectivă să fie executată o singură dată în întregime, dar stochează valoarea returnată a funcției respective și returnează valoarea stocată când este apelată din nou.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Codul de mai sus este un cod care crește variabila c cu 1 folosind sync.OnceValue. În acest cod, o funcție once este creată folosind sync.OnceValue, și chiar dacă funcția once este apelată de mai multe ori, se returnează 1, valoarea la care variabila c a fost incrementată o singură dată.

OnceValues

OnceValues funcționează la fel ca OnceValue, dar poate returna mai multe valori.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Codul de mai sus este un cod care crește variabila c cu 1 folosind sync.OnceValues. În acest cod, o funcție once este creată folosind sync.OnceValues, și chiar dacă funcția once este apelată de mai multe ori, se returnează 1, valoarea la care variabila c a fost incrementată o singură dată.

atomic

Pachetul atomic este un pachet care oferă operații atomice. Pachetul atomic oferă metode precum Add, CompareAndSwap, Load, Store, Swap, dar în ultima vreme se recomandă utilizarea tipurilor precum Int64, Uint64, Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Acesta este exemplul folosit mai devreme. Este un cod care incrementează atomic variabila c folosind tipul atomic.Int64. Puteți incrementa atomic variabila și citi variabila cu metodele Add și Load. De asemenea, puteți stoca valori cu metoda Store, puteți schimba valori cu metoda Swap și puteți compara valorile și le puteți schimba dacă se potrivesc cu metoda CompareAndSwap.

cond

sync.Cond

Pachetul cond este un pachet care oferă variabile condiționale. Pachetul cond poate fi creat cu sync.Cond și oferă metodele Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Codul de mai sus este un cod care așteaptă până când variabila ready devine true folosind sync.Cond. În acest cod, se folosește sync.Cond pentru a aștepta până când variabila ready devine true, apoi se afișează Ready!. Folosind sync.Cond în acest mod, mai multe goroutine-uri pot fi făcute să aștepte simultan până când o anumită condiție este îndeplinită.

Folosind această metodă, puteți implementa o coadă simplă.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Folosind sync.Cond în acest fel, puteți aștepta eficient și puteți reporni când o condiție este îndeplinită, în loc să utilizați mult CPU cu un spin-lock.

semaphore

golang.org/x/sync/semaphore.Semaphore

Pachetul semaphore este un pachet care oferă semafoare. Pachetul semaphore poate fi creat cu golang.org/x/sync/semaphore.Semaphore și oferă metodele Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Codul de mai sus este un cod care creează un semafor folosind semaphore, și folosește semaforul pentru a obține semaforul cu metoda Acquire și eliberează semaforul cu metoda Release. În acest cod, am demonstrat cum să obțineți și să eliberați un semafor folosind semaphore.

În concluzie

Cred că aceasta este suficient pentru noțiunile de bază. Pe baza conținutului acestui articol, sper că ați înțeles cum să gestionați concurența folosind goroutine-uri și că puteți să le folosiți în practică. Sper că acest articol a fost de ajutor. Vă mulțumesc.