Go Concurrency Starter Pack
Aperçu
Courte introduction
Le langage Go offre de nombreux outils pour la gestion de la concurrence. Cet article présentera certains d'entre eux ainsi que des astuces.
Goroutine ?
Une goroutine est un nouveau type de modèle de concurrence pris en charge par le langage Go. Généralement, un programme reçoit des threads OS du système d'exploitation pour effectuer plusieurs tâches simultanément, exécutant des tâches en parallèle autant de fois qu'il y a de cœurs. Pour une concurrence à une échelle plus petite, des "green threads" sont créés dans l'espace utilisateur, permettant à plusieurs "green threads" de s'exécuter et d'effectuer des tâches au sein d'un seul thread OS. Cependant, les goroutines ont rendu ce type de "green threads" encore plus petits et plus efficaces. Ces goroutines utilisent moins de mémoire que les threads et peuvent être créées et remplacées plus rapidement que les threads.
Pour utiliser une goroutine, il suffit d'utiliser le mot-clé go. Cela permet d'exécuter intuitivement du code synchrone de manière asynchrone lors de l'écriture d'un programme.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Ce code convertit un code synchrone simple qui attend 1 seconde puis affiche Hello, World! en un flux asynchrone. Bien que l'exemple actuel soit simple, la conversion d'un code légèrement plus complexe du mode synchrone au mode asynchrone améliore considérablement la lisibilité, la visibilité et la compréhension du code par rapport aux approches existantes telles que async await ou promise.
Cependant, dans de nombreux cas, de mauvais codes de goroutine sont créés si l'on ne comprend pas le flux d'appel asynchrone simple de ce code synchrone et les flux de type fork & join (similaire au flux diviser pour régner). Cet article présentera quelques méthodes et techniques pour faire face à de telles situations.
Gestion de la concurrence
context
Il peut être surprenant que context apparaisse comme la première technique de gestion. Cependant, dans le langage Go, context joue un rôle exceptionnel dans la gestion de l'ensemble de l'arborescence des tâches, au-delà d'une simple fonction d'annulation. Pour ceux qui ne le connaissent pas, je vais brièvement expliquer ce package.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Le code ci-dessus utilise context pour afficher Context is done! après 1 seconde. context permet de vérifier l'état d'annulation via la méthode Done(), et offre diverses méthodes d'annulation telles que WithCancel, WithTimeout, WithDeadline, WithValue.
Créons un exemple simple. Supposons que vous écriviez du code pour récupérer user, post et comment en utilisant le modèle aggregator pour obtenir des données. Et si toutes les requêtes doivent être effectuées en 2 secondes, vous pouvez l'écrire comme suit.
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Le code ci-dessus affichera Timeout! si toutes les données ne sont pas récupérées en 2 secondes, et All data is fetched! si toutes les données sont récupérées. En utilisant context de cette manière, vous pouvez facilement gérer les annulations et les délais d'attente même dans du code où plusieurs goroutines s'exécutent.
Diverses fonctions et méthodes liées au context sont disponibles sur godoc context. Nous espérons que vous pourrez les apprendre et les utiliser facilement.
channel
unbuffered channel
Un channel est un outil de communication entre goroutines. Un channel peut être créé avec make(chan T). Ici, T est le type de données que le channel transmettra. Les données peuvent être envoyées et reçues via le channel avec <-, et le channel peut être fermé avec close.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Le code ci-dessus utilise un channel pour afficher 1 et 2. Ce code montre simplement l'envoi et la réception de valeurs via un channel. Cependant, un channel offre plus de fonctionnalités que cela. Tout d'abord, examinons les buffered channel et les unbuffered channel. Avant de commencer, l'exemple ci-dessus est un unbuffered channel, où l'envoi de données vers le canal et la réception de données du canal doivent se produire simultanément. Si ces actions ne se produisent pas simultanément, un deadlock peut survenir.
buffered channel
Et si le code ci-dessus était deux processus effectuant des tâches lourdes plutôt qu'une simple sortie ? Si le deuxième processus se bloque pendant une longue période en lisant et en traitant, le premier processus s'arrêtera également pendant cette période. Pour éviter une telle situation, nous pouvons utiliser un buffered channel.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Le code ci-dessus utilise un buffered channel pour afficher 1 et 2. Dans ce code, un buffered channel est utilisé pour que l'envoi de données au channel et la réception de données du channel n'aient pas besoin de se produire simultanément. En ajoutant un tampon au canal de cette manière, une marge de manœuvre est créée pour la longueur spécifiée, ce qui peut éviter les retards de tâche causés par l'impact des tâches de priorité inférieure.
select
Lorsque vous manipulez plusieurs canaux, l'instruction select peut être utilisée pour implémenter facilement une structure fan-in.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Le code ci-dessus crée trois canaux qui transmettent périodiquement 1, 2, 3, et utilise select pour recevoir des valeurs des canaux et les afficher. En utilisant select de cette manière, vous pouvez recevoir des données de plusieurs canaux simultanément et les traiter au fur et à mesure qu'elles sont reçues.
for range
Un channel peut facilement recevoir des données en utilisant for range. Lorsque for range est utilisé sur un canal, il s'exécute chaque fois que des données sont ajoutées au canal, et la boucle se termine lorsque le canal est fermé.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Le code ci-dessus utilise un channel pour afficher 1 et 2. Dans ce code, for range est utilisé pour recevoir et afficher les données chaque fois qu'elles sont ajoutées au canal. La boucle se termine lorsque le canal est fermé.
Comme mentionné plusieurs fois ci-dessus, cette syntaxe peut également être utilisée comme un simple moyen de synchronisation.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Le code ci-dessus affiche Hello, World! après une pause d'une seconde. Dans ce code, un channel est utilisé pour convertir du code synchrone en code asynchrone. En utilisant un channel de cette manière, le code synchrone peut être facilement converti en code asynchrone, et les points de join peuvent être définis.
etc
- L'envoi ou la réception de données sur un canal
nilpeut entraîner une boucle infinie et un deadlock. - L'envoi de données après la fermeture d'un canal provoque un panic.
- Même si le canal n'est pas explicitement fermé, le GC le ferme lors de la collecte.
mutex
spinlock
Un spinlock est une méthode de synchronisation qui tente continuellement d'acquérir un verrou en bouclant. Dans le langage Go, un spinlock peut être facilement implémenté en utilisant des pointeurs.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Le code ci-dessus implémente le package spinlock. Dans ce code, SpinLock est implémenté en utilisant le package sync/atomic. La méthode Lock tente d'acquérir le verrou en utilisant atomic.CompareAndSwapUintptr, et la méthode Unlock libère le verrou en utilisant atomic.StoreUintptr. Cette méthode tente d'acquérir le verrou sans relâche, ce qui entraîne une utilisation continue du CPU jusqu'à l'obtention du verrou, pouvant conduire à une boucle infinie. Par conséquent, il est recommandé d'utiliser un spinlock uniquement pour une synchronisation simple ou pour de courtes périodes.
sync.Mutex
Un mutex est un outil de synchronisation entre goroutines. Le mutex fourni par le package sync offre des méthodes telles que Lock, Unlock, RLock, RUnlock. Un mutex peut être créé avec sync.Mutex, et un verrou en lecture/écriture peut également être utilisé avec sync.RWMutex.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
Dans le code ci-dessus, deux goroutines accèdent presque simultanément à la même variable count. À ce moment-là, en utilisant un mutex pour créer une section critique pour le code accédant à la variable count, l'accès concurrent à la variable count peut être empêché. Alors, ce code affichera toujours 2, quel que soit le nombre d'exécutions.
sync.RWMutex
sync.RWMutex est un mutex qui permet de distinguer les verrous de lecture et les verrous d'écriture. Les méthodes RLock et RUnlock peuvent être utilisées pour acquérir et libérer un verrou de lecture.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Le code ci-dessus implémente un ConcurrentMap en utilisant sync.RWMutex. Dans ce code, la méthode Get acquiert un verrou de lecture et la méthode Set acquiert un verrou d'écriture, permettant un accès et une modification sécurisés de la carte data. La raison pour laquelle un verrou de lecture est nécessaire est que dans les cas où il y a beaucoup d'opérations de lecture simples, plusieurs goroutines peuvent effectuer des opérations de lecture simultanément en acquérant uniquement un verrou de lecture, sans acquérir un verrou d'écriture. Cela permet d'améliorer les performances en acquérant uniquement un verrou de lecture lorsque l'état n'a pas besoin d'être modifié et qu'un verrou d'écriture n'est pas nécessaire.
fakelock
fakelock est une astuce simple qui implémente sync.Locker. Cette structure fournit les mêmes méthodes que sync.Mutex, mais ne fait rien en réalité.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Le code ci-dessus implémente le package fakelock. Ce package implémente sync.Locker pour fournir les méthodes Lock et Unlock, mais ne fait rien en réalité. La raison pour laquelle un tel code est nécessaire sera expliquée si l'occasion se présente.
waitgroup
sync.WaitGroup
sync.WaitGroup est un outil qui attend que toutes les tâches d'une goroutine soient terminées. Il fournit les méthodes Add, Done et Wait. La méthode Add ajoute le nombre de goroutines, la méthode Done indique que la tâche de la goroutine est terminée, et la méthode Wait attend que toutes les tâches des goroutines soient terminées.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Le code ci-dessus utilise sync.WaitGroup pour que 100 goroutines ajoutent simultanément des valeurs à la variable c. Dans ce code, sync.WaitGroup est utilisé pour attendre que toutes les goroutines se terminent, puis affiche la valeur ajoutée à la variable c. Bien qu'il suffise d'utiliser uniquement un canal pour fork & join quelques tâches, l'utilisation de sync.WaitGroup est une bonne option pour fork & join un grand nombre de tâches.
with slice
Utilisé avec des tranches, waitgroup peut devenir un excellent outil pour gérer des opérations concurrentes sans verrouillage.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Le code ci-dessus utilise uniquement waitgroup pour que chaque goroutine génère simultanément 10 entiers aléatoires et les stocke dans l'index attribué. Dans ce code, waitgroup est utilisé pour attendre que toutes les goroutines se terminent, puis affiche Done. En utilisant waitgroup de cette manière, plusieurs goroutines peuvent effectuer des tâches simultanément, stocker des données sans verrouillage jusqu'à ce que toutes les goroutines soient terminées, et effectuer un post-traitement en bloc après la fin des tâches.
golang.org/x/sync/errgroup.ErrGroup
errgroup est un package qui étend sync.WaitGroup. Contrairement à sync.WaitGroup, errgroup annule toutes les goroutines et renvoie une erreur si l'une des tâches de la goroutine échoue.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Le code ci-dessus utilise errgroup pour créer 10 goroutines et génère une erreur dans la 5ème goroutine. J'ai délibérément provoqué une erreur dans la cinquième goroutine pour illustrer les cas d'erreur. Cependant, en pratique, vous pouvez utiliser errgroup pour créer des goroutines et effectuer divers post-traitements en cas d'erreur dans chaque goroutine.
once
Un outil pour exécuter du code qui ne doit être exécuté qu'une seule fois. Le code associé peut être exécuté via les constructeurs ci-dessous.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc permet simplement à la fonction correspondante de n'être exécutée qu'une seule fois dans l'ensemble.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Le code ci-dessus utilise sync.OnceFunc pour afficher Hello, World!. Dans ce code, sync.OnceFunc est utilisé pour créer la fonction once, et même si la fonction once est appelée plusieurs fois, Hello, World! n'est affiché qu'une seule fois.
OnceValue
OnceValue ne se contente pas d'exécuter la fonction correspondante une seule fois dans l'ensemble, mais stocke également la valeur de retour de cette fonction et la renvoie lors des appels ultérieurs.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Le code ci-dessus utilise sync.OnceValue pour incrémenter la variable c de 1. Dans ce code, sync.OnceValue est utilisé pour créer la fonction once, et même si la fonction once est appelée plusieurs fois, la variable c n'est incrémentée qu'une seule fois et renvoie 1.
OnceValues
OnceValues fonctionne de la même manière que OnceValue, mais peut renvoyer plusieurs valeurs.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Le code ci-dessus utilise sync.OnceValues pour incrémenter la variable c de 1. Dans ce code, sync.OnceValues est utilisé pour créer la fonction once, et même si la fonction once est appelée plusieurs fois, la variable c n'est incrémentée qu'une seule fois et renvoie 1.
atomic
Le package atomic fournit des opérations atomiques. Le package atomic propose des méthodes telles que Add, CompareAndSwap, Load, Store, Swap, mais il est récemment recommandé d'utiliser des types tels que Int64, Uint64, Pointer.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
C'est l'exemple utilisé précédemment. Le code incrémente atomiquement la variable c en utilisant le type atomic.Int64. Les méthodes Add et Load permettent d'incrémenter et de lire atomiquement la variable. De plus, la méthode Store permet de stocker une valeur, la méthode Swap permet d'échanger des valeurs, et la méthode CompareAndSwap permet de comparer des valeurs et de les échanger si elles correspondent.
cond
sync.Cond
Le package cond fournit des variables de condition. Le package cond peut être créé avec sync.Cond et fournit les méthodes Wait, Signal et Broadcast.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Le code ci-dessus utilise sync.Cond pour attendre que la variable ready devienne true. Dans ce code, sync.Cond est utilisé pour attendre que la variable ready devienne true, puis affiche Ready!. En utilisant sync.Cond de cette manière, plusieurs goroutines peuvent attendre simultanément qu'une condition spécifique soit satisfaite.
Cela peut être utilisé pour implémenter une queue simple.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
En utilisant sync.Cond de cette manière, il est possible d'attendre de manière efficace et de reprendre l'exécution lorsque la condition est remplie, au lieu d'utiliser un spin-lock qui consomme beaucoup de CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
Le package semaphore fournit des sémaphores. Le package semaphore peut être créé avec golang.org/x/sync/semaphore.Semaphore et fournit les méthodes Acquire, Release et TryAcquire.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Le code ci-dessus utilise un semaphore pour créer un sémaphore, acquérir le sémaphore avec la méthode Acquire et le libérer avec la méthode Release. Ce code illustre comment acquérir et libérer un sémaphore en utilisant semaphore.
Conclusion
Je pense que ces informations de base devraient suffire. J'espère qu'en vous basant sur le contenu de cet article, vous comprendrez et pourrez réellement utiliser les méthodes de gestion de la concurrence avec les goroutines. J'espère que cet article vous a été utile. Merci.