Go samtidighet startpaket
Översikt
Kort introduktion
Go-språket har många verktyg för samtidighetskontroll. I denna artikel kommer vi att presentera några av dem och några tricks.
Gorutiner?
En gorutin är en ny typ av samtidighetsmodell som stöds i Go-språket. Vanligtvis får ett program OS-trådar från operativsystemet för att utföra flera uppgifter samtidigt, och utför operationer parallellt så många som kärnorna. För att utföra samtidighetsoperationer i mindre skala skapas gröna trådar i användarutrymmet, så att flera gröna trådar utför operationer i en OS-tråd. Men i fallet med gorutiner har denna typ av grön tråd gjorts mindre och mer effektiv. Dessa gorutiner använder mindre minne än trådar och kan skapas och bytas ut snabbare än trådar.
För att använda en gorutin behöver du bara använda nyckelordet go
. Detta gör det möjligt att intuitivt köra synkron kod som asynkron kod under programskrivningsprocessen.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Denna kod ändrar enkelt synkron kod som vilar i 1 sekund och matar ut Hello, World!
till ett asynkront flöde. Detta exempel är enkelt, men om du ändrar lite mer komplex kod från synkron till asynkron kod, blir läsbarheten, synligheten och förståelsen av koden bättre än med befintliga metoder som async await eller promise.
I många fall skapas dock dålig gorutinkod om man inte förstår flödet av att helt enkelt anropa sådan synkron kod asynkront och flöden som fork & join
(ett flöde som liknar divide and conquer). I den här artikeln kommer vi att introducera några metoder och tekniker för att förbereda oss för dessa fall.
Samtidighetskontroll
context
Att context
är den första kontrolltekniken kan vara överraskande. Men i Go-språket spelar context
en utmärkt roll i att hantera hela arbetsträdet utöver en enkel annulleringsfunktion. För dem som inte vet, låt oss kort förklara detta paket.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Koden ovan använder context
för att mata ut Context is done!
efter 1 sekund. context
kan kontrollera om en annullering har skett genom Done()
-metoden och ger olika avbokningsmetoder genom metoder som WithCancel
, WithTimeout
, WithDeadline
, WithValue
.
Låt oss skapa ett enkelt exempel. Anta att du skriver kod för att hämta user
, post
och comment
med hjälp av aggregator
-mönstret för att hämta data. Och om alla förfrågningar måste göras inom 2 sekunder kan du skriva den som följer:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Koden ovan matar ut Timeout!
om den inte kan hämta all data inom 2 sekunder och matar ut All data is fetched!
om den hämtar all data. Genom att använda context
på det här sättet kan du enkelt hantera avbokningar och tidsgränser även i kod som kör flera gorutiner.
Olika context
-relaterade funktioner och metoder kan kontrolleras i godoc context. Vi hoppas att du ska lära dig de enkla och använda dem bekvämt.
channel
unbuffered channel
channel
är ett verktyg för kommunikation mellan gorutiner. channel
kan skapas med make(chan T)
. Här är T
typen av data som channel
ska leverera. channel
kan utbyta data med <-
, och channel
kan stängas med close
.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Koden ovan matar ut 1 och 2 med hjälp av channel
. Denna kod visar bara enkelt hur man skickar och tar emot värden i channel
. Men channel
erbjuder fler funktioner än så. Låt oss först lära oss om buffered channel
och unbuffered channel
. Först och främst är exemplet som skrivits ovan ett unbuffered channel
, som kräver att åtgärden att skicka data till kanalen och åtgärden att ta emot data sker samtidigt. Om dessa åtgärder inte sker samtidigt kan det uppstå en dead lock.
buffered channel
Vad händer om koden ovan inte är en enkel utskrift utan två processer som utför tunga operationer? Om den andra processen hänger sig under lång tid medan den läser och utför bearbetning, kommer den första processen också att stoppas under den tiden. Vi kan använda buffered channel
för att förhindra denna situation.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Koden ovan matar ut 1 och 2 med hjälp av buffered channel
. I den här koden används buffered channel
för att säkerställa att åtgärden att skicka data till kanalen och åtgärden att ta emot data inte behöver ske samtidigt. Genom att lägga till en buffert i kanalen på det här sättet skapas ett utrymme motsvarande dess längd, vilket kan förhindra driftsfördröjningar som orsakas av påverkan av efterföljande operationer.
select
När du hanterar flera kanaler kan du enkelt implementera en fan-in
-struktur med select
-syntaxen.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Koden ovan skapar tre kanaler som periodiskt skickar 1, 2 och 3 och använder select
för att ta emot värden från kanalerna och mata ut dem. Genom att använda select
på detta sätt kan du ta emot data från flera kanaler samtidigt och bearbeta värdena när de tas emot från kanalerna.
for range
channel
kan enkelt ta emot data med hjälp av for range
. Om for range
används i en kanal fungerar det varje gång data läggs till i kanalen och avslutar loopen när kanalen stängs.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Koden ovan matar ut 1 och 2 med hjälp av channel
. I den här koden används for range
för att ta emot och mata ut data varje gång data läggs till i kanalen. Och loopen avslutas när kanalen stängs.
Som vi har skrivit några gånger ovan kan denna syntax också användas som ett enkelt synkroniseringsmedel.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Koden ovan vilar i 1 sekund och matar ut Hello, World!
. I den här koden används channel
för att ändra synkron kod till asynkron kod. Genom att använda channel
på det här sättet kan du enkelt ändra synkron kod till asynkron kod och ställa in join
-punkten.
etc
- Om du skickar eller tar emot data i en nil-kanal kan du hamna i en oändlig loop och en deadlock.
- Om du skickar data efter att ha stängt kanalen kommer en panic att inträffa.
- Du behöver inte stänga kanalen, GC stänger kanalen medan den samlas in.
mutex
spinlock
spinlock
är en synkroniseringsmetod som kontinuerligt försöker få en låsning genom att köra en loop. I Go-språket kan du enkelt implementera en spinlock med hjälp av en pekare.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Koden ovan är kod som implementerar spinlock
-paketet. I den här koden används sync/atomic
-paketet för att implementera SpinLock
. Metoden Lock
använder atomic.CompareAndSwapUintptr
för att försöka få en låsning, och metoden Unlock
använder atomic.StoreUintptr
för att frigöra låsningen. Denna metod försöker låsa utan att vila, så den fortsätter att använda CPU:n tills den får låsningen, vilket kan leda till en oändlig loop. Därför är det bäst att använda spinlock
för enkel synkronisering eller endast under en kort tid.
sync.Mutex
mutex
är ett verktyg för synkronisering mellan gorutiner. mutex
som tillhandahålls av sync
-paketet tillhandahåller metoder som Lock
, Unlock
, RLock
, RUnlock
. mutex
kan skapas med sync.Mutex
, och en läs-/skrivlås kan användas med sync.RWMutex
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
I koden ovan får två gorutiner tillgång till samma count
-variabel nästan samtidigt. Om vi i det här fallet skapar koden som får tillgång till count
-variabeln som en kritisk region med hjälp av mutex
kan vi förhindra samtidig åtkomst till count
-variabeln. Då kommer denna kod att mata ut 2
lika oavsett hur många gånger den körs.
sync.RWMutex
sync.RWMutex
är en mutex
som kan användas för att skilja mellan läslås och skrivlås. Du kan låsa och frigöra ett läslås med hjälp av metoderna RLock
och RUnlock
.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Koden ovan är kod som implementerar ConcurrentMap
med hjälp av sync.RWMutex
. I den här koden låses läsning i metoden Get
och skrivning låses i metoden Set
för att på ett säkert sätt få åtkomst till och ändra data
-kartan. Anledningen till att ett läslås krävs är att om det finns många enkla läsoperationer kan flera gorutiner utföra läsoperationer samtidigt genom att endast låsa läsning utan att låsa skrivning. På så sätt kan vi förbättra prestandan genom att endast låsa läsning i de fall där det inte är nödvändigt att låsa skrivning eftersom statusen inte ändras.
fakelock
fakelock
är ett enkelt trick som implementerar sync.Locker
. Denna struktur tillhandahåller samma metoder som sync.Mutex
, men den utför inte någon faktisk operation.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Koden ovan är kod som implementerar fakelock
-paketet. Detta paket implementerar sync.Locker
och tillhandahåller metoderna Lock
och Unlock
, men de gör egentligen ingenting. Jag kommer att beskriva varför denna kod behövs när tillfället ges.
waitgroup
sync.WaitGroup
sync.WaitGroup
är ett verktyg för att vänta tills alla gorutiner är klara. Det tillhandahåller metoderna Add
, Done
och Wait
. Antalet gorutiner läggs till med metoden Add
, och Done
-metoden meddelar att en gorutins arbete har avslutats. Och metoden Wait
väntar tills arbetet för alla gorutiner är klart.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Ovanstående kod använder sync.WaitGroup
för att låta 100 gorutiner samtidigt addera värden till variabeln c
. I denna kod används sync.WaitGroup
för att vänta tills alla gorutiner har avslutats, varefter värdet som adderats till variabeln c
skrivs ut. Att använda enbart kanaler kan vara tillräckligt för att fork & join
-hantera ett mindre antal operationer, men att använda sync.WaitGroup
är ett bra alternativ för att hantera ett stort antal fork & join
-operationer.
med slice
Om det används tillsammans med en slice kan waitgroup
vara ett utmärkt verktyg för att hantera samtidiga operationer utan låsningar.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Ovanstående kod använder enbart waitgroup
för att låta varje gorutin samtidigt generera 10 slumpmässiga heltal och lagra dem på det tilldelade indexet. I denna kod används waitgroup
för att vänta tills alla gorutiner har avslutats, varefter Done
skrivs ut. Genom att använda waitgroup
på detta sätt kan flera gorutiner utföra operationer samtidigt, lagra data utan låsningar tills alla gorutiner har avslutats, och utföra efterbearbetning gemensamt efter att operationerna har avslutats.
golang.org/x/sync/errgroup.ErrGroup
errgroup
är ett paket som utökar sync.WaitGroup
. Till skillnad från sync.WaitGroup
avbryter errgroup
alla gorutiner och returnerar ett fel om ett fel uppstår i någon av gorutinernas operationer.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Ovanstående kod använder errgroup
för att skapa 10 gorutiner och genererar ett fel i den femte gorutinen. Avsiktligt genererades ett fel i den femte gorutinen för att visa ett fall där ett fel uppstår. Men i praktiken kan errgroup
användas för att skapa gorutiner och utföra olika efterbearbetningar för fall där fel uppstår i varje gorutin.
once
Detta är ett verktyg för att exekvera kod som endast bör köras en gång. Relevant kod kan köras via följande konstruktörer.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc
gör det möjligt att enkelt säkerställa att den aktuella funktionen bara kan köras en gång i sin helhet.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Ovanstående kod använder sync.OnceFunc
för att skriva ut Hello, World!
. I denna kod används sync.OnceFunc
för att skapa funktionen once
, och även om once
-funktionen anropas flera gånger skrivs Hello, World!
endast ut en gång.
OnceValue
OnceValue
ser inte bara till att den aktuella funktionen bara körs en gång i sin helhet, utan sparar också returvärdet från den aktuella funktionen och returnerar det sparade värdet när det anropas igen.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Ovanstående kod använder sync.OnceValue
för att öka variabeln c
med 1. I denna kod används sync.OnceValue
för att skapa funktionen once
, och även om once
-funktionen anropas flera gånger returneras endast 1, vilket är värdet av c
som endast har ökat en gång.
OnceValues
OnceValues
fungerar på samma sätt som OnceValue
, men kan returnera flera värden.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Ovanstående kod använder sync.OnceValues
för att öka variabeln c
med 1. I denna kod används sync.OnceValues
för att skapa funktionen once
, och även om once
-funktionen anropas flera gånger returneras endast 1, vilket är värdet av c
som endast har ökat en gång.
atomic
atomic
-paketet tillhandahåller atomära operationer. atomic
-paketet tillhandahåller metoder som Add
, CompareAndSwap
, Load
, Store
, Swap
, men på senare tid rekommenderas att använda typer som Int64
, Uint64
, Pointer
med mera.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Detta är samma exempel som användes tidigare. Det här är kod som atomärt ökar variabeln c
med typen atomic.Int64
. Med metoderna Add
och Load
kan variabeln atomärt ökas och avläsas. Dessutom kan värden lagras med metoden Store
, värden kan bytas med metoden Swap
och värden kan jämföras och bytas ut med metoden CompareAndSwap
om de matchar.
cond
sync.Cond
cond
-paketet tillhandahåller villkorsvariabler. cond
-paketet kan skapas med sync.Cond
och tillhandahåller metoderna Wait
, Signal
och Broadcast
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Ovanstående kod använder sync.Cond
för att vänta tills variabeln ready
blir true
. I denna kod används sync.Cond
för att vänta tills variabeln ready
blir true
, varefter Ready!
skrivs ut. Genom att använda sync.Cond
på detta sätt kan flera gorutiner samtidigt vänta tills ett visst villkor är uppfyllt.
Detta kan användas för att implementera en enkel queue
.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Genom att använda sync.Cond
på detta sätt kan man effektivt vänta och återuppta driften när villkoret är uppfyllt, istället för att använda en spin-lock
som använder mycket CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphore
-paketet tillhandahåller semaforer. semaphore
-paketet kan skapas med golang.org/x/sync/semaphore.Semaphore
och tillhandahåller metoderna Acquire
, Release
och TryAcquire
.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Ovanstående kod använder semaphore
för att skapa en semafor, och med semaforen används metoden Acquire
för att erhålla semaforen och metoden Release
för att frigöra semaforen. I den här koden har vi visat hur semaphore
används för att erhålla och frigöra en semafor.
Avslutningsvis
Detta borde räcka för grunderna. Jag hoppas att du med utgångspunkt i innehållet i den här artikeln förstår hur du använder gorutiner för att hantera samtidighet och faktiskt kan använda dem. Jag hoppas att den här artikeln har varit till hjälp för dig. Tack.