GoSuda

Go samtidighet startpaket

By snowmerak
views ...

Översikt

Kort introduktion

Go-språket har många verktyg för samtidighetskontroll. I denna artikel kommer vi att presentera några av dem och några tricks.

Gorutiner?

En gorutin är en ny typ av samtidighetsmodell som stöds i Go-språket. Vanligtvis får ett program OS-trådar från operativsystemet för att utföra flera uppgifter samtidigt, och utför operationer parallellt så många som kärnorna. För att utföra samtidighetsoperationer i mindre skala skapas gröna trådar i användarutrymmet, så att flera gröna trådar utför operationer i en OS-tråd. Men i fallet med gorutiner har denna typ av grön tråd gjorts mindre och mer effektiv. Dessa gorutiner använder mindre minne än trådar och kan skapas och bytas ut snabbare än trådar.

För att använda en gorutin behöver du bara använda nyckelordet go. Detta gör det möjligt att intuitivt köra synkron kod som asynkron kod under programskrivningsprocessen.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Denna kod ändrar enkelt synkron kod som vilar i 1 sekund och matar ut Hello, World! till ett asynkront flöde. Detta exempel är enkelt, men om du ändrar lite mer komplex kod från synkron till asynkron kod, blir läsbarheten, synligheten och förståelsen av koden bättre än med befintliga metoder som async await eller promise.

I många fall skapas dock dålig gorutinkod om man inte förstår flödet av att helt enkelt anropa sådan synkron kod asynkront och flöden som fork & join (ett flöde som liknar divide and conquer). I den här artikeln kommer vi att introducera några metoder och tekniker för att förbereda oss för dessa fall.

Samtidighetskontroll

context

Att context är den första kontrolltekniken kan vara överraskande. Men i Go-språket spelar context en utmärkt roll i att hantera hela arbetsträdet utöver en enkel annulleringsfunktion. För dem som inte vet, låt oss kort förklara detta paket.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Koden ovan använder context för att mata ut Context is done! efter 1 sekund. context kan kontrollera om en annullering har skett genom Done()-metoden och ger olika avbokningsmetoder genom metoder som WithCancel, WithTimeout, WithDeadline, WithValue.

Låt oss skapa ett enkelt exempel. Anta att du skriver kod för att hämta user, post och comment med hjälp av aggregator-mönstret för att hämta data. Och om alla förfrågningar måste göras inom 2 sekunder kan du skriva den som följer:

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Koden ovan matar ut Timeout! om den inte kan hämta all data inom 2 sekunder och matar ut All data is fetched! om den hämtar all data. Genom att använda context på det här sättet kan du enkelt hantera avbokningar och tidsgränser även i kod som kör flera gorutiner.

Olika context-relaterade funktioner och metoder kan kontrolleras i godoc context. Vi hoppas att du ska lära dig de enkla och använda dem bekvämt.

channel

unbuffered channel

channel är ett verktyg för kommunikation mellan gorutiner. channel kan skapas med make(chan T). Här är T typen av data som channel ska leverera. channel kan utbyta data med <-, och channel kan stängas med close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Koden ovan matar ut 1 och 2 med hjälp av channel. Denna kod visar bara enkelt hur man skickar och tar emot värden i channel. Men channel erbjuder fler funktioner än så. Låt oss först lära oss om buffered channel och unbuffered channel. Först och främst är exemplet som skrivits ovan ett unbuffered channel, som kräver att åtgärden att skicka data till kanalen och åtgärden att ta emot data sker samtidigt. Om dessa åtgärder inte sker samtidigt kan det uppstå en dead lock.

buffered channel

Vad händer om koden ovan inte är en enkel utskrift utan två processer som utför tunga operationer? Om den andra processen hänger sig under lång tid medan den läser och utför bearbetning, kommer den första processen också att stoppas under den tiden. Vi kan använda buffered channel för att förhindra denna situation.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Koden ovan matar ut 1 och 2 med hjälp av buffered channel. I den här koden används buffered channel för att säkerställa att åtgärden att skicka data till kanalen och åtgärden att ta emot data inte behöver ske samtidigt. Genom att lägga till en buffert i kanalen på det här sättet skapas ett utrymme motsvarande dess längd, vilket kan förhindra driftsfördröjningar som orsakas av påverkan av efterföljande operationer.

select

När du hanterar flera kanaler kan du enkelt implementera en fan-in-struktur med select-syntaxen.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Koden ovan skapar tre kanaler som periodiskt skickar 1, 2 och 3 och använder select för att ta emot värden från kanalerna och mata ut dem. Genom att använda select på detta sätt kan du ta emot data från flera kanaler samtidigt och bearbeta värdena när de tas emot från kanalerna.

for range

channel kan enkelt ta emot data med hjälp av for range. Om for range används i en kanal fungerar det varje gång data läggs till i kanalen och avslutar loopen när kanalen stängs.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Koden ovan matar ut 1 och 2 med hjälp av channel. I den här koden används for range för att ta emot och mata ut data varje gång data läggs till i kanalen. Och loopen avslutas när kanalen stängs.

Som vi har skrivit några gånger ovan kan denna syntax också användas som ett enkelt synkroniseringsmedel.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Koden ovan vilar i 1 sekund och matar ut Hello, World!. I den här koden används channel för att ändra synkron kod till asynkron kod. Genom att använda channel på det här sättet kan du enkelt ändra synkron kod till asynkron kod och ställa in join-punkten.

etc

  1. Om du skickar eller tar emot data i en nil-kanal kan du hamna i en oändlig loop och en deadlock.
  2. Om du skickar data efter att ha stängt kanalen kommer en panic att inträffa.
  3. Du behöver inte stänga kanalen, GC stänger kanalen medan den samlas in.

mutex

spinlock

spinlock är en synkroniseringsmetod som kontinuerligt försöker få en låsning genom att köra en loop. I Go-språket kan du enkelt implementera en spinlock med hjälp av en pekare.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Koden ovan är kod som implementerar spinlock-paketet. I den här koden används sync/atomic-paketet för att implementera SpinLock. Metoden Lock använder atomic.CompareAndSwapUintptr för att försöka få en låsning, och metoden Unlock använder atomic.StoreUintptr för att frigöra låsningen. Denna metod försöker låsa utan att vila, så den fortsätter att använda CPU:n tills den får låsningen, vilket kan leda till en oändlig loop. Därför är det bäst att använda spinlock för enkel synkronisering eller endast under en kort tid.

sync.Mutex

mutex är ett verktyg för synkronisering mellan gorutiner. mutex som tillhandahålls av sync-paketet tillhandahåller metoder som Lock, Unlock, RLock, RUnlock. mutex kan skapas med sync.Mutex, och en läs-/skrivlås kan användas med sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

I koden ovan får två gorutiner tillgång till samma count-variabel nästan samtidigt. Om vi i det här fallet skapar koden som får tillgång till count-variabeln som en kritisk region med hjälp av mutex kan vi förhindra samtidig åtkomst till count-variabeln. Då kommer denna kod att mata ut 2 lika oavsett hur många gånger den körs.

sync.RWMutex

sync.RWMutex är en mutex som kan användas för att skilja mellan läslås och skrivlås. Du kan låsa och frigöra ett läslås med hjälp av metoderna RLock och RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Koden ovan är kod som implementerar ConcurrentMap med hjälp av sync.RWMutex. I den här koden låses läsning i metoden Get och skrivning låses i metoden Set för att på ett säkert sätt få åtkomst till och ändra data-kartan. Anledningen till att ett läslås krävs är att om det finns många enkla läsoperationer kan flera gorutiner utföra läsoperationer samtidigt genom att endast låsa läsning utan att låsa skrivning. På så sätt kan vi förbättra prestandan genom att endast låsa läsning i de fall där det inte är nödvändigt att låsa skrivning eftersom statusen inte ändras.

fakelock

fakelock är ett enkelt trick som implementerar sync.Locker. Denna struktur tillhandahåller samma metoder som sync.Mutex, men den utför inte någon faktisk operation.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Koden ovan är kod som implementerar fakelock-paketet. Detta paket implementerar sync.Locker och tillhandahåller metoderna Lock och Unlock, men de gör egentligen ingenting. Jag kommer att beskriva varför denna kod behövs när tillfället ges.

waitgroup

sync.WaitGroup

sync.WaitGroup är ett verktyg för att vänta tills alla gorutiner är klara. Det tillhandahåller metoderna Add, Done och Wait. Antalet gorutiner läggs till med metoden Add, och Done-metoden meddelar att en gorutins arbete har avslutats. Och metoden Wait väntar tills arbetet för alla gorutiner är klart.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Ovanstående kod använder sync.WaitGroup för att låta 100 gorutiner samtidigt addera värden till variabeln c. I denna kod används sync.WaitGroup för att vänta tills alla gorutiner har avslutats, varefter värdet som adderats till variabeln c skrivs ut. Att använda enbart kanaler kan vara tillräckligt för att fork & join-hantera ett mindre antal operationer, men att använda sync.WaitGroup är ett bra alternativ för att hantera ett stort antal fork & join-operationer.

med slice

Om det används tillsammans med en slice kan waitgroup vara ett utmärkt verktyg för att hantera samtidiga operationer utan låsningar.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Ovanstående kod använder enbart waitgroup för att låta varje gorutin samtidigt generera 10 slumpmässiga heltal och lagra dem på det tilldelade indexet. I denna kod används waitgroup för att vänta tills alla gorutiner har avslutats, varefter Done skrivs ut. Genom att använda waitgroup på detta sätt kan flera gorutiner utföra operationer samtidigt, lagra data utan låsningar tills alla gorutiner har avslutats, och utföra efterbearbetning gemensamt efter att operationerna har avslutats.

golang.org/x/sync/errgroup.ErrGroup

errgroup är ett paket som utökar sync.WaitGroup. Till skillnad från sync.WaitGroup avbryter errgroup alla gorutiner och returnerar ett fel om ett fel uppstår i någon av gorutinernas operationer.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Ovanstående kod använder errgroup för att skapa 10 gorutiner och genererar ett fel i den femte gorutinen. Avsiktligt genererades ett fel i den femte gorutinen för att visa ett fall där ett fel uppstår. Men i praktiken kan errgroup användas för att skapa gorutiner och utföra olika efterbearbetningar för fall där fel uppstår i varje gorutin.

once

Detta är ett verktyg för att exekvera kod som endast bör köras en gång. Relevant kod kan köras via följande konstruktörer.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc gör det möjligt att enkelt säkerställa att den aktuella funktionen bara kan köras en gång i sin helhet.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Ovanstående kod använder sync.OnceFunc för att skriva ut Hello, World!. I denna kod används sync.OnceFunc för att skapa funktionen once, och även om once-funktionen anropas flera gånger skrivs Hello, World! endast ut en gång.

OnceValue

OnceValue ser inte bara till att den aktuella funktionen bara körs en gång i sin helhet, utan sparar också returvärdet från den aktuella funktionen och returnerar det sparade värdet när det anropas igen.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Ovanstående kod använder sync.OnceValue för att öka variabeln c med 1. I denna kod används sync.OnceValue för att skapa funktionen once, och även om once-funktionen anropas flera gånger returneras endast 1, vilket är värdet av c som endast har ökat en gång.

OnceValues

OnceValues fungerar på samma sätt som OnceValue, men kan returnera flera värden.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Ovanstående kod använder sync.OnceValues för att öka variabeln c med 1. I denna kod används sync.OnceValues för att skapa funktionen once, och även om once-funktionen anropas flera gånger returneras endast 1, vilket är värdet av c som endast har ökat en gång.

atomic

atomic-paketet tillhandahåller atomära operationer. atomic-paketet tillhandahåller metoder som Add, CompareAndSwap, Load, Store, Swap, men på senare tid rekommenderas att använda typer som Int64, Uint64, Pointer med mera.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Detta är samma exempel som användes tidigare. Det här är kod som atomärt ökar variabeln c med typen atomic.Int64. Med metoderna Add och Load kan variabeln atomärt ökas och avläsas. Dessutom kan värden lagras med metoden Store, värden kan bytas med metoden Swap och värden kan jämföras och bytas ut med metoden CompareAndSwap om de matchar.

cond

sync.Cond

cond-paketet tillhandahåller villkorsvariabler. cond-paketet kan skapas med sync.Cond och tillhandahåller metoderna Wait, Signal och Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Ovanstående kod använder sync.Cond för att vänta tills variabeln ready blir true. I denna kod används sync.Cond för att vänta tills variabeln ready blir true, varefter Ready! skrivs ut. Genom att använda sync.Cond på detta sätt kan flera gorutiner samtidigt vänta tills ett visst villkor är uppfyllt.

Detta kan användas för att implementera en enkel queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Genom att använda sync.Cond på detta sätt kan man effektivt vänta och återuppta driften när villkoret är uppfyllt, istället för att använda en spin-lock som använder mycket CPU.

semaphore

golang.org/x/sync/semaphore.Semaphore

semaphore-paketet tillhandahåller semaforer. semaphore-paketet kan skapas med golang.org/x/sync/semaphore.Semaphore och tillhandahåller metoderna Acquire, Release och TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Ovanstående kod använder semaphore för att skapa en semafor, och med semaforen används metoden Acquire för att erhålla semaforen och metoden Release för att frigöra semaforen. I den här koden har vi visat hur semaphore används för att erhålla och frigöra en semafor.

Avslutningsvis

Detta borde räcka för grunderna. Jag hoppas att du med utgångspunkt i innehållet i den här artikeln förstår hur du använder gorutiner för att hantera samtidighet och faktiskt kan använda dem. Jag hoppas att den här artikeln har varit till hjälp för dig. Tack.