Go Стартов пакет за едновременно програмиране
Общ преглед
Кратко въведение
Езикът Go разполага с множество инструменти за управление на едновременността. В тази статия ще ви представим някои от тях, както и трикове за тяхното използване.
Goroutine?
goroutine е нов тип модел на едновременност, поддържан от езика Go. Обикновено, за да изпълнява множество задачи едновременно, програмата получава OS thread от операционната система и изпълнява задачите паралелно, колкото са ядрата. И за да се извърши едновременност в по-малък мащаб, в userland се създават green threads, позволяващи множество green threads да се изпълняват в рамките на един OS thread. В случая на goroutine, този тип green thread е направен още по-малък и ефективен. Тези goroutine използват по-малко памет от нишките и могат да бъдат създадени и заменени по-бързо от нишките.
За да използвате goroutine, просто използвайте ключовата дума go
. Това позволява интуитивно изпълнение на синхронен код като асинхронен код в процеса на писане на програмата.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Този код просто променя синхронния код, който отпечатва Hello, World!
след 1 секунда пауза, в асинхронен поток. Този пример е прост, но ако промените малко по-сложен код от синхронен в асинхронен, четивността, видимостта и разбирането на кода стават по-добри отколкото при методите async await или promise.
Въпреки това, в много случаи, ако не е разбран потокът на просто асинхронно извикване на този синхронен код и потокът като fork & join
(поток, подобен на разделяй и владей), може да се създаде лош goroutine код. В тази статия ще бъдат представени няколко метода и техники, за да се подготвите за такива случаи.
Управление на едновременността
context
Може да е изненадващо, че context
се появява като първа техника за управление. Въпреки това, в езика Go, context
играе отлична роля в управлението на цялото дърво на задачите, отвъд простата функция за отмяна. За тези, които не знаят, ще обясним накратко този пакет.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Горният код използва context
, за да отпечата Context is done!
след 1 секунда. context
може да провери дали е отменен чрез метода Done()
, и предоставя различни методи за отмяна чрез методи като WithCancel
, WithTimeout
, WithDeadline
, WithValue
.
Нека създадем прост пример. Да предположим, че пишете код, който използва aggregator
pattern, за да извлече user
, post
и comment
, за да получите някакви данни. И ако всички заявки трябва да бъдат направени в рамките на 2 секунди, можете да го напишете по следния начин.
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Горният код отпечатва Timeout!
, ако не успее да извлече всички данни в рамките на 2 секунди, и отпечатва All data is fetched!
, ако извлече всички данни. Използвайки context
по този начин, можете лесно да управлявате отмяната и изчакването в код, където работят множество goroutine.
Различни context-свързани функции и методи могат да бъдат намерени в godoc context. Надяваме се, че ще научите простите неща и ще ги използвате удобно.
channel
unbuffered channel
channel
е инструмент за комуникация между goroutine. channel
може да бъде създаден с make(chan T)
. В този случай, T
е типът данни, които този channel
ще предава. channel
може да изпраща и получава данни с <-
, и може да бъде затворен с close
.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Горният код отпечатва 1 и 2, използвайки channel
. Този код просто показва изпращане и получаване на стойности в channel
. Въпреки това, channel
предлага повече функции от това. Първо, нека научим за buffered channel
и unbuffered channel
. За начало, примерът, написан по-горе, е unbuffered channel
, където изпращането на данни към канала и получаването на данни трябва да се извършват едновременно. Ако тези действия не се извършват едновременно, може да възникне deadlock.
buffered channel
Ами ако горният код не е просто отпечатване, а два процеса, които изпълняват тежки задачи? Ако вторият процес чете и изпълнява процеса и отнеме дълго време, първият процес също ще спре за това време. Можем да използваме buffered channel
, за да предотвратим тази ситуация.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Горният код отпечатва 1 и 2, използвайки buffered channel
. В този код, чрез използване на buffered channel
, направихме така, че изпращането на данни към channel
и получаването на данни да не трябва да се извършват едновременно. Чрез поставяне на буфер в канала, има допълнително пространство за тази дължина, което може да предотврати забавянето на задачите, причинено от влиянието на последващи задачи.
select
Когато се занимавате с множество канали, можете лесно да реализирате структура fan-in
, използвайки синтаксиса select
.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Горният код създава 3 канала, които периодично предават 1, 2 и 3, и използва select
, за да получи стойности от каналите и да ги отпечата. Използвайки select
по този начин, можете да получавате данни от множество канали едновременно и да ги обработвате, веднага щом получите стойностите от каналите.
for range
channel
може лесно да получава данни, използвайки for range
. Когато използвате for range
в канал, той се изпълнява всеки път, когато се добавят данни към канала, и спира цикъла, когато каналът е затворен.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Горният код отпечатва 1 и 2, използвайки channel
. В този код използваме for range
, за да получаваме и отпечатваме данни всеки път, когато се добавят данни към канала. И спира цикъла, когато каналът е затворен.
Както писахме няколко пъти по-горе, този синтаксис може да се използва и като просто средство за синхронизация.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Горният код отпечатва Hello, World!
след 1 секунда пауза. В този код използвахме channel
, за да променим синхронния код в асинхронен код. Използвайки channel
по този начин, можете лесно да промените синхронния код в асинхронен код и да зададете точката join
.
etc
- Ако изпратите или получите данни в nil channel, може да попаднете в безкраен цикъл и да възникне deadlock.
- Ако изпратите данни, след като каналът е затворен, възниква panic.
- Не е необходимо да затваряте канала, GC затваря канала, докато го събира.
mutex
spinlock
spinlock
е метод за синхронизация, който се опитва да заключи непрекъснато, докато върти цикъл. В езика Go можете лесно да приложите spinlock, като използвате указатели.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Горният код е код, който реализира пакета spinlock
. Този код използва пакета sync/atomic
, за да реализира SpinLock
. Методът Lock
използва atomic.CompareAndSwapUintptr
, за да опита заключване, а методът Unlock
използва atomic.StoreUintptr
, за да освободи заключването. Този метод се опитва да заключи, без да си почива, така че продължава да използва CPU, докато получи заключването, и може да попадне в безкраен цикъл. Ето защо е добра идея да използвате spinlock
за проста синхронизация или когато го използвате само за кратко време.
sync.Mutex
mutex
е инструмент за синхронизация между goroutine. mutex
, предоставен от пакета sync
, предоставя методи като Lock
, Unlock
, RLock
, RUnlock
. mutex
може да бъде създаден с sync.Mutex
, а sync.RWMutex
може да се използва за четене/записване на заключвания.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
В горния код, почти едновременно, два goroutine имат достъп до една и съща променлива count
. В този случай, ако създадете секция, критична за кода, който има достъп до променливата count
, като използвате mutex
, можете да предотвратите едновременния достъп до променливата count
. Тогава този код ще отпечата 2
еднакво, независимо колко пъти се изпълнява.
sync.RWMutex
sync.RWMutex
е mutex
, който може да се използва, като се разграничават заключванията за четене и заключванията за писане. Можете да заключите и освободите заключването за четене, като използвате методите RLock
и RUnlock
.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Горният код е код, който реализира ConcurrentMap
, използвайки sync.RWMutex
. В този код, методът Get
заключва заключването за четене, а методът Set
заключва заключването за писане, за да получи достъп и да модифицира безопасно картата data
. Причината, поради която е необходимо заключване за четене, е, че в много случаи на прости операции за четене, множество goroutine могат да извършват операции за четене едновременно, като просто заключват заключването за четене, без да заключват заключването за писане. Чрез това можете да подобрите производителността, като заключите само заключването за четене, когато няма промяна в състоянието и не е необходимо да заключвате заключването за писане.
fakelock
fakelock
е прост трик за реализиране на sync.Locker
. Тази структура предоставя същите методи като sync.Mutex
, но всъщност не прави нищо.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Горният код е код, който реализира пакета fakelock
. Този пакет реализира sync.Locker
, за да предостави методите Lock
и Unlock
, но всъщност не прави нищо. Ще опишем защо е необходим такъв код, когато имаме възможност.
waitgroup
sync.WaitGroup
sync.WaitGroup
е инструмент за изчакване, докато всички задачи на goroutine приключат. Той предоставя методите Add
, Done
, Wait
, добавя броя на goroutine с метода Add
, и уведомява, че задачата на goroutine е приключила с метода Done
. И изчаква всички задачи на goroutine да приключат с метода Wait
.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Горният код е код, който използва sync.WaitGroup
, за да позволи на 100 горутини едновременно да добавят стойности към променливата c
. В този код, sync.WaitGroup
се използва, за да се изчака всички горутини да завършат, преди да се отпечата стойността, добавена към променливата c
. В случаите, когато трябва да се изпълнят fork & join
няколко задачи, е достатъчно да се използват само канали, но в случаите, когато трябва да се изпълнят fork & join
голям брой задачи, използването на sync.WaitGroup
също е добър избор.
with slice
Ако се използва с парче (slice), waitgroup
може да бъде чудесен инструмент за управление на едновременно изпълнявани задачи без заключване.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Горният код е код, който използва само waitgroup
, за да позволи на всяка горутина едновременно да генерира 10 случайни цели числа и да ги съхрани в присвоения индекс. В този код, waitgroup
се използва, за да се изчака всички горутини да завършат, преди да се отпечата Done
. Използвайки waitgroup
по този начин, е възможно множество горутини да изпълняват задачи едновременно, да съхраняват данни без заключване, докато всички горутини не завършат, и да извършват последваща обработка наведнъж след приключване на задачите.
golang.org/x/sync/errgroup.ErrGroup
errgroup
е пакет, който разширява sync.WaitGroup
. За разлика от sync.WaitGroup
, ако възникне грешка в някоя от задачите на горутините, errgroup
отменя всички горутини и връща грешка.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Горният код е код, който използва errgroup
, за да създаде 10 горутини и да предизвика грешка в петата горутина. Умишлено предизвикахме грешка в петата горутина, за да покажем случая, когато възниква грешка. Въпреки това, когато се използва реално, е необходимо да се използва errgroup
за създаване на горутини и да се извърши различна последваща обработка за случаите, когато възникне грешка във всяка горутина.
once
Инструмент за изпълнение на код, който трябва да бъде изпълнен само веднъж. Можете да изпълните съответния код чрез конструктора по-долу.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc
просто позволява на съответната функция да бъде изпълнена само веднъж в цялост.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Горният код е код, който използва sync.OnceFunc
, за да отпечата Hello, World!
. В този код, sync.OnceFunc
се използва, за да се създаде функцията once
и дори ако функцията once
бъде извикана няколко пъти, Hello, World!
се отпечатва само веднъж.
OnceValue
OnceValue
не само позволява на съответната функция да бъде изпълнена само веднъж в цялост, но също така запазва върнатата стойност на съответната функция и връща запазената стойност, когато бъде извикана отново.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Горният код е код, който използва sync.OnceValue
, за да увеличи променливата c
с 1. В този код, sync.OnceValue
се използва, за да се създаде функцията once
и дори ако функцията once
бъде извикана няколко пъти, се връща 1, като променливата c
се увеличава само веднъж.
OnceValues
OnceValues
работи по същия начин като OnceValue
, но може да връща множество стойности.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Горният код е код, който използва sync.OnceValues
, за да увеличи променливата c
с 1. В този код, sync.OnceValues
се използва, за да се създаде функцията once
и дори ако функцията once
бъде извикана няколко пъти, се връща 1, като променливата c
се увеличава само веднъж.
atomic
atomic
пакетът е пакет, който предоставя атомарни операции. atomic
пакетът предоставя методи като Add
, CompareAndSwap
, Load
, Store
, Swap
, но напоследък се препоръчва използването на типове като Int64
, Uint64
, Pointer
.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Това е примерът, който беше използван преди. Това е код, който използва типа atomic.Int64
, за да увеличи атомарно променливата c
. Можете атомарно да увеличите променливата и да прочетете променливата с методите Add
и Load
. Също така, можете да запазите стойност с метода Store
, да замените стойност с метода Swap
и да сравните стойност и след това да я замените, ако е подходяща с метода CompareAndSwap
.
cond
sync.Cond
cond
пакетът е пакет, който предоставя условни променливи. cond
пакетът може да бъде създаден със sync.Cond
и предоставя методи Wait
, Signal
, Broadcast
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Горният код е код, който използва sync.Cond
, за да изчака променливата ready
да стане true
. В този код, sync.Cond
се използва, за да се изчака променливата ready
да стане true
, преди да се отпечата Ready!
. Използвайки sync.Cond
по този начин, е възможно множество горутини да чакат едновременно, докато не бъде изпълнено определено условие.
Използвайки това, можете да реализирате проста queue
.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Използвайки sync.Cond
по този начин, можете ефективно да изчаквате и да действате отново, когато условието е изпълнено, вместо да използвате много CPU използване със spin-lock
.
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphore
пакетът е пакет, който предоставя семафори. semaphore
пакетът може да бъде създаден с golang.org/x/sync/semaphore.Semaphore
и предоставя методи Acquire
, Release
, TryAcquire
.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Горният код е код, който използва semaphore
, за да създаде семафор, и използва семафора, за да получи семафора с метода Acquire
и да освободи семафора с метода Release
. В този код показах как да получите и освободите семафор с помощта на semaphore
.
В заключение
Изглежда, че това е всичко, от което се нуждаете за основните неща. Надявам се, че въз основа на съдържанието на тази статия, вие разбирате как да управлявате конкурентността с помощта на горутини и можете да го използвате на практика. Надявам се тази статия да ви е била полезна. Благодаря ви.