GoSuda

Go Стартов пакет за едновременно програмиране

By snowmerak
views ...

Общ преглед

Кратко въведение

Езикът Go разполага с множество инструменти за управление на едновременността. В тази статия ще ви представим някои от тях, както и трикове за тяхното използване.

Goroutine?

goroutine е нов тип модел на едновременност, поддържан от езика Go. Обикновено, за да изпълнява множество задачи едновременно, програмата получава OS thread от операционната система и изпълнява задачите паралелно, колкото са ядрата. И за да се извърши едновременност в по-малък мащаб, в userland се създават green threads, позволяващи множество green threads да се изпълняват в рамките на един OS thread. В случая на goroutine, този тип green thread е направен още по-малък и ефективен. Тези goroutine използват по-малко памет от нишките и могат да бъдат създадени и заменени по-бързо от нишките.

За да използвате goroutine, просто използвайте ключовата дума go. Това позволява интуитивно изпълнение на синхронен код като асинхронен код в процеса на писане на програмата.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Този код просто променя синхронния код, който отпечатва Hello, World! след 1 секунда пауза, в асинхронен поток. Този пример е прост, но ако промените малко по-сложен код от синхронен в асинхронен, четивността, видимостта и разбирането на кода стават по-добри отколкото при методите async await или promise.

Въпреки това, в много случаи, ако не е разбран потокът на просто асинхронно извикване на този синхронен код и потокът като fork & join (поток, подобен на разделяй и владей), може да се създаде лош goroutine код. В тази статия ще бъдат представени няколко метода и техники, за да се подготвите за такива случаи.

Управление на едновременността

context

Може да е изненадващо, че context се появява като първа техника за управление. Въпреки това, в езика Go, context играе отлична роля в управлението на цялото дърво на задачите, отвъд простата функция за отмяна. За тези, които не знаят, ще обясним накратко този пакет.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Горният код използва context, за да отпечата Context is done! след 1 секунда. context може да провери дали е отменен чрез метода Done(), и предоставя различни методи за отмяна чрез методи като WithCancel, WithTimeout, WithDeadline, WithValue.

Нека създадем прост пример. Да предположим, че пишете код, който използва aggregator pattern, за да извлече user, post и comment, за да получите някакви данни. И ако всички заявки трябва да бъдат направени в рамките на 2 секунди, можете да го напишете по следния начин.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Горният код отпечатва Timeout!, ако не успее да извлече всички данни в рамките на 2 секунди, и отпечатва All data is fetched!, ако извлече всички данни. Използвайки context по този начин, можете лесно да управлявате отмяната и изчакването в код, където работят множество goroutine.

Различни context-свързани функции и методи могат да бъдат намерени в godoc context. Надяваме се, че ще научите простите неща и ще ги използвате удобно.

channel

unbuffered channel

channel е инструмент за комуникация между goroutine. channel може да бъде създаден с make(chan T). В този случай, T е типът данни, които този channel ще предава. channel може да изпраща и получава данни с <-, и може да бъде затворен с close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Горният код отпечатва 1 и 2, използвайки channel. Този код просто показва изпращане и получаване на стойности в channel. Въпреки това, channel предлага повече функции от това. Първо, нека научим за buffered channel и unbuffered channel. За начало, примерът, написан по-горе, е unbuffered channel, където изпращането на данни към канала и получаването на данни трябва да се извършват едновременно. Ако тези действия не се извършват едновременно, може да възникне deadlock.

buffered channel

Ами ако горният код не е просто отпечатване, а два процеса, които изпълняват тежки задачи? Ако вторият процес чете и изпълнява процеса и отнеме дълго време, първият процес също ще спре за това време. Можем да използваме buffered channel, за да предотвратим тази ситуация.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Горният код отпечатва 1 и 2, използвайки buffered channel. В този код, чрез използване на buffered channel, направихме така, че изпращането на данни към channel и получаването на данни да не трябва да се извършват едновременно. Чрез поставяне на буфер в канала, има допълнително пространство за тази дължина, което може да предотврати забавянето на задачите, причинено от влиянието на последващи задачи.

select

Когато се занимавате с множество канали, можете лесно да реализирате структура fan-in, използвайки синтаксиса select.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Горният код създава 3 канала, които периодично предават 1, 2 и 3, и използва select, за да получи стойности от каналите и да ги отпечата. Използвайки select по този начин, можете да получавате данни от множество канали едновременно и да ги обработвате, веднага щом получите стойностите от каналите.

for range

channel може лесно да получава данни, използвайки for range. Когато използвате for range в канал, той се изпълнява всеки път, когато се добавят данни към канала, и спира цикъла, когато каналът е затворен.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Горният код отпечатва 1 и 2, използвайки channel. В този код използваме for range, за да получаваме и отпечатваме данни всеки път, когато се добавят данни към канала. И спира цикъла, когато каналът е затворен.

Както писахме няколко пъти по-горе, този синтаксис може да се използва и като просто средство за синхронизация.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Горният код отпечатва Hello, World! след 1 секунда пауза. В този код използвахме channel, за да променим синхронния код в асинхронен код. Използвайки channel по този начин, можете лесно да промените синхронния код в асинхронен код и да зададете точката join.

etc

  1. Ако изпратите или получите данни в nil channel, може да попаднете в безкраен цикъл и да възникне deadlock.
  2. Ако изпратите данни, след като каналът е затворен, възниква panic.
  3. Не е необходимо да затваряте канала, GC затваря канала, докато го събира.

mutex

spinlock

spinlock е метод за синхронизация, който се опитва да заключи непрекъснато, докато върти цикъл. В езика Go можете лесно да приложите spinlock, като използвате указатели.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Горният код е код, който реализира пакета spinlock. Този код използва пакета sync/atomic, за да реализира SpinLock. Методът Lock използва atomic.CompareAndSwapUintptr, за да опита заключване, а методът Unlock използва atomic.StoreUintptr, за да освободи заключването. Този метод се опитва да заключи, без да си почива, така че продължава да използва CPU, докато получи заключването, и може да попадне в безкраен цикъл. Ето защо е добра идея да използвате spinlock за проста синхронизация или когато го използвате само за кратко време.

sync.Mutex

mutex е инструмент за синхронизация между goroutine. mutex, предоставен от пакета sync, предоставя методи като Lock, Unlock, RLock, RUnlock. mutex може да бъде създаден с sync.Mutex, а sync.RWMutex може да се използва за четене/записване на заключвания.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

В горния код, почти едновременно, два goroutine имат достъп до една и съща променлива count. В този случай, ако създадете секция, критична за кода, който има достъп до променливата count, като използвате mutex, можете да предотвратите едновременния достъп до променливата count. Тогава този код ще отпечата 2 еднакво, независимо колко пъти се изпълнява.

sync.RWMutex

sync.RWMutex е mutex, който може да се използва, като се разграничават заключванията за четене и заключванията за писане. Можете да заключите и освободите заключването за четене, като използвате методите RLock и RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Горният код е код, който реализира ConcurrentMap, използвайки sync.RWMutex. В този код, методът Get заключва заключването за четене, а методът Set заключва заключването за писане, за да получи достъп и да модифицира безопасно картата data. Причината, поради която е необходимо заключване за четене, е, че в много случаи на прости операции за четене, множество goroutine могат да извършват операции за четене едновременно, като просто заключват заключването за четене, без да заключват заключването за писане. Чрез това можете да подобрите производителността, като заключите само заключването за четене, когато няма промяна в състоянието и не е необходимо да заключвате заключването за писане.

fakelock

fakelock е прост трик за реализиране на sync.Locker. Тази структура предоставя същите методи като sync.Mutex, но всъщност не прави нищо.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Горният код е код, който реализира пакета fakelock. Този пакет реализира sync.Locker, за да предостави методите Lock и Unlock, но всъщност не прави нищо. Ще опишем защо е необходим такъв код, когато имаме възможност.

waitgroup

sync.WaitGroup

sync.WaitGroup е инструмент за изчакване, докато всички задачи на goroutine приключат. Той предоставя методите Add, Done, Wait, добавя броя на goroutine с метода Add, и уведомява, че задачата на goroutine е приключила с метода Done. И изчаква всички задачи на goroutine да приключат с метода Wait.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Горният код е код, който използва sync.WaitGroup, за да позволи на 100 горутини едновременно да добавят стойности към променливата c. В този код, sync.WaitGroup се използва, за да се изчака всички горутини да завършат, преди да се отпечата стойността, добавена към променливата c. В случаите, когато трябва да се изпълнят fork & join няколко задачи, е достатъчно да се използват само канали, но в случаите, когато трябва да се изпълнят fork & join голям брой задачи, използването на sync.WaitGroup също е добър избор.

with slice

Ако се използва с парче (slice), waitgroup може да бъде чудесен инструмент за управление на едновременно изпълнявани задачи без заключване.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Горният код е код, който използва само waitgroup, за да позволи на всяка горутина едновременно да генерира 10 случайни цели числа и да ги съхрани в присвоения индекс. В този код, waitgroup се използва, за да се изчака всички горутини да завършат, преди да се отпечата Done. Използвайки waitgroup по този начин, е възможно множество горутини да изпълняват задачи едновременно, да съхраняват данни без заключване, докато всички горутини не завършат, и да извършват последваща обработка наведнъж след приключване на задачите.

golang.org/x/sync/errgroup.ErrGroup

errgroup е пакет, който разширява sync.WaitGroup. За разлика от sync.WaitGroup, ако възникне грешка в някоя от задачите на горутините, errgroup отменя всички горутини и връща грешка.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Горният код е код, който използва errgroup, за да създаде 10 горутини и да предизвика грешка в петата горутина. Умишлено предизвикахме грешка в петата горутина, за да покажем случая, когато възниква грешка. Въпреки това, когато се използва реално, е необходимо да се използва errgroup за създаване на горутини и да се извърши различна последваща обработка за случаите, когато възникне грешка във всяка горутина.

once

Инструмент за изпълнение на код, който трябва да бъде изпълнен само веднъж. Можете да изпълните съответния код чрез конструктора по-долу.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc просто позволява на съответната функция да бъде изпълнена само веднъж в цялост.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Горният код е код, който използва sync.OnceFunc, за да отпечата Hello, World!. В този код, sync.OnceFunc се използва, за да се създаде функцията once и дори ако функцията once бъде извикана няколко пъти, Hello, World! се отпечатва само веднъж.

OnceValue

OnceValue не само позволява на съответната функция да бъде изпълнена само веднъж в цялост, но също така запазва върнатата стойност на съответната функция и връща запазената стойност, когато бъде извикана отново.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Горният код е код, който използва sync.OnceValue, за да увеличи променливата c с 1. В този код, sync.OnceValue се използва, за да се създаде функцията once и дори ако функцията once бъде извикана няколко пъти, се връща 1, като променливата c се увеличава само веднъж.

OnceValues

OnceValues работи по същия начин като OnceValue, но може да връща множество стойности.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Горният код е код, който използва sync.OnceValues, за да увеличи променливата c с 1. В този код, sync.OnceValues се използва, за да се създаде функцията once и дори ако функцията once бъде извикана няколко пъти, се връща 1, като променливата c се увеличава само веднъж.

atomic

atomic пакетът е пакет, който предоставя атомарни операции. atomic пакетът предоставя методи като Add, CompareAndSwap, Load, Store, Swap, но напоследък се препоръчва използването на типове като Int64, Uint64, Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Това е примерът, който беше използван преди. Това е код, който използва типа atomic.Int64, за да увеличи атомарно променливата c. Можете атомарно да увеличите променливата и да прочетете променливата с методите Add и Load. Също така, можете да запазите стойност с метода Store, да замените стойност с метода Swap и да сравните стойност и след това да я замените, ако е подходяща с метода CompareAndSwap.

cond

sync.Cond

cond пакетът е пакет, който предоставя условни променливи. cond пакетът може да бъде създаден със sync.Cond и предоставя методи Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Горният код е код, който използва sync.Cond, за да изчака променливата ready да стане true. В този код, sync.Cond се използва, за да се изчака променливата ready да стане true, преди да се отпечата Ready!. Използвайки sync.Cond по този начин, е възможно множество горутини да чакат едновременно, докато не бъде изпълнено определено условие.

Използвайки това, можете да реализирате проста queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Използвайки sync.Cond по този начин, можете ефективно да изчаквате и да действате отново, когато условието е изпълнено, вместо да използвате много CPU използване със spin-lock.

semaphore

golang.org/x/sync/semaphore.Semaphore

semaphore пакетът е пакет, който предоставя семафори. semaphore пакетът може да бъде създаден с golang.org/x/sync/semaphore.Semaphore и предоставя методи Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Горният код е код, който използва semaphore, за да създаде семафор, и използва семафора, за да получи семафора с метода Acquire и да освободи семафора с метода Release. В този код показах как да получите и освободите семафор с помощта на semaphore.

В заключение

Изглежда, че това е всичко, от което се нуждаете за основните неща. Надявам се, че въз основа на съдържанието на тази статия, вие разбирате как да управлявате конкурентността с помощта на горутини и можете да го използвате на практика. Надявам се тази статия да ви е била полезна. Благодаря ви.