Go konkurens programozási kezdőcsomag
Áttekintés
Rövid bemutatás
A Go nyelv számos eszközt kínál a konkurencia kezelésére. Ebben a cikkben bemutatunk néhányat ezek közül, valamint tippeket és trükköket.
Goroutine?
A goroutine a Go nyelv által támogatott új típusú konkurencia modell. Általában egy program az OS-től OS szálakat kap, hogy egyszerre több feladatot hajtson végre, és a magok számával párhuzamosan dolgozik. A kisebb egységű konkurencia megvalósításához a felhasználói térben zöld szálakat hozunk létre, amelyek egyetlen OS szálon belül futnak, és feladatokat hajtanak végre. A goroutine-ok azonban hatékonyabban és kisebb méretben valósítják meg ezt a zöld szál típust. Ezek a goroutine-ok kevesebb memóriát használnak, és gyorsabban hozhatók létre és cserélhetők, mint a szálak.
Goroutine használatához egyszerűen csak a go kulcsszót kell használni. Ez lehetővé teszi a szinkron kód aszinkron futtatását intuitív módon a program írása során.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Ez a kód egyszerűen átalakítja a szinkron kódot aszinkron folyamattá, amely 1 másodperc szünet után kiírja a Hello, World! szöveget. Bár ez az egyszerű példa, ha egy kicsit bonyolultabb kódot alakítunk át szinkronról aszinkronra, a kód olvashatósága, láthatósága és érthetősége jobb lesz, mint a hagyományos async await vagy promise alapú megközelítésekkel.
Azonban sok esetben, ha nem értjük a szinkron kód aszinkron hívásának folyamatát és az olyan fork & join típusú folyamatokat (mint például a divide and conquer), rossz goroutine kód születhet. Ez a cikk bemutat néhány módszert és technikát, amelyek segíthetnek felkészülni ezekre az esetekre.
Konkurencia kezelés
context
Meglepő lehet, hogy az első kezelési technika a context. A Go nyelvben azonban a context túlmutat az egyszerű megszakítási funkción, és kiválóan alkalmas a teljes feladatfa kezelésére. Azok számára, akik nem ismerik, röviden elmagyarázom ezt a csomagot.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
A fenti kód a context segítségével 1 másodperc múlva kiírja a Context is done! szöveget. A context a Done() metóduson keresztül ellenőrizheti a megszakítás állapotát, és olyan metódusokkal, mint a WithCancel, WithTimeout, WithDeadline, WithValue, különféle megszakítási módszereket kínál.
Készítsünk egy egyszerű példát. Tegyük fel, hogy egy aggregator mintát használva ír kódot, hogy adatokat (user, post, comment) hozzon le, és minden kérésnek 2 másodpercen belül teljesülnie kell. Ezt a következőképpen írhatja meg:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
A fenti kód 2 másodpercen belül kiírja a Timeout! szöveget, ha nem tudja lekérni az összes adatot, és All data is fetched! szöveget, ha az összes adatot lekérte. Ezzel a context használatával könnyedén kezelhető a megszakítás és az időtúllépés több goroutine-t futtató kódban is.
A context számos kapcsolódó funkciója és metódusa a godoc context oldalon érhető el. Reméljük, hogy az alapvetőeket elsajátítva kényelmesen tudja majd használni.
channel
unbuffered channel
A channel egy eszköz a goroutine-ok közötti kommunikációra. Egy channel a make(chan T) paranccsal hozható létre. Ekkor a T a channel által továbbítandó adat típusa. A channel a <- operátorral tud adatokat küldeni és fogadni, és a close paranccsal zárható be.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
A fenti kód a channel segítségével kiírja az 1-et és a 2-t. Ez a kód egyszerűen csak azt mutatja be, hogyan lehet értékeket küldeni és fogadni egy channel-en keresztül. Azonban a channel ennél sokkal több funkciót kínál. Először is nézzük meg a buffered channel-t és az unbuffered channel-t. Mielőtt elkezdenénk, a fenti példa egy unbuffered channel, ahol az adatok küldése és fogadása a csatornán keresztül egyszerre kell, hogy megtörténjen. Ha ez nem történik meg egyszerre, deadlock alakulhat ki.
buffered channel
Mi van akkor, ha a fenti kód nem csak egyszerű kimeneti művelet, hanem két olyan folyamat, amely nehéz feladatokat végez? Ha a második folyamat olvasása és feldolgozása hosszú ideig elakad, az első folyamat is leáll arra az időre. Ennek elkerülése érdekében használhatunk buffered channel-t.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
A fenti kód egy buffered channel használatával írja ki az 1 és 2 értékeket. Ebben a kódban a buffered channel segítségével a channel-re történő adatküldés és az adatok fogadása nem kell, hogy egyszerre történjen. Ha egy csatornának van pufferje, akkor ez a pufferhossznyi hely biztosítja, hogy a későbbi feladatok ne okozzanak munkafolyamat késedelmet.
select
Több csatorna kezelésekor a select utasítás segítségével könnyedén implementálható a fan-in struktúra.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
A fenti kód három csatornát hoz létre, amelyek rendszeresen 1, 2, 3 értékeket küldenek, és select segítségével fogadja és kiírja az értékeket a csatornákról. Ezzel a select használatával egyszerre fogadhat adatokat több csatornáról, és azonnal feldolgozhatja azokat, amint megérkeznek.
for range
A channel könnyedén fogadhat adatokat a for range segítségével. Ha a for range -et egy csatornán használjuk, akkor az minden alkalommal működésbe lép, amikor adat kerül a csatornára, és a csatorna bezárásakor a ciklus leáll.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
A fenti kód egy channel segítségével írja ki az 1-et és a 2-t. Ez a kód a for range segítségével minden alkalommal kiírja az adatot, amikor adat kerül a csatornára. És a ciklus leáll, amikor a csatorna bezáródik.
Mint már többször is említettük, ez a szintaxis egyszerű szinkronizálási eszközként is használható.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
A fenti kód 1 másodperc szünet után kiírja a Hello, World! szöveget. Ez a kód channel segítségével alakította át a szinkron kódot aszinkron kóddá. Ezzel a channel használatával könnyedén alakíthatjuk át a szinkron kódot aszinkron kóddá, és beállíthatunk join pontokat.
stb
- Ha nil channelre küldünk vagy onnan fogadunk adatot, végtelen ciklusba kerülhetünk, ami deadlockot okozhat.
- Ha egy channel bezárása után adatot küldünk, panic következik be.
- Még ha nem is zárjuk be a channelt, a GC bezárja azt, amikor begyűjti.
mutex
spinlock
A spinlock egy szinkronizációs módszer, amely ismétlődő ciklusban próbál zárat szerezni. Go nyelven egyszerűen implementálható a spinlock pointerek segítségével.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
A fenti kód implementálja a spinlock csomagot. Ez a kód a sync/atomic csomagot használja a SpinLock implementálásához. A Lock metódus az atomic.CompareAndSwapUintptr segítségével próbál zárat szerezni, az Unlock metódus pedig az atomic.StoreUintptr segítségével oldja fel a zárat. Ez a módszer folyamatosan próbál zárat szerezni, így a CPU-t folyamatosan használja, amíg zárat nem szerez, ami végtelen ciklust okozhat. Ezért a spinlock-ot egyszerű szinkronizálásra vagy rövid ideig tartó használatra ajánlott.
sync.Mutex
A mutex egy eszköz a goroutine-ok közötti szinkronizációra. A sync csomag által biztosított mutex olyan metódusokat kínál, mint a Lock, Unlock, RLock, RUnlock. A mutex a sync.Mutex paranccsal hozható létre, és sync.RWMutex is használható olvasási/írási zárakhoz.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
A fenti kódban két goroutine szinte egyszerre fér hozzá ugyanahhoz a count változóhoz. Ha mutex-et használunk a count változóhoz hozzáférő kód kritikus szakaszának létrehozására, megakadályozhatjuk a count változóhoz való egyidejű hozzáférést. Ekkor ez a kód minden futtatáskor ugyanazt a 2 értéket adja ki.
sync.RWMutex
A sync.RWMutex egy mutex, amely képes megkülönböztetni az olvasási és írási zárakat. Az RLock, RUnlock metódusok segítségével lehet olvasási zárat beállítani és feloldani.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
A fenti kód a sync.RWMutex segítségével implementálja a ConcurrentMap struktúrát. Ebben a kódban a Get metódus olvasási zárat alkalmaz, a Set metódus pedig írási zárat, így biztonságosan hozzáférhet és módosíthatja a data térképet. Az olvasási zárakra azért van szükség, mert sok olvasási művelet esetén, írási zárak alkalmazása nélkül, több goroutine is végezhet olvasási műveleteket egyszerre. Ezáltal javítható a teljesítmény, ha nincs szükség az állapot módosítására, és elegendő csak olvasási zárat alkalmazni.
fakelock
A fakelock egy egyszerű trükk, amely implementálja a sync.Locker interfészt. Ez a struktúra ugyanazokat a metódusokat kínálja, mint a sync.Mutex, de valójában nem végez semmilyen műveletet.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
A fenti kód implementálja a fakelock csomagot. Ez a csomag a sync.Locker interfészt implementálja, Lock és Unlock metódusokat biztosítva, de valójában nem végez semmilyen műveletet. Ha alkalom adódik, elmagyarázom, miért van szükség ilyen kódra.
waitgroup
sync.WaitGroup
A sync.WaitGroup egy eszköz, amely arra vár, hogy a goroutine-ok összes feladata befejeződjön. Add, Done és Wait metódusokat biztosít. Az Add metódussal hozzáadja a goroutine-ok számát, a Done metódussal jelzi, hogy a goroutine feladata befejeződött. A Wait metódus pedig megvárja, amíg az összes goroutine feladata befejeződik.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
A fenti kód a sync.WaitGroup segítségével 100 goroutine-t használ, amelyek egyidejűleg adnak hozzá értékeket a c változóhoz. Ez a kód a sync.WaitGroup segítségével megvárja, amíg az összes goroutine befejeződik, majd kiírja a c változóhoz hozzáadott értéket. Bár néhány feladat fork & join esetében elegendő a channel használata, nagyszámú feladat fork & join esetén a sync.WaitGroup használata is jó választás.
with slice
Szelettel együtt használva a waitgroup kiváló eszköze lehet a párhuzamos végrehajtási feladatok kezelésének zár nélkül.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
A fenti kód csak waitgroup segítségével hoz létre 10 véletlenszerű egész számot egyidejűleg minden goroutine-ban, és tárolja azokat a hozzárendelt indexen. Ez a kód waitgroup segítségével megvárja, amíg az összes goroutine befejeződik, majd kiírja a "Done" szöveget. Ezzel a waitgroup használatával több goroutine is végezhet feladatokat egyidejűleg, és az összes goroutine befejezése után zár nélkül tárolhatja az adatokat, és a feladat befejezése után egyszerre végezheti el az utófeldolgozást.
golang.org/x/sync/errgroup.ErrGroup
Az errgroup a sync.WaitGroup kiterjesztett csomagja. Az errgroup a sync.WaitGroup-tól eltérően, ha bármelyik goroutine feladatában hiba történik, megszakítja az összes goroutine-t és visszaadja a hibát.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
A fenti kód errgroup segítségével 10 goroutine-t hoz létre, és az 5. goroutine-ban hibát generál. Szándékosan generáltam hibát az ötödik goroutine-ban, hogy bemutassam a hibás esetet. Azonban a gyakorlatban az errgroup segítségével hozhatunk létre goroutine-okat, és különféle utófeldolgozást végezhetünk, ha hiba történik az egyes goroutine-okban.
once
Ez egy olyan eszköz, amely egyszer futtatja a kódot. A kapcsolódó kód a következő konstruktoron keresztül futtatható.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
A OnceFunc egyszerűen biztosítja, hogy a függvény csak egyszer fusson le a teljes életciklus során.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
A fenti kód a sync.OnceFunc segítségével kiírja a Hello, World! szöveget. Ebben a kódban a sync.OnceFunc segítségével létrehozzuk az once függvényt, és az once függvény többszöri hívása esetén is csak egyszer írja ki a Hello, World! szöveget.
OnceValue
A OnceValue nem csupán azt biztosítja, hogy a függvény csak egyszer fusson le a teljes életciklus során, hanem elmenti a függvény visszatérési értékét, és a későbbi hívások során a tárolt értéket adja vissza.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
A fenti kód a sync.OnceValue segítségével 1-gyel növeli a c változót. Ebben a kódban a sync.OnceValue segítségével létrehozzuk az once függvényt, és az once függvény többszöri hívása esetén is a c változó csak egyszer növelt 1-es értékét adja vissza.
OnceValues
A OnceValues ugyanúgy működik, mint a OnceValue, de több értéket is visszaadhat.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
A fenti kód a sync.OnceValues segítségével 1-gyel növeli a c változót. Ebben a kódban a sync.OnceValues segítségével létrehozzuk az once függvényt, és az once függvény többszöri hívása esetén is a c változó csak egyszer növelt 1-es értékét adja vissza.
atomic
Az atomic csomag atomi műveleteket biztosító csomag. Az atomic csomag olyan metódusokat kínál, mint az Add, CompareAndSwap, Load, Store, Swap, de újabban az Int64, Uint64, Pointer típusok használata ajánlott.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Ez a korábban használt példa. Ez a kód az atomic.Int64 típust használja a c változó atomi növelésére. Az Add és Load metódusok segítségével atomi módon növelhető és olvasható a változó. Ezenkívül a Store metódus segítségével értékeket tárolhatunk, a Swap metódussal értékeket cserélhetünk, és a CompareAndSwap metódussal összehasonlíthatjuk az értékeket, majd megfelelő esetben kicserélhetjük azokat.
cond
sync.Cond
A cond csomag feltételes változókat biztosító csomag. A cond csomag sync.Cond paranccsal hozható létre, és Wait, Signal, Broadcast metódusokat kínál.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
A fenti kód a sync.Cond segítségével várja, amíg a ready változó true lesz. Ebben a kódban a sync.Cond segítségével megvárja, amíg a ready változó true lesz, majd kiírja a Ready! szöveget. Ezzel a sync.Cond használatával több goroutine is megvárhatja, amíg bizonyos feltételek teljesülnek.
Ezt felhasználva egyszerű queue-t implementálhatunk.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
A sync.Cond ilyen módon történő felhasználásával a spin-lock magas CPU-használata helyett hatékonyan lehet várakozni, és a feltétel teljesülésekor újra működésbe lépni.
semaphore
golang.org/x/sync/semaphore.Semaphore
A semaphore csomag szemaforokat biztosító csomag. A semaphore csomag golang.org/x/sync/semaphore.Semaphore paranccsal hozható létre, és Acquire, Release, TryAcquire metódusokat kínál.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
A fenti kód semaphore segítségével szemaforokat hoz létre, és a szemaforokat használja az Acquire metódussal a szemafor megszerzésére, és a Release metódussal a szemafor feloldására. Ebben a kódban bemutattam, hogyan lehet szemaforokat szerezni és feloldani a semaphore segítségével.
Befejezés
Az alapvető tartalomnak ennyinek kell lennie. Remélem, hogy e cikk alapján megértik a goroutine-ok használatával történő konkurencia kezelésének módját, és képesek lesznek a gyakorlatban is alkalmazni. Remélem, hogy ez a cikk hasznos volt Önöknek. Köszönöm.