Go Samtidighed Starterpakke
Oversigt
Kort introduktion
Go-sproget har mange værktøjer til styring af samtidighed. I denne artikel vil vi introducere nogle af dem og tricks.
Goroutines?
Goroutine er en ny type samtidighedsmodel, der understøttes af Go-sproget. Normalt får et program OS-tråde fra operativsystemet for at udføre flere opgaver samtidigt, og det udfører opgaver parallelt med antallet af kerner. Og for at udføre en mindre enhed af samtidighed oprettes grønne tråde i userland, så flere grønne tråde kører og udfører opgaver i en OS-tråd. Men i tilfælde af goroutines er disse former for grønne tråde blevet gjort mindre og mere effektive. Disse goroutines bruger mindre hukommelse end tråde og kan oprettes og udskiftes hurtigere end tråde.
For at bruge en goroutine skal du blot bruge nøgleordet go
. Dette gør det muligt intuitivt at køre synkron kode som asynkron kode i processen med at skrive et program.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Denne kode ændrer simpelthen en synkron kode, der venter 1 sekund og udskriver Hello, World!
, til en asynkron strøm. Selvom eksemplet er simpelt, vil kode, der er lidt kompleks, ændres fra synkron kode til asynkron kode, og læsbarheden, synligheden og forståelsen af koden vil være bedre end den eksisterende async await eller promise-metode.
Men i mange tilfælde er dårlig goroutine-kode ofte skabt i en tilstand, hvor man ikke forstår strømmen af simpelthen at kalde denne synkrone kode asynkront og en strøm som fork & join
(en strøm, der ligner divide and conquer). Vi vil introducere et par metoder og teknikker, der kan forberede sig på disse tilfælde i denne artikel.
Samtidighedsstyring
context
Det kan være overraskende, at den første styringsteknik, der dukker op, er context
. Men i Go-sproget udmærker context
sig ikke kun i annullationsfunktionen, men også i styringen af hele arbejdstreet. Lad mig kort forklare den pågældende pakke for dem, der ikke kender den.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Ovenstående kode bruger context
til at udskrive Context is done!
efter 1 sekund. context
kan bruges til at kontrollere, om annullering er udført gennem metoden Done()
, og den tilbyder forskellige annulleringsmetoder gennem metoder som WithCancel
, WithTimeout
, WithDeadline
og WithValue
.
Lad os lave et simpelt eksempel. Antag, at du skriver kode til at hente user
, post
og comment
ved hjælp af aggregator
-mønsteret for at hente nogle data. Og hvis alle anmodninger skal udføres inden for 2 sekunder, kan du skrive det som følger.
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Ovenstående kode udskriver Timeout!
, hvis den ikke kan hente alle data inden for 2 sekunder, og udskriver All data is fetched!
, hvis den henter alle data. Ved at bruge context
på denne måde kan du nemt administrere annullering og timeout selv i kode, hvor flere goroutines fungerer.
Du kan kontrollere de forskellige kontekstrelaterede funktioner og metoder, der er relateret til dette, i godoc context. Jeg håber, du lærer det enkle, så du kan bruge det nemt.
channel
unbuffered channel
channel
er et værktøj til kommunikation mellem goroutines. channel
kan oprettes med make(chan T)
. På dette tidspunkt er T
datatypen, som det pågældende channel
vil overføre. channel
kan sende og modtage data med <-
, og du kan lukke channel
med close
.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Ovenstående kode er kode, der udskriver 1 og 2 ved hjælp af channel
. Denne kode viser kun simpelthen at sende og modtage værdier til channel
. Men channel
tilbyder mere end det. Lad os først lære om buffered channel
og unbuffered channel
. Før vi begynder, er eksemplet ovenfor en unbuffered channel
, og handlingen med at sende data til kanalen og handlingen med at modtage data skal udføres samtidigt. Hvis disse handlinger ikke udføres samtidigt, kan der opstå en dødlås.
buffered channel
Hvad nu hvis ovenstående kode er to processer, der udfører tungt arbejde i stedet for simpelt output? Hvis den anden proces læser og udfører behandlingen og hænger i lang tid, stopper den første proces også i løbet af den tid. Vi kan bruge buffered channel
til at forhindre denne situation.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Ovenstående kode er kode, der udskriver 1 og 2 ved hjælp af en buffered channel
. I denne kode brugte vi en buffered channel
til at sikre, at handlingen med at sende data til kanalen og handlingen med at modtage data ikke behøver at udføres samtidigt. Ved at placere en buffer i kanalen på denne måde kan du undgå forsinkelser i operationer, der skyldes efterfølgende operationer, fordi der er plads til den længden.
select
Når du håndterer flere kanaler, kan du nemt implementere en fan-in
-struktur ved hjælp af select
-syntaksen.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Ovenstående kode opretter tre kanaler, der periodisk overfører 1, 2 og 3, og er kode, der modtager og udskriver værdier fra kanalen ved hjælp af select
. På denne måde kan du behandle data, når du modtager dem fra kanalen, mens du modtager data samtidigt fra flere kanaler.
for range
channel
kan nemt modtage data ved hjælp af for range
. Når for range
bruges til kanalen, fungerer den hver gang der tilføjes data til den pågældende kanal, og sløjfen slutter, når kanalen lukkes.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Ovenstående kode er kode, der udskriver 1 og 2 ved hjælp af channel
. I denne kode bruges for range
til at modtage og udskrive data hver gang data tilføjes til kanalen. Og når kanalen lukkes, slutter sløjfen.
Som skrevet et par gange ovenfor, kan denne syntaks også bruges som et simpelt synkroniseringsmiddel.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Ovenstående kode udskriver Hello, World!
efter 1 sekund. I denne kode blev den synkrone kode ændret til asynkron kode ved hjælp af channel
. På denne måde kan du nemt ændre synkron kode til asynkron kode ved hjælp af channel
og indstille et join
-punkt.
etc
- Hvis du sender eller modtager data til en nil-kanal, kan du gå ind i en uendelig sløjfe og en dødlås kan opstå.
- Hvis du sender data efter at have lukket kanalen, opstår der panik.
- Selv hvis du ikke behøver at lukke kanalen, lukker GC kanalen, mens du samler den.
mutex
spinlock
spinlock
er en synkroniseringsmetode, der fortsætter med at forsøge at låse ved at dreje løkken. I Go-sproget kan du nemt implementere en spinlock ved hjælp af en pointer.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Ovenstående kode er kode, der implementerer spinlock
-pakken. I denne kode implementeres SpinLock
ved hjælp af sync/atomic
-pakken. Metoden Lock
forsøger at låse ved hjælp af atomic.CompareAndSwapUintptr
, og metoden Unlock
frigør låsen ved hjælp af atomic.StoreUintptr
. Denne metode bruger CPU'en kontinuerligt, indtil låsen er opnået, fordi den forsøger at låse uden hvile, så du kan gå ind i en uendelig sløjfe. Derfor er det bedst at bruge spinlock
til enkel synkronisering eller kun i en kort periode.
sync.Mutex
mutex
er et værktøj til synkronisering mellem goroutines. mutex
, der leveres af sync
-pakken, tilbyder metoder som Lock
, Unlock
, RLock
og RUnlock
. mutex
kan oprettes med sync.Mutex
, og du kan også bruge læse/skrive lås med sync.RWMutex
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
I ovenstående kode får to goroutines adgang til den samme count
-variabel næsten samtidigt. På dette tidspunkt, hvis du bruger mutex
til at gøre koden, der har adgang til count
-variablen, til et kritisk område, kan du forhindre samtidig adgang til count
-variablen. I så fald udskriver denne kode 2
uanset hvor mange gange den køres.
sync.RWMutex
sync.RWMutex
er en mutex
, der kan bruges ved at adskille læselåse og skrivelåse. Du kan hænge og frigøre en læselås ved hjælp af metoderne RLock
og RUnlock
.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Ovenstående kode er kode, der implementerer ConcurrentMap
ved hjælp af sync.RWMutex
. I denne kode låser metoden Get
læsen og låser metoden Set
skrivningen for sikkert at få adgang til og ændre data
-kortet. Årsagen til behovet for en læselås er, at hvis der er mange enkle læseoperationer, behøver du ikke at låse skrivningen, men bare hænge en læselås for at tillade flere goroutines at udføre læseoperationer på samme tid. Gennem dette kan du forbedre ydeevnen ved kun at hænge en læselås, når du ikke behøver at hænge en skrivelås, fordi der ikke er nogen ændring i staten.
fakelock
fakelock
er et simpelt trick, der implementerer sync.Locker
. Denne struktur giver de samme metoder som sync.Mutex
, men den virker ikke.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Ovenstående kode er kode, der implementerer fakelock
-pakken. Denne pakke implementerer sync.Locker
og leverer metoderne Lock
og Unlock
, men i virkeligheden gør den intet. Jeg vil beskrive, hvorfor denne kode er nødvendig, hvis jeg har mulighed.
waitgroup
sync.WaitGroup
sync.WaitGroup
er et værktøj, der venter, indtil arbejdet i alle goroutines er færdigt. Den tilbyder metoderne Add
, Done
og Wait
, og du tilføjer antallet af goroutines med metoden Add
, og du meddeler, at arbejdet i goroutinen er færdigt med metoden Done
. Og den venter, indtil arbejdet i alle goroutines er færdigt med metoden Wait
.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Ovenstående kode er kode, der bruger sync.WaitGroup
til at tilføje værdier til variablen c
samtidigt med 100 gorutiner. I denne kode venter den ved hjælp af sync.WaitGroup
, indtil alle gorutiner er færdige, og derefter udskrives den værdi, der er føjet til variablen c
. Hvis du blot har nogle få opgaver til at "fork & join", er det tilstrækkeligt at bruge kun kanaler, men det er også et godt valg at bruge sync.WaitGroup
, hvis du har et stort antal opgaver til "fork & join".
with slice
Hvis den bruges sammen med slices, kan waitgroup
være et godt værktøj til at administrere samtidige udførelsesopgaver uden låse.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Ovenstående kode er kode, der kun bruger waitgroup
til at generere 10 tilfældige heltal samtidigt for hver gorutine og gemmer dem i det tildelte indeks. I denne kode venter den ved hjælp af waitgroup
, indtil alle gorutiner er færdige, og udskriver derefter Done
. Hvis du bruger waitgroup
på denne måde, kan flere gorutiner udføre opgaver samtidigt, gemme data uden låse, indtil alle gorutiner er færdige, og udføre batch-efterbehandling efter opgaveafslutning.
golang.org/x/sync/errgroup.ErrGroup
errgroup
er en pakke, der udvider sync.WaitGroup
. I modsætning til sync.WaitGroup
annullerer errgroup
alle gorutiner og returnerer en fejl, hvis der opstår en fejl i en af gorutinernes opgaver.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Ovenstående kode er kode, der bruger errgroup
til at generere 10 gorutiner og genererer en fejl i den 5. gorutine. Jeg har med vilje genereret en fejl i den femte gorutine for at vise tilfældet, hvor der opstår en fejl. Men når du faktisk bruger det, kan du bruge errgroup
til at generere gorutiner og udføre forskellige efterbehandlinger for tilfældet, hvor der opstår en fejl i hver gorutine.
once
Det er et værktøj til at udføre kode, der kun skal udføres én gang. Du kan køre relateret kode via nedenstående konstruktorer.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc
giver simpelthen mulighed for, at den pågældende funktion kun kan udføres én gang i hele.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Ovenstående kode er kode, der bruger sync.OnceFunc
til at udskrive Hello, World!
. I denne kode genereres funktionen once
ved hjælp af sync.OnceFunc
, og selvom funktionen once
kaldes flere gange, udskrives Hello, World!
kun én gang.
OnceValue
OnceValue
er ikke blot en funktion, der kun udføres én gang i hele, men gemmer også returværdien for den pågældende funktion og returnerer den gemte værdi, når den kaldes igen.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Ovenstående kode er kode, der bruger sync.OnceValue
til at øge variablen c
med 1. I denne kode genereres funktionen once
ved hjælp af sync.OnceValue
, og selvom funktionen once
kaldes flere gange, returnerer den 1, hvor variablen c
kun er steget én gang.
OnceValues
OnceValues
fungerer på samme måde som OnceValue
, men kan returnere flere værdier.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Ovenstående kode er kode, der bruger sync.OnceValues
til at øge variablen c
med 1. I denne kode genereres funktionen once
ved hjælp af sync.OnceValues
, og selvom funktionen once
kaldes flere gange, returnerer den 1, hvor variablen c
kun er steget én gang.
atomic
atomic
-pakken er en pakke, der leverer atomare operationer. atomic
-pakken leverer metoder som Add
, CompareAndSwap
, Load
, Store
, Swap
, men brugen af typer som Int64
, Uint64
, Pointer
anbefales for nylig.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Dette er eksemplet, der blev brugt før. Dette er kode, der bruger typen atomic.Int64
til at øge variablen c
atomisk. Du kan atomisk øge variablen og læse variablen med metoderne Add
og Load
. Du kan også gemme værdier med metoden Store
, udskifte værdier med metoden Swap
og udskifte værdier, hvis de er egnede, efter at have sammenlignet værdier med metoden CompareAndSwap
.
cond
sync.Cond
cond
-pakken er en pakke, der leverer betingelsesvariabler. cond
-pakken kan genereres med sync.Cond
og leverer metoderne Wait
, Signal
, Broadcast
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Ovenstående kode er kode, der bruger sync.Cond
til at vente, indtil variablen ready
er true
. I denne kode venter den ved hjælp af sync.Cond
, indtil variablen ready
er true
, og udskriver derefter Ready!
. Hvis du bruger sync.Cond
på denne måde, kan flere gorutiner samtidigt vente, indtil bestemte betingelser er opfyldt.
Du kan bruge dette til at implementere en simpel queue
.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Hvis du bruger sync.Cond
på denne måde, kan du vente effektivt i stedet for at bruge mange CPU-ressourcer med en spin-lock
, og du kan fungere igen, når betingelsen er opfyldt.
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphore
-pakken er en pakke, der leverer semaforer. semaphore
-pakken kan genereres med golang.org/x/sync/semaphore.Semaphore
og leverer metoderne Acquire
, Release
, TryAcquire
.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Ovenstående kode er kode, der bruger semaphore
til at generere en semafor og erhverve en semafor ved hjælp af metoden Acquire
og frigive en semafor med metoden Release
. I denne kode viste jeg, hvordan man får og frigiver en semafor ved hjælp af semaphore
.
Afslutning
Jeg tror, at de grundlæggende ting er tilstrækkelige indtil her. Baseret på indholdet i denne artikel håber jeg, at I forstår, hvordan man administrerer samtidighed ved hjælp af gorutiner, og at I faktisk kan bruge dem. Jeg håber, at denne artikel har været nyttig for jer. Tak.