GoSuda

Pacchetto introduttivo alla concorrenza in Go

By snowmerak
views ...

Panoramica

Breve introduzione

Il linguaggio Go offre numerosi strumenti per la gestione della concorrenza. In questo articolo, presenteremo alcuni di questi strumenti e trucchi.

Goroutine?

Una goroutine è un nuovo modello di concorrenza supportato dal linguaggio Go. In genere, un programma riceve thread del sistema operativo (OS) dall'OS per eseguire più operazioni contemporaneamente ed esegue operazioni in parallelo in base al numero di core. Per eseguire la concorrenza a unità più piccole, i thread verdi vengono creati nello spazio utente e più thread verdi eseguono operazioni alternandosi all'interno di un singolo thread del sistema operativo. Tuttavia, nel caso delle goroutine, questi thread verdi sono resi più piccoli ed efficienti. Queste goroutine utilizzano meno memoria rispetto ai thread e possono essere create e sostituite più rapidamente.

Per utilizzare una goroutine, è sufficiente usare la parola chiave go. Ciò consente di eseguire in modo intuitivo codice sincrono come codice asincrono durante il processo di scrittura del programma.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Ciao, Mondo!")
14    }()
15
16    fmt.Println("In attesa della goroutine...")
17    for range ch {}
18}

Questo codice modifica semplicemente il codice sincrono che, dopo una pausa di 1 secondo, stampa Ciao, Mondo! in un flusso asincrono. L'esempio attuale è semplice, ma se si modifica un codice leggermente più complesso da codice sincrono a codice asincrono, la leggibilità, la visibilità e la comprensione del codice diventano migliori rispetto ai metodi esistenti come async await o promise.

Tuttavia, in molti casi, si crea un codice goroutine non ottimale se il flusso di semplice chiamata asincrona del codice sincrono e il flusso come fork & join (un flusso simile a divide et impera) non vengono compresi. In questo articolo, presenteremo alcuni metodi e tecniche per prepararsi a questi casi.

Gestione della concorrenza

Context

L'apparizione di context come prima tecnica di gestione potrebbe sorprendere. Tuttavia, in Go, context svolge un ruolo eccellente nella gestione dell'intero albero delle operazioni, oltre alla semplice funzione di annullamento. Per coloro che non lo conoscono, descriveremo brevemente questo pacchetto.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Il contesto è completato!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Il codice sopra riportato è un codice che utilizza context per stampare Il contesto è completato! dopo 1 secondo. context può verificare se l'annullamento è stato eseguito attraverso il metodo Done() e fornisce vari metodi di annullamento attraverso metodi come WithCancel, WithTimeout, WithDeadline, WithValue.

Creiamo un semplice esempio. Supponiamo di scrivere del codice che recupera user, post e comment utilizzando il pattern aggregator per recuperare alcuni dati. E se tutte le richieste devono essere completate entro 2 secondi, può essere scritto come segue.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("Tutti i dati sono stati recuperati!")
22    }
23}

Il codice sopra stampa Timeout! se non riesce a recuperare tutti i dati entro 2 secondi, e stampa Tutti i dati sono stati recuperati! se recupera tutti i dati. Utilizzando context in questo modo, è possibile gestire facilmente l'annullamento e il timeout anche nel codice in cui operano più goroutine.

Varie funzioni e metodi relativi al contesto possono essere verificati in godoc context. Si spera che impariate quelli semplici e possiate usarli comodamente.

Channel

Channel non bufferizzato

channel è uno strumento per la comunicazione tra goroutine. channel può essere creato con make(chan T). In questo caso, T è il tipo di dati che channel trasmetterà. channel può inviare e ricevere dati con <- e può chiudere channel con close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Il codice sopra è un codice che stampa 1 e 2 utilizzando channel. Questo codice mostra semplicemente come inviare e ricevere valori in channel. Tuttavia, channel offre più funzioni di questa. Prima di tutto, diamo un'occhiata a buffered channel e unbuffered channel. L'esempio scritto sopra è un unbuffered channel, e l'azione di inviare dati al canale e l'azione di ricevere dati devono essere eseguite contemporaneamente. Se queste azioni non vengono eseguite contemporaneamente, può verificarsi un deadlock.

Channel bufferizzato

Cosa succede se il codice sopra non è una semplice stampa, ma 2 processi che eseguono operazioni pesanti? Se il secondo processo si blocca per un lungo periodo di tempo durante la lettura ed esecuzione dell'elaborazione, anche il primo processo si fermerà per quel tempo. Possiamo utilizzare il buffered channel per evitare questa situazione.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Il codice sopra è un codice che stampa 1 e 2 utilizzando buffered channel. In questo codice, usando buffered channel, abbiamo fatto in modo che l'azione di inviare dati al channel e l'azione di ricevere dati non debbano essere eseguite contemporaneamente. Se si crea un buffer nel channel in questo modo, si crea uno spazio libero per la lunghezza corrispondente, e si può prevenire il ritardo di lavoro causato dall'influenza dei lavori a valle.

Select

Quando si gestiscono più canali, è possibile implementare facilmente una struttura fan-in usando la sintassi select.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Il codice sopra è un codice che crea 3 canali che trasmettono periodicamente 1, 2 e 3, e usa select per ricevere valori dal canale e stamparli. Utilizzando select in questo modo, è possibile ricevere dati da più canali contemporaneamente ed elaborare i valori dal canale non appena vengono ricevuti.

For range

channel può ricevere facilmente dati utilizzando for range. Quando for range viene utilizzato su un canale, viene eseguito ogni volta che i dati vengono aggiunti al canale, e termina il loop quando il canale viene chiuso.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Il codice sopra è un codice che stampa 1 e 2 utilizzando channel. In questo codice, for range viene utilizzato per ricevere e stampare dati ogni volta che i dati vengono aggiunti al canale. E quando il canale viene chiuso, il loop termina.

Come scritto sopra alcune volte, questa sintassi può essere utilizzata come un semplice mezzo di sincronizzazione.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Ciao, Mondo!")
 9    }()
10
11    fmt.Println("In attesa della goroutine...")
12    for range ch {}
13}

Il codice sopra è un codice che, dopo una pausa di 1 secondo, stampa Ciao, Mondo!. In questo codice, channel è stato utilizzato per modificare il codice sincrono in codice asincrono. Utilizzando channel in questo modo, è possibile modificare facilmente il codice sincrono in codice asincrono e impostare il punto di join.

etc

  1. Se i dati vengono inviati o ricevuti su un canale nil, si può cadere in un loop infinito e può verificarsi un deadlock.
  2. Se i dati vengono inviati dopo la chiusura del canale, si verifica un panic.
  3. Anche se il canale non viene chiuso, il GC lo raccoglie e lo chiude.

Mutex

Spinlock

spinlock è un metodo di sincronizzazione che tenta ripetutamente di acquisire un lock eseguendo un ciclo. In Go, è facile implementare uno spinlock utilizzando puntatori.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Il codice sopra è un codice che implementa il pacchetto spinlock. In questo codice, sync/atomic viene utilizzato per implementare SpinLock. Nel metodo Lock, atomic.CompareAndSwapUintptr viene utilizzato per tentare un lock, e nel metodo Unlock, atomic.StoreUintptr viene utilizzato per rilasciare il lock. Questo metodo tenta ripetutamente di acquisire il lock, quindi continua a utilizzare la CPU fino a quando non ottiene il lock, e può cadere in un loop infinito. Pertanto, spinlock è meglio utilizzarlo per una semplice sincronizzazione o quando lo si usa solo per un breve periodo di tempo.

sync.Mutex

mutex è uno strumento per la sincronizzazione tra goroutine. Il mutex fornito dal pacchetto sync fornisce metodi come Lock, Unlock, RLock, RUnlock. mutex può essere creato con sync.Mutex, e sync.RWMutex può essere utilizzato per i lock di lettura/scrittura.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

Nel codice sopra, quasi contemporaneamente, due goroutine accedono alla stessa variabile count. In questo caso, se il codice che accede alla variabile count viene reso una sezione critica utilizzando mutex, è possibile bloccare l'accesso simultaneo alla variabile count. Allora questo codice stamperà sempre lo stesso 2, indipendentemente da quante volte viene eseguito.

sync.RWMutex

sync.RWMutex è un mutex che può essere utilizzato per distinguere i lock di lettura e i lock di scrittura. È possibile acquisire e rilasciare i lock di lettura utilizzando i metodi RLock e RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Il codice sopra è un codice che implementa ConcurrentMap utilizzando sync.RWMutex. In questo codice, il lock di lettura viene acquisito nel metodo Get e il lock di scrittura viene acquisito nel metodo Set per accedere e modificare la mappa data in modo sicuro. La ragione per cui è necessario il lock di lettura è che, quando ci sono molte semplici operazioni di lettura, è possibile consentire a più goroutine di eseguire operazioni di lettura contemporaneamente acquisendo solo il lock di lettura senza acquisire il lock di scrittura. In questo modo, è possibile migliorare le prestazioni acquisendo solo il lock di lettura nei casi in cui non è necessario acquisire il lock di scrittura perché non vi sono modifiche allo stato.

fakelock

fakelock è un semplice trucco che implementa sync.Locker. Questa struct fornisce gli stessi metodi di sync.Mutex, ma in realtà non esegue alcuna operazione.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Il codice sopra è un codice che implementa il pacchetto fakelock. Questo pacchetto implementa sync.Locker per fornire i metodi Lock e Unlock, ma in realtà non esegue alcuna operazione. Descriveremo perché questo codice è necessario quando si presenterà l'occasione.

WaitGroup

sync.WaitGroup

sync.WaitGroup è uno strumento che attende fino a quando tutte le operazioni della goroutine non sono completate. Fornisce i metodi Add, Done, Wait, e il numero di goroutine viene aggiunto con il metodo Add, e il metodo Done indica che l'operazione della goroutine è completata. E il metodo Wait attende fino a quando non sono completate tutte le operazioni della goroutine.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Questo codice utilizza sync.WaitGroup per fare in modo che 100 goroutine aggiungano simultaneamente un valore alla variabile c. In questo codice, sync.WaitGroup è impiegato per attendere il completamento di tutte le goroutine, dopodiché viene stampato il valore incrementato della variabile c. Sebbene per semplici operazioni di fork & join l'utilizzo dei canali sia sufficiente, nel caso di un elevato numero di operazioni di fork & join, l'uso di sync.WaitGroup rappresenta un'ottima alternativa.

con slice

Se utilizzato con slice, il waitgroup può essere uno strumento eccellente per la gestione di operazioni concorrenti senza l'uso di lock.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Il codice sopra utilizza esclusivamente waitgroup per consentire a ciascuna goroutine di generare contemporaneamente 10 numeri interi casuali e salvarli nell'indice assegnato. In questo codice, waitgroup è utilizzato per attendere che tutte le goroutine terminino, dopodiché viene stampato Done. Utilizzando waitgroup in questo modo, più goroutine possono eseguire operazioni simultaneamente, salvare dati senza lock fino al completamento di tutte le goroutine ed eseguire processi di post-elaborazione in blocco al termine delle operazioni.

golang.org/x/sync/errgroup.ErrGroup

errgroup è un pacchetto che estende sync.WaitGroup. A differenza di sync.WaitGroup, se si verifica un errore durante l'esecuzione di una goroutine, errgroup annulla tutte le goroutine e restituisce l'errore.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Il codice sopra crea 10 goroutine utilizzando errgroup e genera un errore nella quinta goroutine. L'errore è stato generato intenzionalmente nella quinta goroutine per dimostrare il caso in cui si verifica un errore. Tuttavia, nell'uso effettivo, si dovrebbe usare errgroup per creare goroutine ed eseguire varie operazioni di post-elaborazione nel caso in cui si verifichi un errore in ciascuna goroutine.

once

Uno strumento per eseguire codice che deve essere eseguito una sola volta. Il codice correlato può essere eseguito tramite i seguenti costruttori.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc consente semplicemente che una funzione venga eseguita una sola volta durante l'intero ciclo di vita del programma.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Il codice sopra utilizza sync.OnceFunc per stampare Hello, World!. In questo codice, sync.OnceFunc viene utilizzato per creare la funzione once e, anche se la funzione once viene chiamata più volte, Hello, World! viene stampato una sola volta.

OnceValue

OnceValue non solo consente che una funzione venga eseguita una sola volta durante l'intero ciclo di vita del programma, ma memorizza anche il valore di ritorno di quella funzione e restituisce il valore memorizzato quando viene chiamata di nuovo.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Il codice sopra utilizza sync.OnceValue per incrementare la variabile c di 1. In questo codice, sync.OnceValue viene utilizzato per creare la funzione once e, anche se la funzione once viene chiamata più volte, restituisce 1, poiché la variabile c viene incrementata una sola volta.

OnceValues

OnceValues funziona in modo analogo a OnceValue, ma può restituire più valori.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Il codice sopra utilizza sync.OnceValues per incrementare la variabile c di 1. In questo codice, sync.OnceValues viene utilizzato per creare la funzione once e, anche se la funzione once viene chiamata più volte, restituisce 1, poiché la variabile c viene incrementata una sola volta.

atomic

Il pacchetto atomic è un pacchetto che fornisce operazioni atomiche. Il pacchetto atomic fornisce metodi come Add, CompareAndSwap, Load, Store, Swap, ma recentemente si raccomanda l'uso di tipi come Int64, Uint64, Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Questo è l'esempio utilizzato in precedenza. Il codice utilizza il tipo atomic.Int64 per incrementare atomicamente la variabile c. È possibile incrementare atomicamente la variabile e leggerne il valore con i metodi Add e Load. Inoltre, è possibile memorizzare valori con il metodo Store, scambiare valori con il metodo Swap e, dopo aver confrontato i valori, scambiarli con il metodo CompareAndSwap se appropriato.

cond

sync.Cond

Il pacchetto cond è un pacchetto che fornisce variabili di condizione. Il pacchetto cond può essere creato con sync.Cond e fornisce i metodi Wait, Signal e Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Il codice sopra utilizza sync.Cond per attendere che la variabile ready diventi true. In questo codice, sync.Cond viene utilizzato per attendere che la variabile ready diventi true, dopodiché viene stampato Ready!. Utilizzando sync.Cond in questo modo, è possibile far sì che più goroutine attendano contemporaneamente fino a quando non viene soddisfatta una determinata condizione.

Questo può essere utilizzato per implementare una semplice queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Utilizzando sync.Cond in questo modo, è possibile attendere in modo efficiente ed eseguire nuovamente operazioni quando le condizioni sono soddisfatte, invece di utilizzare un spin-lock che consuma molta CPU.

semaphore

golang.org/x/sync/semaphore.Semaphore

Il pacchetto semaphore è un pacchetto che fornisce semafori. Il pacchetto semaphore può essere creato con golang.org/x/sync/semaphore.Semaphore e fornisce i metodi Acquire, Release e TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Il codice sopra crea un semaforo utilizzando il pacchetto semaphore e acquisisce il semaforo con il metodo Acquire e lo rilascia con il metodo Release. In questo codice è stato dimostrato come acquisire e rilasciare un semaforo utilizzando il pacchetto semaphore.

Conclusione

Le informazioni di base terminano qui. Sulla base del contenuto di questo articolo, spero che abbiate compreso come gestire la concorrenza utilizzando le goroutine e che siate in grado di utilizzarle concretamente. Spero che questo articolo sia stato utile. Grazie.