GoSuda

Go Starter Pack для параллельного программирования

By snowmerak
views ...

Обзор

Краткое введение

В языке Go есть множество инструментов для управления параллелизмом. В этой статье мы представим некоторые из них и трюки.

Горутины?

goroutine — это новая форма модели параллелизма, поддерживаемая языком Go. Обычно программа получает потоки ОС от ОС для одновременного выполнения нескольких задач и выполняет задачи параллельно в соответствии с количеством ядер. А для выполнения параллелизма меньшего размера в пользовательском пространстве создаются зеленые потоки, чтобы несколько зеленых потоков работали внутри одного потока ОС и выполняли задачи по очереди. Но в случае с горутинами эта форма зеленых потоков стала меньше и эффективнее. Эти горутины используют меньше памяти, чем потоки, и могут создаваться и переключаться быстрее, чем потоки.

Чтобы использовать горутину, нужно просто использовать ключевое слово go. Это позволяет интуитивно выполнять синхронный код как асинхронный в процессе написания программы.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Этот код просто изменяет синхронный код, который ждет 1 секунду, а затем выводит Hello, World!, на асинхронный поток. Текущий пример прост, но если вы измените немного более сложный код с синхронного на асинхронный, читаемость, наглядность и понимание кода станут лучше, чем у существующих методов async await или promise.

Однако во многих случаях плохой код горутин создается, когда этот синхронный код просто вызывается асинхронно, а поток, подобный fork & join (поток, аналогичный принципу «разделяй и властвуй»), не понят. В этой статье мы представим несколько методов и приемов, которые могут подготовиться к этим случаям.

Управление параллелизмом

context

Неожиданно, что context появляется в качестве первого метода управления. Но в языке Go context выходит за рамки простой функции отмены и играет отличную роль в управлении всем деревом задач. Для тех, кто не знает, давайте кратко объясним этот пакет.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Приведенный выше код — это код, который выводит Context is done! через 1 секунду, используя context. context может проверить, отменена ли она, через метод Done(), и предоставляет различные методы отмены, такие как WithCancel, WithTimeout, WithDeadline, WithValue.

Давайте создадим простой пример. Предположим, вы пишете код для получения user, post и comment, используя шаблон aggregator, чтобы получить некоторые данные. И если все запросы должны быть выполнены в течение 2 секунд, вы можете написать их следующим образом.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Приведенный выше код выводит Timeout!, если не удается получить все данные в течение 2 секунд, и All data is fetched!, если все данные получены. Используя context таким образом, можно легко управлять отменой и тайм-аутом даже в коде, где работает несколько горутин.

Различные функции и методы, связанные с context, можно найти в godoc context. Я надеюсь, что вы изучите простые вещи и сможете использовать их с комфортом.

channel

unbuffered channel

channel — это инструмент для связи между горутинами. channel можно создать с помощью make(chan T). В это время T является типом данных, которые будет передавать этот channel. channel может отправлять и получать данные с помощью <-, и channel можно закрыть с помощью close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Приведенный выше код — это код, который выводит 1 и 2, используя channel. Этот код просто показывает отправку и получение значений в channel. Но channel предлагает больше возможностей, чем это. Сначала давайте узнаем о buffered channel и unbuffered channel. Начнем с того, что приведенный выше пример представляет собой unbuffered channel, и действие отправки данных в канал и действие получения данных должны выполняться одновременно. Если это действие не выполняется одновременно, может возникнуть тупиковая ситуация.

buffered channel

Что, если приведенный выше код — это не просто вывод, а два процесса, выполняющие тяжелую работу? Если второй процесс зависает на длительное время при чтении и выполнении обработки, первый процесс также будет остановлен на это время. Мы можем использовать buffered channel, чтобы предотвратить эту ситуацию.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Приведенный выше код — это код, который выводит 1 и 2, используя buffered channel. В этом коде мы используем buffered channel, чтобы действие отправки данных в channel и действие получения данных не выполнялись одновременно. Размещение буфера в канале таким образом позволяет избежать задержек в работе, вызванных влиянием последующих задач, благодаря наличию запаса, равного этой длине.

select

При работе с несколькими каналами можно легко реализовать структуру fan-in с помощью оператора select.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Приведенный выше код — это код, который создает 3 канала, которые периодически передают 1, 2 и 3, и использует select для получения и вывода значений из каналов. Используя select таким образом, можно одновременно передавать данные из нескольких каналов и обрабатывать их по мере получения значений из каналов.

for range

channel может легко получать данные с помощью for range. Когда for range используется в канале, он работает каждый раз, когда в канал добавляются данные, и завершает цикл, когда канал закрывается.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Приведенный выше код — это код, который выводит 1 и 2, используя channel. В этом коде мы используем for range для получения и вывода данных каждый раз, когда данные добавляются в канал. И цикл завершается, когда канал закрывается.

Как я уже писал несколько раз выше, этот синтаксис можно использовать как простой способ синхронизации.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Приведенный выше код — это код, который выводит Hello, World! после 1 секунды ожидания. В этом коде мы использовали channel для изменения синхронного кода на асинхронный. Используя channel таким образом, вы можете легко изменить синхронный код на асинхронный и установить точку join.

etc

  1. Если вы отправляете или получаете данные в nil channel, вы можете попасть в бесконечный цикл и вызвать тупиковую ситуацию.
  2. Если вы отправляете данные после закрытия канала, возникает паника.
  3. Даже если вы не закрываете канал, GC собирает его и закрывает канал.

mutex

spinlock

spinlock — это метод синхронизации, который пытается получить блокировку, перебирая цикл. В языке Go вы можете легко реализовать спин-блокировку, используя указатели.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Приведенный выше код — это код, который реализует пакет spinlock. В этом коде мы реализовали SpinLock, используя пакет sync/atomic. Метод Lock пытается получить блокировку, используя atomic.CompareAndSwapUintptr, а метод Unlock освобождает блокировку, используя atomic.StoreUintptr. Поскольку этот метод постоянно пытается получить блокировку, он продолжает использовать ЦП, пока не получит блокировку, и может попасть в бесконечный цикл. Поэтому spinlock лучше использовать для простой синхронизации или когда он используется только в течение короткого времени.

sync.Mutex

mutex — это инструмент для синхронизации между горутинами. mutex, предоставляемый пакетом sync, предоставляет такие методы, как Lock, Unlock, RLock, RUnlock. mutex можно создать с помощью sync.Mutex, а блокировку чтения/записи также можно использовать с помощью sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

В приведенном выше коде две горутины обращаются к одной и той же переменной count почти одновременно. В это время, если вы сделаете код доступа к переменной count критическим разделом, используя mutex, вы можете предотвратить одновременный доступ к переменной count. Тогда этот код будет всегда выводить 2 независимо от того, сколько раз он выполняется.

sync.RWMutex

sync.RWMutex — это mutex, который может использоваться с разделением блокировок чтения и записи. Вы можете заблокировать и разблокировать блокировку чтения, используя методы RLock и RUnlock.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Приведенный выше код — это код, который реализует ConcurrentMap, используя sync.RWMutex. В этом коде мы можем безопасно получать доступ и изменять карту data, заблокировав чтение в методе Get и заблокировав запись в методе Set. Причина, по которой необходима блокировка чтения, заключается в том, чтобы разрешить нескольким горутинам одновременно выполнять операции чтения, блокируя только чтение, а не блокируя запись, в случае, когда существует много простых операций чтения. Это позволяет повысить производительность, блокируя только чтение, когда нет необходимости блокировать запись, потому что состояние не меняется.

fakelock

fakelock — это простой трюк, реализующий sync.Locker. Эта структура предоставляет те же методы, что и sync.Mutex, но фактически не выполняет никаких действий.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Приведенный выше код — это код, который реализует пакет fakelock. Этот пакет реализует sync.Locker и предоставляет методы Lock и Unlock, но на самом деле ничего не делает. Я опишу, зачем нужен этот код, как будет возможность.

waitgroup

sync.WaitGroup

sync.WaitGroup — это инструмент для ожидания завершения всех работ горутины. Он предоставляет методы Add, Done и Wait. Добавьте количество горутин методом Add, сообщите о завершении работы горутины методом Done и подождите, пока не завершится работа всех горутин, используя метод Wait.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Этот код использует sync.WaitGroup для того, чтобы 100 горутин одновременно добавляли значение к переменной c. В этом коде sync.WaitGroup используется для ожидания завершения всех горутин, после чего выводится значение, добавленное к переменной c. В случаях, когда нужно просто выполнить несколько операций типа fork & join, достаточно использовать только каналы, но в случаях, когда нужно выполнить большое количество операций типа fork & join, использование sync.WaitGroup также является хорошим вариантом.

with slice

При использовании со слайсами waitgroup может стать отличным инструментом для управления параллельным выполнением задач без блокировок.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

В приведенном выше коде используется только waitgroup, и каждая горутина одновременно генерирует 10 случайных целых чисел и сохраняет их по назначенному индексу. В этом коде waitgroup используется для ожидания завершения всех горутин, после чего выводится Done. Таким образом, используя waitgroup, можно обеспечить параллельное выполнение задач несколькими горутинами, сохранение данных без блокировок до завершения всех горутин и выполнение постобработки после завершения работы.

golang.org/x/sync/errgroup.ErrGroup

errgroup — это пакет, расширяющий sync.WaitGroup. В отличие от sync.WaitGroup, если в работе одной из горутин возникает ошибка, errgroup отменяет все горутины и возвращает ошибку.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

В приведенном выше коде используется errgroup для создания 10 горутин, и в пятой горутине генерируется ошибка. Намеренно сгенерировав ошибку в пятой горутине, мы продемонстрировали случай возникновения ошибки. Однако на практике, используя errgroup, можно создавать горутины и выполнять различные виды постобработки в случае возникновения ошибки в каждой горутине.

once

Инструмент для выполнения кода, который должен быть выполнен только один раз. Следующие конструкторы позволяют выполнить соответствующий код.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc просто обеспечивает выполнение данной функции только один раз за все время.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

В приведенном выше коде используется sync.OnceFunc для вывода Hello, World!. В этом коде sync.OnceFunc используется для создания функции once, и даже если функция once вызывается несколько раз, Hello, World! выводится только один раз.

OnceValue

OnceValue не просто обеспечивает выполнение данной функции только один раз за все время, но и сохраняет возвращаемое значение данной функции, возвращая сохраненное значение при повторном вызове.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

В приведенном выше коде используется sync.OnceValue для увеличения переменной c на 1. В этом коде sync.OnceValue используется для создания функции once, и даже если функция once вызывается несколько раз, возвращается 1, то есть значение переменной c, которое было увеличено только один раз.

OnceValues

OnceValues работает так же, как и OnceValue, но может возвращать несколько значений.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

В приведенном выше коде используется sync.OnceValues для увеличения переменной c на 1. В этом коде sync.OnceValues используется для создания функции once, и даже если функция once вызывается несколько раз, возвращается 1, то есть значение переменной c, которое было увеличено только один раз.

atomic

Пакет atomic — это пакет, предоставляющий атомарные операции. Пакет atomic предоставляет такие методы, как Add, CompareAndSwap, Load, Store, Swap, но в последнее время рекомендуется использовать типы Int64, Uint64, Pointer и другие.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Это пример, который использовался ранее. Это код, в котором переменная c атомарно увеличивается с использованием типа atomic.Int64. С помощью методов Add и Load можно атомарно увеличивать переменную и считывать ее значение. Также можно сохранять значения с помощью метода Store, менять значения местами с помощью метода Swap, а также сравнивать и менять значения, если они подходят, с помощью метода CompareAndSwap.

cond

sync.Cond

Пакет cond — это пакет, предоставляющий условные переменные. Пакет cond можно создать с помощью sync.Cond и предоставляет методы Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Приведенный выше код использует sync.Cond для ожидания, пока переменная ready не станет true. В этом коде sync.Cond используется для ожидания, пока переменная ready не станет true, после чего выводится Ready!. Таким образом, используя sync.Cond, можно заставить несколько горутин одновременно ждать выполнения определенного условия.

Используя это, можно реализовать простую очередь queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Используя sync.Cond таким образом, можно эффективно ожидать и возобновлять работу при выполнении условия, вместо того чтобы использовать spin-lock, который потребляет много ресурсов процессора.

semaphore

golang.org/x/sync/semaphore.Semaphore

Пакет semaphore — это пакет, предоставляющий семафоры. Пакет semaphore можно создать с помощью golang.org/x/sync/semaphore.Semaphore и предоставляет методы Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Приведенный выше код использует semaphore для создания семафора, получения семафора с помощью метода Acquire и освобождения семафора с помощью метода Release. В этом коде показано, как получить и освободить семафор с помощью semaphore.

Заключение

Похоже, что на этом можно закончить с основными моментами. Надеюсь, что на основе содержания этой статьи вы поняли, как использовать горутины для управления параллелизмом, и сможете применять это на практике. Я надеюсь, что эта статья окажется вам полезной. Спасибо.