GoSuda

Go Samtidighets-Startpakke

By snowmerak
views ...

Oversikt

Kort introduksjon

Go-språket har mange verktøy for samtidighetskontroll. Denne artikkelen vil introdusere noen av dem og noen triks.

Gorutiner?

goroutine er en ny type samtidighetsmodell støttet av Go-språket. Vanligvis, for at et program skal utføre flere oppgaver samtidig, får det OS-tråder fra operativsystemet, og utfører oppgaver parallelt i henhold til antall kjerner. Og for å utføre samtidighet av mindre enheter, genereres grønne tråder i brukerlandet, slik at flere grønne tråder kjører og utfører oppgaver i en OS-tråd. Men i tilfelle av gorutiner, ble denne typen grønne tråder gjort mindre og mer effektive. Disse gorutinene bruker mindre minne enn tråder, og kan opprettes og byttes raskere enn tråder.

For å bruke gorutiner, er det bare å bruke go-nøkkelordet. Dette gjør det intuitivt å kjøre synkron kode som asynkron kode i prosessen med å skrive programmer.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Denne koden endrer enkelt synkron kode som venter i 1 sekund og skriver ut Hello, World! til en asynkron flyt. Dette eksemplet er enkelt, men hvis du endrer litt mer kompleks kode fra synkron kode til asynkron kode, blir lesbarheten, synligheten og forståelsen av koden bedre enn eksisterende async await eller promise-metoder.

Men i mange tilfeller, uten å forstå flyten av å bare kalle denne synkrone koden asynkront og flyten som fork & join (en flyt som ligner på splitt og hersk), lages dårlig gorutinekode. Noen metoder og teknikker for å forberede seg på slike tilfeller vil bli introdusert i denne artikkelen.

Samtidighetskontroll

context

Det kan være overraskende at context dukker opp som den første kontrollteknikken. Men i Go-språket spiller context en utmerket rolle i å administrere hele oppgavetreet, utover bare avbrytingsfunksjonen. La meg forklare denne pakken kort for de som ikke vet det.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Koden ovenfor bruker context til å skrive ut Context is done! etter 1 sekund. context kan sjekke om den er kansellert gjennom Done()-metoden, og tilbyr forskjellige avbrytingsmetoder gjennom metoder som WithCancel, WithTimeout, WithDeadline og WithValue.

La oss lage et enkelt eksempel. La oss anta at du skriver kode for å hente user, post og comment ved å bruke aggregator-mønsteret for å hente data. Og hvis alle forespørsler må gjøres innen 2 sekunder, kan du skrive det slik:

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Koden ovenfor skriver ut Timeout! hvis den ikke henter alle dataene innen 2 sekunder, og skriver ut All data is fetched! hvis den henter alle dataene. Ved å bruke context på denne måten, kan du enkelt administrere kansellering og tidsavbrudd selv i kode der flere gorutiner kjører.

Du kan sjekke forskjellige context-relaterte funksjoner og metoder relatert til dette på godoc context. Jeg håper du kan lære de enkle tingene og bruke dem enkelt.

channel

unbuffered channel

channel er et verktøy for kommunikasjon mellom gorutiner. channel kan opprettes med make(chan T). På dette tidspunktet er T typen data som channel vil overføre. channel kan sende og motta data med <-, og channel kan lukkes med close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Koden ovenfor er kode som skriver ut 1 og 2 ved å bruke channel. Denne koden viser bare å sende og motta verdier til channel. Men channel tilbyr mer enn det. La oss først lære om buffered channel og unbuffered channel. Før vi starter, er eksemplet skrevet ovenfor en unbuffered channel, og handlingen med å sende data til kanalen og handlingen med å motta data må skje samtidig. Hvis disse handlingene ikke utføres samtidig, kan det oppstå en vranglås.

buffered channel

Hva om koden ovenfor er to prosesser som utfører tungt arbeid i stedet for bare utskrift? Hvis den andre prosessen leser og utfører behandling, og henger lenge, vil den første prosessen også stoppe i løpet av den tiden. Vi kan bruke buffered channel for å forhindre en slik situasjon.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Koden ovenfor er kode som skriver ut 1 og 2 ved å bruke buffered channel. I denne koden, ved å bruke en buffered channel, trenger ikke handlingen med å sende data til kanalen og handlingen med å motta data å skje samtidig. Ved å ha en buffer i kanalen, kan du få litt ledig plass i den lengden, og forhindre forsinkelser i arbeidet forårsaket av arbeidet med lavere prioritet.

select

Når du håndterer flere kanaler, kan du enkelt implementere en fan-in-struktur ved å bruke select-syntaksen.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Koden ovenfor oppretter tre kanaler som overfører 1, 2, 3 periodisk, og bruker select til å motta og skrive ut verdier fra kanalene. Ved å bruke select på denne måten, kan du motta data fra flere kanaler samtidig og behandle dem etter hvert som du mottar verdier fra kanalene.

for range

channel kan enkelt motta data ved å bruke for range. Hvis du bruker for range til kanalen, vil den fungere hver gang data legges til kanalen, og den avslutter løkken når kanalen er lukket.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Koden ovenfor er kode som skriver ut 1 og 2 ved å bruke channel. I denne koden mottar og skriver den ut data hver gang data legges til kanalen ved hjelp av for range. Og den avslutter løkken når kanalen er lukket.

Som skrevet flere ganger ovenfor, kan denne syntaksen brukes som et enkelt synkroniseringsmiddel.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Koden ovenfor er kode som skriver ut Hello, World! etter å ha ventet i 1 sekund. I denne koden endret den synkron kode til asynkron kode ved å bruke channel. Ved å bruke channel på denne måten, kan du enkelt endre synkron kode til asynkron kode og sette join-punkter.

etc

  1. Hvis du sender eller mottar data til en nil-kanal, kan du havne i en uendelig løkke og forårsake en vranglås.
  2. Hvis du sender data etter at kanalen er lukket, oppstår panikk.
  3. Selv om du ikke lukker kanalen, lukkes den av GC mens den samles inn.

mutex

spinlock

spinlock er en synkroniseringsmetode som prøver å låse kontinuerlig ved å snurre en løkke. I Go-språket kan du enkelt implementere en spinlock ved å bruke en peker.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Koden ovenfor er kode som implementerer spinlock-pakken. I denne koden implementerte vi SpinLock ved å bruke sync/atomic-pakken. Lock-metoden prøver å låse ved å bruke atomic.CompareAndSwapUintptr, og Unlock-metoden frigjør låsen ved å bruke atomic.StoreUintptr. Denne metoden fortsetter å bruke CPU-en til du får låsen fordi den prøver å låse uten å hvile, og kan havne i en uendelig løkke. Derfor er det best å bruke spinlock for enkel synkronisering eller hvis du bare bruker den i en kort periode.

sync.Mutex

mutex er et verktøy for synkronisering mellom gorutiner. mutex levert av sync-pakken tilbyr metoder som Lock, Unlock, RLock, RUnlock. mutex kan opprettes med sync.Mutex, og du kan også bruke lese-/skrivelåser med sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

I koden ovenfor får to gorutiner tilgang til den samme count-variabelen nesten samtidig. På dette tidspunktet, hvis du bruker mutex til å lage koden som får tilgang til count-variabelen til et kritisk område, kan du forhindre samtidig tilgang til count-variabelen. Da vil denne koden skrive ut 2 på samme måte uansett hvor mange ganger den kjøres.

sync.RWMutex

sync.RWMutex er en mutex som kan brukes separat med en leselås og en skrivlås. Du kan låse og frigi leselåsen ved å bruke metodene RLock, RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Koden ovenfor er kode som implementerer ConcurrentMap ved å bruke sync.RWMutex. I denne koden låser Get-metoden leselåsen, og Set-metoden låser skrivlåsen for trygt å få tilgang til og modifisere data-kartet. Årsaken til at leselåser er nødvendig er at i mange tilfeller av enkle lesearbeid, kan flere gorutiner utføre lesearbeid samtidig ved bare å låse leselåsen uten å låse skrivlåsen. Gjennom dette, hvis det ikke er noen endring i statusen, slik at det ikke er nødvendig å låse skrivlåsen, kan ytelsen forbedres ved å låse bare leselåsen.

fakelock

fakelock er et enkelt triks for å implementere sync.Locker. Denne strukturen tilbyr de samme metodene som sync.Mutex, men den gjør ikke noe.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Koden ovenfor er kode som implementerer fakelock-pakken. Denne pakken implementerer sync.Locker og tilbyr metodene Lock og Unlock, men den gjør faktisk ingenting. Jeg skal beskrive hvorfor denne koden er nødvendig hvis jeg får sjansen.

waitgroup

sync.WaitGroup

sync.WaitGroup er et verktøy for å vente til alle gorutine-oppgavene er ferdige. Den tilbyr metodene Add, Done og Wait, og legger til antall gorutiner med Add-metoden, og signaliserer at gorutine-oppgaven er fullført med Done-metoden. Og den venter til alle gorutine-oppgavene er ferdige med Wait-metoden.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
for i := 0; i < 100 ; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        c.Add(1)
    }()
}

wg.Wait()
println(c.Load())

}

1

위 코드는 sync.WaitGroup을 사용하여 100개의 고루틴이 동시에 c 변수에 값을 더하는 코드입니다. 이 코드에서는 sync.WaitGroup을 사용하여 모든 고루틴이 끝날 때까지 기다린 후, c 변수에 더한 값을 출력합니다. 단순하게 몇몇개의 작업을 fork & join하는 경우엔 채널만을 이용해도 충분하지만, 다량의 작업을 fork & join하는 경우엔 sync.WaitGroup을 사용하는 것도 좋은 선택지입니다.

 1
 2#### med slice
 3
 4Hvis de brukes med slices, kan `waitgroup` være et utmerket verktøy for å administrere samtidige utførelsesoperasjoner uten låsing.
 5
 6```go
 7package main
 8
 9import (
10	"fmt"
11	"sync"
12    "rand"
13)
14
15func main() {
16	var wg sync.WaitGroup
17	arr := [10]int{}
18
19	for i := 0; i < 10; i++ {
20		wg.Add(1)
21		go func(id int) {
22			defer wg.Done()
23
24			arr[id] = rand.Intn(100)
25		}(i)
26	}
27
28	wg.Wait()
29	fmt.Println("Done")
30
31    for i, v := range arr {
32        fmt.Printf("arr[%d] = %d\n", i, v)
33    }
34}
1Ovennevnte kode er en kode som bruker `waitgroup` alene for å generere 10 tilfeldige heltall samtidig i hver goroutine og lagre dem i de tildelte indeksene. I denne koden bruker vi `waitgroup` for å vente til alle goroutiner er ferdige, og deretter skrive ut `Done`. Ved å bruke `waitgroup` på denne måten kan du la flere goroutiner utføre oppgaver samtidig, lagre data uten låsing til alle goroutiner er ferdige, og utføre batch-etterbehandling etter at oppgavene er fullført.

golang.org/x/sync/errgroup.ErrGroup

errgroup er en pakke som utvider sync.WaitGroup. I motsetning til sync.WaitGroup, avbryter errgroup alle goroutiner og returnerer en feil hvis det oppstår en feil i en av goroutinens oppgaver.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}
1Ovennevnte kode genererer 10 goroutiner ved hjelp av `errgroup` og forårsaker en feil i den femte goroutinen. Vi demonstrerte det ved å med vilje forårsake en feil i den femte gorutinen. Men i faktisk bruk kan du bruke `errgroup` til å generere goroutiner og utføre forskjellige etterbehandlinger for tilfeller der det oppstår feil i hver goroutine.

once

Et verktøy for å utføre kode som bare skal utføres én gang. Du kan kjøre relatert kode gjennom konstruktøren nedenfor.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc sørger ganske enkelt for at den aktuelle funksjonen bare kan kjøres én gang overalt.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}
1Ovennevnte kode er en kode som skriver ut "Hello, World!" ved hjelp av `sync.OnceFunc`. I denne koden bruker vi `sync.OnceFunc` til å generere `once`-funksjonen, og selv om du kaller `once`-funksjonen flere ganger, vil "Hello, World!" bare skrives ut én gang.

OnceValue

OnceValue sørger ganske enkelt for at den aktuelle funksjonen ikke bare utføres én gang overalt, men også lagrer returverdien til den aktuelle funksjonen og returnerer den lagrede verdien når den kalles igjen.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}
1Ovennevnte kode er en kode som øker variabelen `c` med 1 ved å bruke `sync.OnceValue`. I denne koden bruker vi `sync.OnceValue` til å generere `once`-funksjonen, og selv om du kaller `once`-funksjonen flere ganger, returnerer den bare 1, der variabelen `c` er økt én gang.

OnceValues

OnceValues fungerer på samme måte som OnceValue, men kan returnere flere verdier.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}
1Ovennevnte kode er en kode som øker variabelen `c` med 1 ved å bruke `sync.OnceValues`. I denne koden bruker vi `sync.OnceValues` til å generere `once`-funksjonen, og selv om du kaller `once`-funksjonen flere ganger, returnerer den bare 1, der variabelen `c` er økt én gang.

atomic

atomic-pakken er en pakke som gir atomoperasjoner. atomic-pakken tilbyr metoder som Add, CompareAndSwap, Load, Store og Swap, men det anbefales å bruke typer som Int64, Uint64 og Pointer i det siste.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Dette er eksemplet som ble brukt før. Det er en kode som bruker typen atomic.Int64 til å øke variabelen c atomisk. Du kan atomisk øke en variabel med Add-metoden og Load-metoden, og lese variabelen. Du kan også lagre en verdi med Store-metoden, erstatte en verdi med Swap-metoden, og sammenligne en verdi og erstatte den hvis den er egnet med CompareAndSwap-metoden.

cond

sync.Cond

cond-pakken er en pakke som gir en betinget variabel. cond-pakken kan genereres med sync.Cond og gir metodene Wait, Signal og Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}
1Ovennevnte kode er en kode som venter til variabelen `ready` blir `true` ved å bruke `sync.Cond`. I denne koden bruker vi `sync.Cond` for å vente til variabelen `ready` blir `true`, og deretter skrive ut "Ready!". Ved å bruke `sync.Cond` på denne måten, kan flere goroutiner vente samtidig til en bestemt betingelse er oppfylt.

Du kan implementere en enkel queue ved å utnytte dette.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Ved å utnytte sync.Cond på denne måten, kan du effektivt vente i stedet for å bruke et spin-lock som bruker mye CPU, og utføre igjen når betingelsen er oppfylt.

semaphore

golang.org/x/sync/semaphore.Semaphore

semaphore-pakken er en pakke som gir en semafor. semaphore-pakken kan genereres med golang.org/x/sync/semaphore.Semaphore og gir metodene Acquire, Release og TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}
1Ovennevnte kode er en kode som genererer en semafor ved hjelp av semaforen og bruker semaforen til å skaffe semaforen med `Acquire`-metoden og frigjør semaforen med `Release`-metoden. I denne koden demonstrerte vi hvordan du skaffer og frigjør en semafor ved hjelp av semaforen.

Avslutning

Det grunnleggende innholdet bør være det. Basert på innholdet i denne artikkelen, håper jeg at du kan forstå hvordan du administrerer samtidighet ved å bruke goroutiner og faktisk bruke dem. Jeg håper denne artikkelen var nyttig for deg. Takk skal du ha.