GoSuda

Go Samtidighed Starterpakke

By snowmerak
views ...

Oversigt

Kort introduktion

Go-sproget har mange værktøjer til styring af samtidighed. I denne artikel vil vi introducere nogle af dem og tricks.

Goroutines?

Goroutine er en ny type samtidighedsmodel, der understøttes af Go-sproget. Normalt får et program OS-tråde fra operativsystemet for at udføre flere opgaver samtidigt, og det udfører opgaver parallelt med antallet af kerner. Og for at udføre en mindre enhed af samtidighed oprettes grønne tråde i userland, så flere grønne tråde kører og udfører opgaver i en OS-tråd. Men i tilfælde af goroutines er disse former for grønne tråde blevet gjort mindre og mere effektive. Disse goroutines bruger mindre hukommelse end tråde og kan oprettes og udskiftes hurtigere end tråde.

For at bruge en goroutine skal du blot bruge nøgleordet go. Dette gør det muligt intuitivt at køre synkron kode som asynkron kode i processen med at skrive et program.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Denne kode ændrer simpelthen en synkron kode, der venter 1 sekund og udskriver Hello, World!, til en asynkron strøm. Selvom eksemplet er simpelt, vil kode, der er lidt kompleks, ændres fra synkron kode til asynkron kode, og læsbarheden, synligheden og forståelsen af koden vil være bedre end den eksisterende async await eller promise-metode.

Men i mange tilfælde er dårlig goroutine-kode ofte skabt i en tilstand, hvor man ikke forstår strømmen af simpelthen at kalde denne synkrone kode asynkront og en strøm som fork & join (en strøm, der ligner divide and conquer). Vi vil introducere et par metoder og teknikker, der kan forberede sig på disse tilfælde i denne artikel.

Samtidighedsstyring

context

Det kan være overraskende, at den første styringsteknik, der dukker op, er context. Men i Go-sproget udmærker context sig ikke kun i annullationsfunktionen, men også i styringen af hele arbejdstreet. Lad mig kort forklare den pågældende pakke for dem, der ikke kender den.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Ovenstående kode bruger context til at udskrive Context is done! efter 1 sekund. context kan bruges til at kontrollere, om annullering er udført gennem metoden Done(), og den tilbyder forskellige annulleringsmetoder gennem metoder som WithCancel, WithTimeout, WithDeadline og WithValue.

Lad os lave et simpelt eksempel. Antag, at du skriver kode til at hente user, post og comment ved hjælp af aggregator-mønsteret for at hente nogle data. Og hvis alle anmodninger skal udføres inden for 2 sekunder, kan du skrive det som følger.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Ovenstående kode udskriver Timeout!, hvis den ikke kan hente alle data inden for 2 sekunder, og udskriver All data is fetched!, hvis den henter alle data. Ved at bruge context på denne måde kan du nemt administrere annullering og timeout selv i kode, hvor flere goroutines fungerer.

Du kan kontrollere de forskellige kontekstrelaterede funktioner og metoder, der er relateret til dette, i godoc context. Jeg håber, du lærer det enkle, så du kan bruge det nemt.

channel

unbuffered channel

channel er et værktøj til kommunikation mellem goroutines. channel kan oprettes med make(chan T). På dette tidspunkt er T datatypen, som det pågældende channel vil overføre. channel kan sende og modtage data med <-, og du kan lukke channel med close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Ovenstående kode er kode, der udskriver 1 og 2 ved hjælp af channel. Denne kode viser kun simpelthen at sende og modtage værdier til channel. Men channel tilbyder mere end det. Lad os først lære om buffered channel og unbuffered channel. Før vi begynder, er eksemplet ovenfor en unbuffered channel, og handlingen med at sende data til kanalen og handlingen med at modtage data skal udføres samtidigt. Hvis disse handlinger ikke udføres samtidigt, kan der opstå en dødlås.

buffered channel

Hvad nu hvis ovenstående kode er to processer, der udfører tungt arbejde i stedet for simpelt output? Hvis den anden proces læser og udfører behandlingen og hænger i lang tid, stopper den første proces også i løbet af den tid. Vi kan bruge buffered channel til at forhindre denne situation.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Ovenstående kode er kode, der udskriver 1 og 2 ved hjælp af en buffered channel. I denne kode brugte vi en buffered channel til at sikre, at handlingen med at sende data til kanalen og handlingen med at modtage data ikke behøver at udføres samtidigt. Ved at placere en buffer i kanalen på denne måde kan du undgå forsinkelser i operationer, der skyldes efterfølgende operationer, fordi der er plads til den længden.

select

Når du håndterer flere kanaler, kan du nemt implementere en fan-in-struktur ved hjælp af select-syntaksen.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Ovenstående kode opretter tre kanaler, der periodisk overfører 1, 2 og 3, og er kode, der modtager og udskriver værdier fra kanalen ved hjælp af select. På denne måde kan du behandle data, når du modtager dem fra kanalen, mens du modtager data samtidigt fra flere kanaler.

for range

channel kan nemt modtage data ved hjælp af for range. Når for range bruges til kanalen, fungerer den hver gang der tilføjes data til den pågældende kanal, og sløjfen slutter, når kanalen lukkes.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Ovenstående kode er kode, der udskriver 1 og 2 ved hjælp af channel. I denne kode bruges for range til at modtage og udskrive data hver gang data tilføjes til kanalen. Og når kanalen lukkes, slutter sløjfen.

Som skrevet et par gange ovenfor, kan denne syntaks også bruges som et simpelt synkroniseringsmiddel.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Ovenstående kode udskriver Hello, World! efter 1 sekund. I denne kode blev den synkrone kode ændret til asynkron kode ved hjælp af channel. På denne måde kan du nemt ændre synkron kode til asynkron kode ved hjælp af channel og indstille et join-punkt.

etc

  1. Hvis du sender eller modtager data til en nil-kanal, kan du gå ind i en uendelig sløjfe og en dødlås kan opstå.
  2. Hvis du sender data efter at have lukket kanalen, opstår der panik.
  3. Selv hvis du ikke behøver at lukke kanalen, lukker GC kanalen, mens du samler den.

mutex

spinlock

spinlock er en synkroniseringsmetode, der fortsætter med at forsøge at låse ved at dreje løkken. I Go-sproget kan du nemt implementere en spinlock ved hjælp af en pointer.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Ovenstående kode er kode, der implementerer spinlock-pakken. I denne kode implementeres SpinLock ved hjælp af sync/atomic-pakken. Metoden Lock forsøger at låse ved hjælp af atomic.CompareAndSwapUintptr, og metoden Unlock frigør låsen ved hjælp af atomic.StoreUintptr. Denne metode bruger CPU'en kontinuerligt, indtil låsen er opnået, fordi den forsøger at låse uden hvile, så du kan gå ind i en uendelig sløjfe. Derfor er det bedst at bruge spinlock til enkel synkronisering eller kun i en kort periode.

sync.Mutex

mutex er et værktøj til synkronisering mellem goroutines. mutex, der leveres af sync-pakken, tilbyder metoder som Lock, Unlock, RLock og RUnlock. mutex kan oprettes med sync.Mutex, og du kan også bruge læse/skrive lås med sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

I ovenstående kode får to goroutines adgang til den samme count-variabel næsten samtidigt. På dette tidspunkt, hvis du bruger mutex til at gøre koden, der har adgang til count-variablen, til et kritisk område, kan du forhindre samtidig adgang til count-variablen. I så fald udskriver denne kode 2 uanset hvor mange gange den køres.

sync.RWMutex

sync.RWMutex er en mutex, der kan bruges ved at adskille læselåse og skrivelåse. Du kan hænge og frigøre en læselås ved hjælp af metoderne RLock og RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Ovenstående kode er kode, der implementerer ConcurrentMap ved hjælp af sync.RWMutex. I denne kode låser metoden Get læsen og låser metoden Set skrivningen for sikkert at få adgang til og ændre data-kortet. Årsagen til behovet for en læselås er, at hvis der er mange enkle læseoperationer, behøver du ikke at låse skrivningen, men bare hænge en læselås for at tillade flere goroutines at udføre læseoperationer på samme tid. Gennem dette kan du forbedre ydeevnen ved kun at hænge en læselås, når du ikke behøver at hænge en skrivelås, fordi der ikke er nogen ændring i staten.

fakelock

fakelock er et simpelt trick, der implementerer sync.Locker. Denne struktur giver de samme metoder som sync.Mutex, men den virker ikke.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Ovenstående kode er kode, der implementerer fakelock-pakken. Denne pakke implementerer sync.Locker og leverer metoderne Lock og Unlock, men i virkeligheden gør den intet. Jeg vil beskrive, hvorfor denne kode er nødvendig, hvis jeg har mulighed.

waitgroup

sync.WaitGroup

sync.WaitGroup er et værktøj, der venter, indtil arbejdet i alle goroutines er færdigt. Den tilbyder metoderne Add, Done og Wait, og du tilføjer antallet af goroutines med metoden Add, og du meddeler, at arbejdet i goroutinen er færdigt med metoden Done. Og den venter, indtil arbejdet i alle goroutines er færdigt med metoden Wait.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Ovenstående kode er kode, der bruger sync.WaitGroup til at tilføje værdier til variablen c samtidigt med 100 gorutiner. I denne kode venter den ved hjælp af sync.WaitGroup, indtil alle gorutiner er færdige, og derefter udskrives den værdi, der er føjet til variablen c. Hvis du blot har nogle få opgaver til at "fork & join", er det tilstrækkeligt at bruge kun kanaler, men det er også et godt valg at bruge sync.WaitGroup, hvis du har et stort antal opgaver til "fork & join".

with slice

Hvis den bruges sammen med slices, kan waitgroup være et godt værktøj til at administrere samtidige udførelsesopgaver uden låse.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Ovenstående kode er kode, der kun bruger waitgroup til at generere 10 tilfældige heltal samtidigt for hver gorutine og gemmer dem i det tildelte indeks. I denne kode venter den ved hjælp af waitgroup, indtil alle gorutiner er færdige, og udskriver derefter Done. Hvis du bruger waitgroup på denne måde, kan flere gorutiner udføre opgaver samtidigt, gemme data uden låse, indtil alle gorutiner er færdige, og udføre batch-efterbehandling efter opgaveafslutning.

golang.org/x/sync/errgroup.ErrGroup

errgroup er en pakke, der udvider sync.WaitGroup. I modsætning til sync.WaitGroup annullerer errgroup alle gorutiner og returnerer en fejl, hvis der opstår en fejl i en af gorutinernes opgaver.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Ovenstående kode er kode, der bruger errgroup til at generere 10 gorutiner og genererer en fejl i den 5. gorutine. Jeg har med vilje genereret en fejl i den femte gorutine for at vise tilfældet, hvor der opstår en fejl. Men når du faktisk bruger det, kan du bruge errgroup til at generere gorutiner og udføre forskellige efterbehandlinger for tilfældet, hvor der opstår en fejl i hver gorutine.

once

Det er et værktøj til at udføre kode, der kun skal udføres én gang. Du kan køre relateret kode via nedenstående konstruktorer.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc giver simpelthen mulighed for, at den pågældende funktion kun kan udføres én gang i hele.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Ovenstående kode er kode, der bruger sync.OnceFunc til at udskrive Hello, World!. I denne kode genereres funktionen once ved hjælp af sync.OnceFunc, og selvom funktionen once kaldes flere gange, udskrives Hello, World! kun én gang.

OnceValue

OnceValue er ikke blot en funktion, der kun udføres én gang i hele, men gemmer også returværdien for den pågældende funktion og returnerer den gemte værdi, når den kaldes igen.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Ovenstående kode er kode, der bruger sync.OnceValue til at øge variablen c med 1. I denne kode genereres funktionen once ved hjælp af sync.OnceValue, og selvom funktionen once kaldes flere gange, returnerer den 1, hvor variablen c kun er steget én gang.

OnceValues

OnceValues fungerer på samme måde som OnceValue, men kan returnere flere værdier.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Ovenstående kode er kode, der bruger sync.OnceValues til at øge variablen c med 1. I denne kode genereres funktionen once ved hjælp af sync.OnceValues, og selvom funktionen once kaldes flere gange, returnerer den 1, hvor variablen c kun er steget én gang.

atomic

atomic-pakken er en pakke, der leverer atomare operationer. atomic-pakken leverer metoder som Add, CompareAndSwap, Load, Store, Swap, men brugen af typer som Int64, Uint64, Pointer anbefales for nylig.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Dette er eksemplet, der blev brugt før. Dette er kode, der bruger typen atomic.Int64 til at øge variablen c atomisk. Du kan atomisk øge variablen og læse variablen med metoderne Add og Load. Du kan også gemme værdier med metoden Store, udskifte værdier med metoden Swap og udskifte værdier, hvis de er egnede, efter at have sammenlignet værdier med metoden CompareAndSwap.

cond

sync.Cond

cond-pakken er en pakke, der leverer betingelsesvariabler. cond-pakken kan genereres med sync.Cond og leverer metoderne Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Ovenstående kode er kode, der bruger sync.Cond til at vente, indtil variablen ready er true. I denne kode venter den ved hjælp af sync.Cond, indtil variablen ready er true, og udskriver derefter Ready!. Hvis du bruger sync.Cond på denne måde, kan flere gorutiner samtidigt vente, indtil bestemte betingelser er opfyldt.

Du kan bruge dette til at implementere en simpel queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Hvis du bruger sync.Cond på denne måde, kan du vente effektivt i stedet for at bruge mange CPU-ressourcer med en spin-lock, og du kan fungere igen, når betingelsen er opfyldt.

semaphore

golang.org/x/sync/semaphore.Semaphore

semaphore-pakken er en pakke, der leverer semaforer. semaphore-pakken kan genereres med golang.org/x/sync/semaphore.Semaphore og leverer metoderne Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Ovenstående kode er kode, der bruger semaphore til at generere en semafor og erhverve en semafor ved hjælp af metoden Acquire og frigive en semafor med metoden Release. I denne kode viste jeg, hvordan man får og frigiver en semafor ved hjælp af semaphore.

Afslutning

Jeg tror, at de grundlæggende ting er tilstrækkelige indtil her. Baseret på indholdet i denne artikel håber jeg, at I forstår, hvordan man administrerer samtidighed ved hjælp af gorutiner, og at I faktisk kan bruge dem. Jeg håber, at denne artikel har været nyttig for jer. Tak.