GoSuda

Pack de démarrage pour la concurrence en Go

By snowmerak
views ...

Aperçu

Brève introduction

Le langage Go offre de nombreux outils pour la gestion de la concurrence. Cet article présentera certains de ces outils et astuces.

Goroutines ?

Une goroutine est un nouveau modèle de concurrence pris en charge par le langage Go. En général, un programme reçoit des threads du système d’exploitation pour exécuter plusieurs tâches simultanément et exécute des tâches en parallèle en fonction du nombre de cœurs. Pour une concurrence à plus petite échelle, des threads verts sont générés dans l’espace utilisateur, ce qui permet à plusieurs threads verts d’exécuter des tâches de manière rotative au sein d’un même thread du système d’exploitation. Cependant, dans le cas des goroutines, ce type de threads verts est rendu plus petit et plus efficace. Ces goroutines utilisent moins de mémoire que les threads et peuvent être créées et commutées plus rapidement que les threads.

Pour utiliser une goroutine, il suffit d’utiliser le mot-clé go. Cela permet d’exécuter intuitivement du code synchrone en code asynchrone lors de l’écriture d’un programme.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Ce code modifie simplement le code synchrone qui imprime Hello, World! après une pause d’une seconde en un flux asynchrone. L’exemple actuel est simple, mais lorsque du code un peu plus complexe est modifié de synchrone à asynchrone, la lisibilité, la visibilité et la compréhension du code sont bien meilleures qu’avec des méthodes telles que async await ou promise.

Toutefois, dans de nombreux cas, un mauvais code de goroutine est produit si le flux d’appel de ce code synchrone en tant que code asynchrone et des flux tels que fork & join (un flux similaire à diviser pour régner) ne sont pas compris. Cet article présentera certaines méthodes et techniques pour se préparer à de tels cas.

Gestion de la concurrence

Contexte

Il peut être surprenant que le context soit la première technique de gestion. Cependant, dans le langage Go, le context joue un rôle exceptionnel dans la gestion de l’ensemble de l’arborescence des tâches, au-delà d’une simple fonction d’annulation. Pour ceux qui ne le connaissent pas, je vais brièvement expliquer ce package.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Le code ci-dessus utilise un context pour afficher Context is done! après une seconde. Un context peut déterminer si une annulation a eu lieu par le biais de la méthode Done() et fournit diverses méthodes d’annulation telles que WithCancel, WithTimeout, WithDeadline et WithValue.

Créons un exemple simple. Supposons que vous utilisiez le modèle d’agrégateur pour obtenir des données et que vous écriviez du code pour obtenir user, post et comment. Si toutes les requêtes doivent être effectuées en deux secondes, vous pouvez écrire ce qui suit.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Le code ci-dessus affiche Timeout! si toutes les données ne sont pas extraites en deux secondes et affiche All data is fetched! si toutes les données sont extraites. En utilisant un context de cette manière, l’annulation et le délai d’attente peuvent être facilement gérés, même dans du code où plusieurs goroutines sont en cours d’exécution.

Diverses fonctions et méthodes liées au contexte sont disponibles dans godoc context. J’espère que vous apprendrez les plus simples et que vous pourrez les utiliser confortablement.

Canal

Canal non tamponné

Un canal est un outil de communication entre les goroutines. Un canal peut être créé avec make(chan T). Dans ce cas, T est le type de données que le canal transmettra. Les données peuvent être envoyées et reçues avec <- et le canal peut être fermé avec close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Le code ci-dessus utilise un canal pour afficher 1 et 2. Ce code montre simplement l’envoi et la réception de valeurs sur un canal. Cependant, un canal offre plus de fonctionnalités que cela. Examinons d’abord les canaux tamponnés et les canaux non tamponnés. Pour commencer, l’exemple ci-dessus est un canal non tamponné et l’action d’envoi de données sur un canal et l’action de réception de données doivent avoir lieu simultanément. Si ces actions ne se produisent pas simultanément, un blocage peut se produire.

Canal tamponné

Que se passe-t-il si le code ci-dessus ne consiste pas en une simple sortie, mais en deux processus qui effectuent des opérations lourdes ? Si le deuxième processus bloque pendant une longue période en raison d’une lecture et d’un traitement, le premier processus s’arrêtera également pendant cette période. Nous pouvons utiliser un canal tamponné pour éviter cette situation.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Le code ci-dessus utilise un canal tamponné pour afficher 1 et 2. Dans ce code, nous avons utilisé un canal tamponné pour que l’action d’envoi de données sur le canal et l’action de réception de données n’aient pas à se produire simultanément. En mettant un tampon sur le canal de cette manière, une marge est créée en fonction de la longueur, ce qui permet d’éviter les retards de travail causés par l’impact du travail en aval.

Sélectionner

Lorsque vous traitez plusieurs canaux, vous pouvez facilement implémenter une structure fan-in en utilisant la syntaxe select.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Le code ci-dessus crée trois canaux qui transmettent périodiquement 1, 2 et 3 et utilise select pour recevoir des valeurs des canaux et les imprimer. En utilisant select de cette manière, il est possible de traiter les données au fur et à mesure qu’elles sont reçues des canaux, tout en recevant simultanément des données de plusieurs canaux.

Pour la plage

Un canal peut facilement recevoir des données en utilisant for range. Si for range est utilisé sur un canal, il sera exécuté chaque fois que des données sont ajoutées au canal et la boucle se terminera lorsque le canal sera fermé.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Le code ci-dessus utilise un canal pour afficher 1 et 2. Dans ce code, nous utilisons for range pour recevoir et imprimer les données chaque fois que des données sont ajoutées au canal. La boucle se termine lorsque le canal est fermé.

Comme écrit plusieurs fois ci-dessus, cette syntaxe peut également être utilisée comme moyen de synchronisation simple.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Le code ci-dessus affiche Hello, World! après une pause d’une seconde. Dans ce code, nous avons utilisé un canal pour convertir du code synchrone en code asynchrone. En utilisant canal de cette manière, il est possible de convertir facilement du code synchrone en code asynchrone et de définir un point de jointure.

etc

  1. L’envoi ou la réception de données sur un canal nil peut provoquer une boucle infinie et un blocage.
  2. L’envoi de données après la fermeture d’un canal provoque une panique.
  3. Il n’est pas nécessaire de fermer un canal, car le GC ferme le canal lorsqu’il est collecté.

Mutex

Spinlock

Un spinlock est une méthode de synchronisation qui tente de verrouiller en exécutant une boucle. Dans le langage Go, un spinlock peut facilement être implémenté en utilisant des pointeurs.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Le code ci-dessus est le code qui implémente le package spinlock. Dans ce code, nous avons implémenté SpinLock en utilisant le package sync/atomic. La méthode Lock tente de verrouiller en utilisant atomic.CompareAndSwapUintptr et la méthode Unlock déverrouille en utilisant atomic.StoreUintptr. Cette méthode tente de verrouiller sans s’arrêter, elle continue donc à utiliser le CPU jusqu’à ce que le verrou soit obtenu, ce qui peut entraîner une boucle infinie. Par conséquent, il est recommandé d’utiliser spinlock pour une synchronisation simple ou pour une courte durée uniquement.

sync.Mutex

Un mutex est un outil de synchronisation entre goroutines. Le mutex fourni par le package sync fournit des méthodes telles que Lock, Unlock, RLock et RUnlock. Un mutex peut être créé avec sync.Mutex et un verrou de lecture/écriture peut être utilisé avec sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

Dans le code ci-dessus, deux goroutines accèdent à la même variable count presque simultanément. Dans ce cas, si le code qui accède à la variable count est rendu zone critique en utilisant un mutex, l’accès simultané à la variable count peut être empêché. Ce code affiche alors toujours 2, quel que soit le nombre de fois où il est exécuté.

sync.RWMutex

sync.RWMutex est un mutex qui peut être utilisé en distinguant les verrous de lecture et les verrous d’écriture. Un verrou de lecture peut être verrouillé et déverrouillé en utilisant les méthodes RLock et RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Le code ci-dessus est un code qui implémente ConcurrentMap en utilisant sync.RWMutex. Dans ce code, un verrou de lecture est pris dans la méthode Get et un verrou d’écriture est pris dans la méthode Set pour accéder et modifier en toute sécurité la carte data. La raison pour laquelle un verrou de lecture est nécessaire est que, dans le cas de nombreuses opérations de lecture simples, plusieurs goroutines peuvent effectuer des opérations de lecture simultanément en prenant uniquement un verrou de lecture sans prendre de verrou d’écriture. Cela permet d’améliorer les performances en prenant uniquement un verrou de lecture dans les cas où il n’est pas nécessaire de prendre un verrou d’écriture car il n’y a pas de modification d’état.

fakelock

Un fakelock est une astuce simple qui implémente sync.Locker. Cette structure fournit les mêmes méthodes que sync.Mutex, mais n’effectue aucune opération réelle.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Le code ci-dessus est un code qui implémente le package fakelock. Ce package implémente sync.Locker pour fournir les méthodes Lock et Unlock, mais n’effectue en fait aucune opération. La raison pour laquelle ce code est nécessaire sera décrite lorsque l’occasion se présentera.

Waitgroup

sync.WaitGroup

sync.WaitGroup est un outil qui attend que toutes les tâches d’une goroutine soient terminées. Il fournit les méthodes Add, Done et Wait. La méthode Add ajoute le nombre de goroutines, la méthode Done indique qu’une tâche d’une goroutine est terminée et la méthode Wait attend que toutes les tâches de toutes les goroutines soient terminées.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Ce code utilise sync.WaitGroup pour que 100 goroutines ajoutent simultanément des valeurs à la variable c. Dans ce code, sync.WaitGroup est utilisé pour attendre que toutes les goroutines soient terminées, puis la valeur ajoutée à la variable c est affichée. Il est suffisant d'utiliser uniquement des canaux pour simplement effectuer quelques opérations de fork & join, mais dans le cas de nombreuses opérations de fork & join, l'utilisation de sync.WaitGroup est également un bon choix.

with slice

Lorsqu'il est utilisé avec des slices, waitgroup peut être un excellent outil pour gérer les opérations simultanées sans verrous.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Le code ci-dessus utilise uniquement waitgroup pour permettre à chaque goroutine de générer simultanément 10 entiers aléatoires et de les stocker dans l'index qui lui est attribué. Dans ce code, waitgroup est utilisé pour attendre que toutes les goroutines soient terminées, puis Done est affiché. En utilisant waitgroup de cette manière, plusieurs goroutines peuvent effectuer des opérations simultanément, stocker des données sans verrou jusqu'à ce que toutes les goroutines soient terminées, et effectuer un post-traitement de manière groupée après la fin de l'opération.

golang.org/x/sync/errgroup.ErrGroup

errgroup est un package qui étend sync.WaitGroup. Contrairement à sync.WaitGroup, si une erreur se produit pendant l'exécution d'une goroutine, errgroup annule toutes les goroutines et renvoie une erreur.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Le code ci-dessus crée 10 goroutines en utilisant errgroup et génère une erreur dans la 5ème goroutine. Nous avons intentionnellement généré une erreur dans la cinquième goroutine pour montrer le cas où une erreur se produit. Cependant, dans la pratique, vous pouvez utiliser errgroup pour créer des goroutines et effectuer divers post-traitements pour les cas où des erreurs se produisent dans chaque goroutine.

once

C'est un outil pour exécuter du code qui ne doit être exécuté qu'une seule fois. Le code associé peut être exécuté via les constructeurs suivants.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc permet simplement de s'assurer qu'une fonction donnée ne peut être exécutée qu'une seule fois au total.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Le code ci-dessus utilise sync.OnceFunc pour afficher Hello, World!. Dans ce code, sync.OnceFunc est utilisé pour créer une fonction once, et même si la fonction once est appelée plusieurs fois, Hello, World! n'est affiché qu'une seule fois.

OnceValue

OnceValue ne fait pas que s'assurer que la fonction donnée n'est exécutée qu'une seule fois au total, mais enregistre également la valeur de retour de cette fonction et renvoie la valeur enregistrée lors des appels suivants.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Le code ci-dessus utilise sync.OnceValue pour incrémenter la variable c de 1. Dans ce code, sync.OnceValue est utilisé pour créer une fonction once, et même si la fonction once est appelée plusieurs fois, elle ne renvoie que 1, car la variable c n'est incrémentée qu'une seule fois.

OnceValues

OnceValues fonctionne de la même manière que OnceValue, mais peut retourner plusieurs valeurs.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Le code ci-dessus utilise sync.OnceValues pour incrémenter la variable c de 1. Dans ce code, sync.OnceValues est utilisé pour créer une fonction once, et même si la fonction once est appelée plusieurs fois, elle ne renvoie que 1, car la variable c n'est incrémentée qu'une seule fois.

atomic

Le package atomic est un package qui fournit des opérations atomiques. Le package atomic fournit des méthodes telles que Add, CompareAndSwap, Load, Store, Swap, mais il est récemment recommandé d'utiliser les types Int64, Uint64, Pointer, etc.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Voici l'exemple qui a été utilisé précédemment. C'est un code qui incrémente atomiquement la variable c en utilisant le type atomic.Int64. Vous pouvez incrémenter atomiquement la variable et lire la variable en utilisant les méthodes Add et Load. Vous pouvez également enregistrer une valeur avec la méthode Store, échanger une valeur avec la méthode Swap et comparer une valeur, puis la remplacer si elle est appropriée avec la méthode CompareAndSwap.

cond

sync.Cond

Le package cond est un package qui fournit des variables de condition. Le package cond peut être créé avec sync.Cond, et il fournit les méthodes Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Le code ci-dessus utilise sync.Cond pour attendre que la variable ready devienne true. Dans ce code, sync.Cond est utilisé pour attendre que la variable ready devienne true, puis afficher Ready!. En utilisant sync.Cond de cette manière, vous pouvez faire attendre plusieurs goroutines simultanément jusqu'à ce qu'une certaine condition soit remplie.

Ceci peut être utilisé pour mettre en œuvre une simple queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

En utilisant sync.Cond de cette manière, au lieu d'utiliser un spin-lock qui utilise beaucoup de CPU, vous pouvez attendre efficacement, et vous pouvez reprendre le fonctionnement lorsque la condition est satisfaite.

semaphore

golang.org/x/sync/semaphore.Semaphore

Le package semaphore est un package qui fournit des sémaphores. Le package semaphore peut être créé avec golang.org/x/sync/semaphore.Semaphore, et il fournit les méthodes Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Le code ci-dessus utilise semaphore pour créer un sémaphore, et utilise le sémaphore pour acquérir le sémaphore avec la méthode Acquire et libérer le sémaphore avec la méthode Release. Ce code montre comment acquérir et libérer un sémaphore en utilisant semaphore.

Conclusion

Il semble que le contenu de base soit suffisant ici. J'espère que sur la base du contenu de cet article, vous avez compris comment gérer la concurrence en utilisant des goroutines et que vous pouvez réellement l'utiliser. J'espère que cet article vous a été utile. Merci.