Go Starter Pack для параллельного программирования
Обзор
Краткое введение
В языке Go есть множество инструментов для управления параллелизмом. В этой статье мы представим некоторые из них и трюки.
Горутины?
goroutine — это новая форма модели параллелизма, поддерживаемая языком Go. Обычно программа получает потоки ОС от ОС для одновременного выполнения нескольких задач и выполняет задачи параллельно в соответствии с количеством ядер. А для выполнения параллелизма меньшего размера в пользовательском пространстве создаются зеленые потоки, чтобы несколько зеленых потоков работали внутри одного потока ОС и выполняли задачи по очереди. Но в случае с горутинами эта форма зеленых потоков стала меньше и эффективнее. Эти горутины используют меньше памяти, чем потоки, и могут создаваться и переключаться быстрее, чем потоки.
Чтобы использовать горутину, нужно просто использовать ключевое слово go
. Это позволяет интуитивно выполнять синхронный код как асинхронный в процессе написания программы.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Этот код просто изменяет синхронный код, который ждет 1 секунду, а затем выводит Hello, World!
, на асинхронный поток. Текущий пример прост, но если вы измените немного более сложный код с синхронного на асинхронный, читаемость, наглядность и понимание кода станут лучше, чем у существующих методов async await или promise.
Однако во многих случаях плохой код горутин создается, когда этот синхронный код просто вызывается асинхронно, а поток, подобный fork & join
(поток, аналогичный принципу «разделяй и властвуй»), не понят. В этой статье мы представим несколько методов и приемов, которые могут подготовиться к этим случаям.
Управление параллелизмом
context
Неожиданно, что context
появляется в качестве первого метода управления. Но в языке Go context
выходит за рамки простой функции отмены и играет отличную роль в управлении всем деревом задач. Для тех, кто не знает, давайте кратко объясним этот пакет.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Приведенный выше код — это код, который выводит Context is done!
через 1 секунду, используя context
. context
может проверить, отменена ли она, через метод Done()
, и предоставляет различные методы отмены, такие как WithCancel
, WithTimeout
, WithDeadline
, WithValue
.
Давайте создадим простой пример. Предположим, вы пишете код для получения user
, post
и comment
, используя шаблон aggregator
, чтобы получить некоторые данные. И если все запросы должны быть выполнены в течение 2 секунд, вы можете написать их следующим образом.
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Приведенный выше код выводит Timeout!
, если не удается получить все данные в течение 2 секунд, и All data is fetched!
, если все данные получены. Используя context
таким образом, можно легко управлять отменой и тайм-аутом даже в коде, где работает несколько горутин.
Различные функции и методы, связанные с context, можно найти в godoc context. Я надеюсь, что вы изучите простые вещи и сможете использовать их с комфортом.
channel
unbuffered channel
channel
— это инструмент для связи между горутинами. channel
можно создать с помощью make(chan T)
. В это время T
является типом данных, которые будет передавать этот channel
. channel
может отправлять и получать данные с помощью <-
, и channel
можно закрыть с помощью close
.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Приведенный выше код — это код, который выводит 1 и 2, используя channel
. Этот код просто показывает отправку и получение значений в channel
. Но channel
предлагает больше возможностей, чем это. Сначала давайте узнаем о buffered channel
и unbuffered channel
. Начнем с того, что приведенный выше пример представляет собой unbuffered channel
, и действие отправки данных в канал и действие получения данных должны выполняться одновременно. Если это действие не выполняется одновременно, может возникнуть тупиковая ситуация.
buffered channel
Что, если приведенный выше код — это не просто вывод, а два процесса, выполняющие тяжелую работу? Если второй процесс зависает на длительное время при чтении и выполнении обработки, первый процесс также будет остановлен на это время. Мы можем использовать buffered channel
, чтобы предотвратить эту ситуацию.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Приведенный выше код — это код, который выводит 1 и 2, используя buffered channel
. В этом коде мы используем buffered channel
, чтобы действие отправки данных в channel
и действие получения данных не выполнялись одновременно. Размещение буфера в канале таким образом позволяет избежать задержек в работе, вызванных влиянием последующих задач, благодаря наличию запаса, равного этой длине.
select
При работе с несколькими каналами можно легко реализовать структуру fan-in
с помощью оператора select
.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Приведенный выше код — это код, который создает 3 канала, которые периодически передают 1, 2 и 3, и использует select
для получения и вывода значений из каналов. Используя select
таким образом, можно одновременно передавать данные из нескольких каналов и обрабатывать их по мере получения значений из каналов.
for range
channel
может легко получать данные с помощью for range
. Когда for range
используется в канале, он работает каждый раз, когда в канал добавляются данные, и завершает цикл, когда канал закрывается.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Приведенный выше код — это код, который выводит 1 и 2, используя channel
. В этом коде мы используем for range
для получения и вывода данных каждый раз, когда данные добавляются в канал. И цикл завершается, когда канал закрывается.
Как я уже писал несколько раз выше, этот синтаксис можно использовать как простой способ синхронизации.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Приведенный выше код — это код, который выводит Hello, World!
после 1 секунды ожидания. В этом коде мы использовали channel
для изменения синхронного кода на асинхронный. Используя channel
таким образом, вы можете легко изменить синхронный код на асинхронный и установить точку join
.
etc
- Если вы отправляете или получаете данные в nil channel, вы можете попасть в бесконечный цикл и вызвать тупиковую ситуацию.
- Если вы отправляете данные после закрытия канала, возникает паника.
- Даже если вы не закрываете канал, GC собирает его и закрывает канал.
mutex
spinlock
spinlock
— это метод синхронизации, который пытается получить блокировку, перебирая цикл. В языке Go вы можете легко реализовать спин-блокировку, используя указатели.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Приведенный выше код — это код, который реализует пакет spinlock
. В этом коде мы реализовали SpinLock
, используя пакет sync/atomic
. Метод Lock
пытается получить блокировку, используя atomic.CompareAndSwapUintptr
, а метод Unlock
освобождает блокировку, используя atomic.StoreUintptr
. Поскольку этот метод постоянно пытается получить блокировку, он продолжает использовать ЦП, пока не получит блокировку, и может попасть в бесконечный цикл. Поэтому spinlock
лучше использовать для простой синхронизации или когда он используется только в течение короткого времени.
sync.Mutex
mutex
— это инструмент для синхронизации между горутинами. mutex
, предоставляемый пакетом sync
, предоставляет такие методы, как Lock
, Unlock
, RLock
, RUnlock
. mutex
можно создать с помощью sync.Mutex
, а блокировку чтения/записи также можно использовать с помощью sync.RWMutex
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
В приведенном выше коде две горутины обращаются к одной и той же переменной count
почти одновременно. В это время, если вы сделаете код доступа к переменной count
критическим разделом, используя mutex
, вы можете предотвратить одновременный доступ к переменной count
. Тогда этот код будет всегда выводить 2
независимо от того, сколько раз он выполняется.
sync.RWMutex
sync.RWMutex
— это mutex
, который может использоваться с разделением блокировок чтения и записи. Вы можете заблокировать и разблокировать блокировку чтения, используя методы RLock
и RUnlock
.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Приведенный выше код — это код, который реализует ConcurrentMap
, используя sync.RWMutex
. В этом коде мы можем безопасно получать доступ и изменять карту data
, заблокировав чтение в методе Get
и заблокировав запись в методе Set
. Причина, по которой необходима блокировка чтения, заключается в том, чтобы разрешить нескольким горутинам одновременно выполнять операции чтения, блокируя только чтение, а не блокируя запись, в случае, когда существует много простых операций чтения. Это позволяет повысить производительность, блокируя только чтение, когда нет необходимости блокировать запись, потому что состояние не меняется.
fakelock
fakelock
— это простой трюк, реализующий sync.Locker
. Эта структура предоставляет те же методы, что и sync.Mutex
, но фактически не выполняет никаких действий.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Приведенный выше код — это код, который реализует пакет fakelock
. Этот пакет реализует sync.Locker
и предоставляет методы Lock
и Unlock
, но на самом деле ничего не делает. Я опишу, зачем нужен этот код, как будет возможность.
waitgroup
sync.WaitGroup
sync.WaitGroup
— это инструмент для ожидания завершения всех работ горутины. Он предоставляет методы Add
, Done
и Wait
. Добавьте количество горутин методом Add
, сообщите о завершении работы горутины методом Done
и подождите, пока не завершится работа всех горутин, используя метод Wait
.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Этот код использует sync.WaitGroup
для того, чтобы 100 горутин одновременно добавляли значение к переменной c
. В этом коде sync.WaitGroup
используется для ожидания завершения всех горутин, после чего выводится значение, добавленное к переменной c
. В случаях, когда нужно просто выполнить несколько операций типа fork & join
, достаточно использовать только каналы, но в случаях, когда нужно выполнить большое количество операций типа fork & join
, использование sync.WaitGroup
также является хорошим вариантом.
with slice
При использовании со слайсами waitgroup
может стать отличным инструментом для управления параллельным выполнением задач без блокировок.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
В приведенном выше коде используется только waitgroup
, и каждая горутина одновременно генерирует 10 случайных целых чисел и сохраняет их по назначенному индексу. В этом коде waitgroup
используется для ожидания завершения всех горутин, после чего выводится Done
. Таким образом, используя waitgroup
, можно обеспечить параллельное выполнение задач несколькими горутинами, сохранение данных без блокировок до завершения всех горутин и выполнение постобработки после завершения работы.
golang.org/x/sync/errgroup.ErrGroup
errgroup
— это пакет, расширяющий sync.WaitGroup
. В отличие от sync.WaitGroup
, если в работе одной из горутин возникает ошибка, errgroup
отменяет все горутины и возвращает ошибку.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
В приведенном выше коде используется errgroup
для создания 10 горутин, и в пятой горутине генерируется ошибка. Намеренно сгенерировав ошибку в пятой горутине, мы продемонстрировали случай возникновения ошибки. Однако на практике, используя errgroup
, можно создавать горутины и выполнять различные виды постобработки в случае возникновения ошибки в каждой горутине.
once
Инструмент для выполнения кода, который должен быть выполнен только один раз. Следующие конструкторы позволяют выполнить соответствующий код.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc
просто обеспечивает выполнение данной функции только один раз за все время.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
В приведенном выше коде используется sync.OnceFunc
для вывода Hello, World!
. В этом коде sync.OnceFunc
используется для создания функции once
, и даже если функция once
вызывается несколько раз, Hello, World!
выводится только один раз.
OnceValue
OnceValue
не просто обеспечивает выполнение данной функции только один раз за все время, но и сохраняет возвращаемое значение данной функции, возвращая сохраненное значение при повторном вызове.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
В приведенном выше коде используется sync.OnceValue
для увеличения переменной c
на 1. В этом коде sync.OnceValue
используется для создания функции once
, и даже если функция once
вызывается несколько раз, возвращается 1, то есть значение переменной c
, которое было увеличено только один раз.
OnceValues
OnceValues
работает так же, как и OnceValue
, но может возвращать несколько значений.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
В приведенном выше коде используется sync.OnceValues
для увеличения переменной c
на 1. В этом коде sync.OnceValues
используется для создания функции once
, и даже если функция once
вызывается несколько раз, возвращается 1, то есть значение переменной c
, которое было увеличено только один раз.
atomic
Пакет atomic
— это пакет, предоставляющий атомарные операции. Пакет atomic
предоставляет такие методы, как Add
, CompareAndSwap
, Load
, Store
, Swap
, но в последнее время рекомендуется использовать типы Int64
, Uint64
, Pointer
и другие.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Это пример, который использовался ранее. Это код, в котором переменная c
атомарно увеличивается с использованием типа atomic.Int64
. С помощью методов Add
и Load
можно атомарно увеличивать переменную и считывать ее значение. Также можно сохранять значения с помощью метода Store
, менять значения местами с помощью метода Swap
, а также сравнивать и менять значения, если они подходят, с помощью метода CompareAndSwap
.
cond
sync.Cond
Пакет cond
— это пакет, предоставляющий условные переменные. Пакет cond
можно создать с помощью sync.Cond
и предоставляет методы Wait
, Signal
, Broadcast
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Приведенный выше код использует sync.Cond
для ожидания, пока переменная ready
не станет true
. В этом коде sync.Cond
используется для ожидания, пока переменная ready
не станет true
, после чего выводится Ready!
. Таким образом, используя sync.Cond
, можно заставить несколько горутин одновременно ждать выполнения определенного условия.
Используя это, можно реализовать простую очередь queue
.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Используя sync.Cond
таким образом, можно эффективно ожидать и возобновлять работу при выполнении условия, вместо того чтобы использовать spin-lock
, который потребляет много ресурсов процессора.
semaphore
golang.org/x/sync/semaphore.Semaphore
Пакет semaphore
— это пакет, предоставляющий семафоры. Пакет semaphore
можно создать с помощью golang.org/x/sync/semaphore.Semaphore
и предоставляет методы Acquire
, Release
, TryAcquire
.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Приведенный выше код использует semaphore
для создания семафора, получения семафора с помощью метода Acquire
и освобождения семафора с помощью метода Release
. В этом коде показано, как получить и освободить семафор с помощью semaphore
.
Заключение
Похоже, что на этом можно закончить с основными моментами. Надеюсь, что на основе содержания этой статьи вы поняли, как использовать горутины для управления параллелизмом, и сможете применять это на практике. Я надеюсь, что эта статья окажется вам полезной. Спасибо.