GoSuda

Go Concurrency Starter Pack

By snowmerak
views ...

Обзор

Краткое введение

Язык Go предоставляет множество инструментов для управления параллелизмом. В этой статье мы представим некоторые из них и связанные с ними приемы.

Горутина?

goroutine — это новая модель параллелизма, поддерживаемая языком Go. Обычно программа получает OS threads от ОС для одновременного выполнения нескольких задач и выполняет задачи параллельно в количестве, соответствующем числу ядер. Для обеспечения параллелизма меньшего масштаба создаются green threads в userland, и несколько green threads выполняют задачи в рамках одного OS thread. Однако в случае goroutine такие green threads сделаны еще меньше и эффективнее. Такие goroutine используют меньше памяти, чем threads, и могут создаваться и заменяться быстрее, чем threads.

Для использования goroutine достаточно применить ключевое слово go. Это позволяет интуитивно запускать синхронный код как асинхронный в процессе написания программы.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch) // Откладываем закрытие канала до завершения горутины
12        time.Sleep(1 * time.Second) // Задержка на 1 секунду
13        fmt.Println("Hello, World!") // Вывод сообщения
14    }()
15
16    fmt.Println("Waiting for goroutine...") // Вывод сообщения
17    for range ch {} // Ожидание закрытия канала горутиной
18}

Этот код просто изменяет синхронный код, который ждет 1 секунду, а затем выводит Hello, World!, на асинхронный поток. Хотя этот пример прост, при изменении более сложного синхронного кода на асинхронный читабельность, наглядность и понятность кода становятся лучше, чем при использовании таких подходов, как async/await или Promise.

Однако во многих случаях, если не понимать поток вызовов синхронного кода в асинхронном режиме и такие потоки, как fork & join (подобные разделяй и властвуй), может быть написан неоптимальный код goroutine. В этой статье мы представим несколько методов и приемов, которые помогут справиться с такими ситуациями.

Управление параллелизмом

context

Появление context в качестве первого метода управления может показаться неожиданным. Однако в языке Go context играет выдающуюся роль в управлении всем деревом задач, выходя за рамки простой функции отмены. Для тех, кто не знаком с этим, я кратко объясню этот пакет.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background()) // Создание контекста с функцией отмены
 5    defer cancel() // Отложенный вызов функции отмены
 6
 7    go func() {
 8        <-ctx.Done() // Ожидание сигнала отмены контекста
 9        fmt.Println("Context is done!") // Вывод сообщения после отмены контекста
10    }()
11
12    time.Sleep(1 * time.Second) // Задержка на 1 секунду
13
14    cancel() // Отмена контекста
15
16    time.Sleep(1 * time.Second) // Задержка на 1 секунду
17}

Приведенный выше код использует context для вывода Context is done! через 1 секунду. context позволяет проверить состояние отмены с помощью метода Done() и предоставляет различные методы отмены, такие как WithCancel, WithTimeout, WithDeadline, WithValue.

Давайте создадим простой пример. Предположим, вы пишете код для получения user, post, comment с использованием паттерна aggregator для извлечения данных. И если все запросы должны быть выполнены в течение 2 секунд, вы можете написать это следующим образом.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second) // Создание контекста с таймаутом в 2 секунды
 5    defer cancel() // Отложенный вызов функции отмены
 6
 7    ch := make(chan struct{}) // Создание канала для синхронизации
 8    go func() {
 9        defer close(ch) // Отложенное закрытие канала
10        user := getUser(ctx) // Получение пользователя
11        post := getPost(ctx) // Получение поста
12        comment := getComment(ctx) // Получение комментария
13
14        fmt.Println(user, post, comment) // Вывод полученных данных
15    }()
16
17    select {
18    case <-ctx.Done(): // Если контекст отменен (таймаут)
19        fmt.Println("Timeout!") // Вывод сообщения о таймауте
20    case <-ch: // Если все данные получены
21        fmt.Println("All data is fetched!") // Вывод сообщения об успешном получении данных
22    }
23}

Приведенный выше код выводит Timeout!, если все данные не получены в течение 2 секунд, и All data is fetched!, если все данные получены. Таким образом, используя context, вы можете легко управлять отменой и таймаутами даже в коде, где работают несколько goroutine.

Различные функции и методы, связанные с context, доступны по ссылке godoc context. Я надеюсь, что вы сможете легко изучить и использовать их.

channel

unbuffered channel

channel — это инструмент для связи между goroutine. channel можно создать с помощью make(chan T). В этом случае T — это тип данных, которые будет передавать channel. Данные можно отправлять и получать через channel с помощью <-, а channel можно закрыть с помощью close.

 1package main
 2
 3func main() {
 4    ch := make(chan int) // Создание небуферизованного канала типа int
 5    go func() {
 6        ch <- 1 // Отправка 1 в канал
 7        ch <- 2 // Отправка 2 в канал
 8        close(ch) // Закрытие канала
 9    }()
10
11    for i := range ch { // Итерация по значениям из канала
12        fmt.Println(i) // Вывод каждого значения
13    }
14}

Приведенный выше код использует channel для вывода 1 и 2. В этом коде показана только отправка и получение значений из channel. Однако channel предоставляет гораздо больше функций. Сначала мы рассмотрим buffered channel и unbuffered channel. Прежде чем начать, приведенный выше пример является unbuffered channel, и действия по отправке данных в канал и получению данных из него должны происходить одновременно. Если эти действия не происходят одновременно, может возникнуть deadlock.

buffered channel

Что, если приведенный выше код — это не просто вывод, а два процесса, выполняющие тяжелую работу? Если второй процесс застрянет на длительное время при чтении и обработке, первый процесс также остановится на это время. Мы можем использовать buffered channel, чтобы предотвратить такую ситуацию.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2) // Создание буферизованного канала типа int с буфером размером 2
 5    go func() {
 6        ch <- 1 // Отправка 1 в канал
 7        ch <- 2 // Отправка 2 в канал
 8        close(ch) // Закрытие канала
 9    }()
10
11    for i := range ch { // Итерация по значениям из канала
12        fmt.Println(i) // Вывод каждого значения
13    }
14}

Приведенный выше код использует buffered channel для вывода 1 и 2. В этом коде buffered channel используется для того, чтобы действия по отправке данных в канал и получению данных из него не обязательно происходили одновременно. Добавление буфера в канал создает запас по длине, что позволяет предотвратить задержки в работе, вызванные влиянием последующих задач.

select

При работе с несколькими каналами оператор select позволяет легко реализовать структуру fan-in.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10) // Создание буферизованного канала ch1
10    ch2 := make(chan int, 10) // Создание буферизованного канала ch2
11    ch3 := make(chan int, 10) // Создание буферизованного канала ch3
12
13    go func() { // Горутина для ch1
14        for {
15            ch1 <- 1 // Отправка 1 в ch1
16            time.Sleep(1 * time.Second) // Задержка на 1 секунду
17        }
18    }()
19    go func() { // Горутина для ch2
20        for {
21            ch2 <- 2 // Отправка 2 в ch2
22            time.Sleep(2 * time.Second) // Задержка на 2 секунды
23        }
24    }()
25    go func() { // Горутина для ch3
26        for {
27            ch3 <- 3 // Отправка 3 в ch3
28            time.Sleep(3 * time.Second) // Задержка на 3 секунды
29        }
30    }()
31
32    for i := 0; i < 3; i++ { // Цикл для получения 3 значений
33        select {
34        case v := <-ch1: // Получение значения из ch1
35            fmt.Println(v) // Вывод значения
36        case v := <-ch2: // Получение значения из ch2
37            fmt.Println(v) // Вывод значения
38        case v := <-ch3: // Получение значения из ch3
39            fmt.Println(v) // Вывод значения
40        }
41    }
42}

Приведенный выше код создает 3 канала, которые периодически передают 1, 2, 3, и выводит значения, полученные из каналов с помощью select. Таким образом, используя select, вы можете одновременно получать данные из нескольких каналов и обрабатывать их по мере поступления.

for range

channel может легко получать данные с помощью for range. Если for range используется для канала, он будет работать каждый раз, когда данные добавляются в этот канал, и завершит цикл, когда канал будет закрыт.

 1package main
 2
 3func main() {
 4    ch := make(chan int) // Создание канала типа int
 5    go func() {
 6        ch <- 1 // Отправка 1 в канал
 7        ch <- 2 // Отправка 2 в канал
 8        close(ch) // Закрытие канала
 9    }()
10
11    for i := range ch { // Итерация по значениям из канала
12        fmt.Println(i) // Вывод каждого значения
13    }
14}

Приведенный выше код использует channel для вывода 1 и 2. В этом коде for range используется для получения и вывода данных каждый раз, когда данные добавляются в канал. А когда канал закрывается, цикл завершается.

Как упоминалось выше несколько раз, этот синтаксис можно использовать и для простой синхронизации.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{}) // Создание канала для синхронизации
 5    go func() {
 6        defer close(ch) // Отложенное закрытие канала
 7        time.Sleep(1 * time.Second) // Задержка на 1 секунду
 8        fmt.Println("Hello, World!") // Вывод сообщения
 9    }()
10
11    fmt.Println("Waiting for goroutine...") // Вывод сообщения
12    for range ch {} // Ожидание закрытия канала
13}

Приведенный выше код ждет 1 секунду, а затем выводит Hello, World!. В этом коде channel используется для преобразования синхронного кода в асинхронный. Таким образом, используя channel, вы можете легко преобразовать синхронный код в асинхронный и установить точки join.

etc

  1. Если данные отправляются или получаются из nil channel, может возникнуть deadlock из-за бесконечного цикла.
  2. Отправка данных в закрытый канал вызывает panic.
  3. Канал закрывается сборщиком мусора, даже если он не закрыт явным образом.

mutex

spinlock

spinlock — это метод синхронизации, который постоянно пытается получить блокировку в цикле. В языке Go можно легко реализовать spinlock с помощью указателей.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) { // Попытка установить блокировку, пока не удастся
14        runtime.Gosched() // Уступает выполнение другим горутинам
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0) // Снимает блокировку
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{} // Создает и возвращает новый SpinLock
24}

Приведенный выше код реализует пакет spinlock. В этом коде SpinLock реализован с использованием пакета sync/atomic. Метод Lock пытается получить блокировку с помощью atomic.CompareAndSwapUintptr, а метод Unlock снимает блокировку с помощью atomic.StoreUintptr. Этот подход постоянно пытается получить блокировку, поэтому он непрерывно использует CPU до получения блокировки, что может привести к бесконечному циклу. Следовательно, spinlock рекомендуется использовать для простой синхронизации или в течение короткого времени.

sync.Mutex

mutex — это инструмент для синхронизации между goroutine. mutex, предоставляемый пакетом sync, предлагает методы, такие как Lock, Unlock, RLock, RUnlock. mutex можно создать с помощью sync.Mutex, а также использовать блокировку чтения/записи с помощью sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex // Объявление мьютекса
 9    var count int // Объявление переменной счетчика
10
11    go func() { // Запуск горутины
12        mu.Lock() // Захват мьютекса
13        count++ // Увеличение счетчика
14        mu.Unlock() // Освобождение мьютекса
15    }()
16
17    mu.Lock() // Захват мьютекса
18    count++ // Увеличение счетчика
19    mu.Unlock() // Освобождение мьютекса
20
21    println(count) // Вывод значения счетчика
22}

В приведенном выше коде две goroutine почти одновременно обращаются к одной и той же переменной count. В этом случае, если мы сделаем код, обращающийся к переменной count, критической секцией с помощью mutex, мы сможем предотвратить одновременный доступ к переменной count. Тогда этот код будет выводить 2 каждый раз, независимо от того, сколько раз он будет выполнен.

sync.RWMutex

sync.RWMutex — это mutex, который позволяет использовать блокировки чтения и записи отдельно. Методы RLock и RUnlock используются для установки и снятия блокировки чтения.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex // Встраивание RWMutex для обеспечения конкурентного доступа
 9    data map[K]V // Карта для хранения данных
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock() // Захват блокировки чтения
14    defer m.RUnlock() // Освобождение блокировки чтения после завершения функции
15
16    value, ok := m.data[key] // Получение значения из карты
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock() // Захват блокировки записи
22    defer m.Unlock() // Освобождение блокировки записи после завершения функции
23
24    m.data[key] = value // Установка значения в карту
25}

Приведенный выше код реализует ConcurrentMap с использованием sync.RWMutex. В этом коде метод Get устанавливает блокировку чтения, а метод Set устанавливает блокировку записи, что позволяет безопасно получать доступ и изменять карту data. Блокировка чтения необходима для того, чтобы, когда есть много операций чтения, несколько goroutine могли одновременно выполнять операции чтения, не устанавливая блокировку записи, а только блокировку чтения. Таким образом, производительность может быть улучшена за счет использования только блокировки чтения, когда нет изменения состояния, и нет необходимости в блокировке записи.

fakelock

fakelock — это простой трюк, реализующий sync.Locker. Эта структура предоставляет те же методы, что и sync.Mutex, но фактически не выполняет никаких действий.

1package fakelock
2
3type FakeLock struct{} // Объявление пустой структуры FakeLock
4
5func (f *FakeLock) Lock() {} // Метод Lock, не выполняющий никаких действий
6
7func (f *FakeLock) Unlock() {} // Метод Unlock, не выполняющий никаких действий

Приведенный выше код реализует пакет fakelock. Этот пакет реализует sync.Locker, предоставляя методы Lock и Unlock, но фактически не выполняет никаких действий. Зачем нужен такой код, будет рассказано, если представится возможность.

waitgroup

sync.WaitGroup

sync.WaitGroup — это инструмент для ожидания завершения всех задач goroutine. Он предоставляет методы Add, Done, Wait. Метод Add добавляет количество goroutine, метод Done сигнализирует о завершении задачи goroutine. А метод Wait ждет завершения всех задач goroutine.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{} // Создание WaitGroup
10    c := atomic.Int64{} // Создание атомарного счетчика
11
12    for i := 0; i < 100 ; i++ { // Цикл для запуска 100 горутин
13        wg.Add(1) // Увеличение счетчика WaitGroup
14        go func() { // Запуск горутины
15            defer wg.Done() // Уменьшение счетчика WaitGroup после завершения горутины
16            c.Add(1) // Атомарное увеличение счетчика
17        }()
18    }
19
20    wg.Wait() // Ожидание завершения всех горутин
21    println(c.Load()) // Вывод значения счетчика
22}

Приведенный выше код использует sync.WaitGroup для того, чтобы 100 goroutine одновременно увеличивали значение переменной c. В этом коде sync.WaitGroup используется для ожидания завершения всех goroutine, а затем выводится значение, добавленное к переменной c. Для fork & join нескольких задач достаточно использовать только каналы, но для fork & join большого количества задач sync.WaitGroup также является хорошим выбором.

with slice

При использовании со слайсами waitgroup может стать отличным инструментом для управления параллельными задачами без блокировок.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup // Объявление WaitGroup
11	arr := [10]int{} // Объявление массива из 10 целых чисел
12
13	for i := 0; i < 10; i++ { // Цикл для запуска 10 горутин
14		wg.Add(1) // Увеличение счетчика WaitGroup
15		go func(id int) { // Запуск горутины с передачей id
16			defer wg.Done() // Уменьшение счетчика WaitGroup после завершения горутины
17
18			arr[id] = rand.Intn(100) // Запись случайного числа в массив по индексу id
19		}(i)
20	}
21
22	wg.Wait() // Ожидание завершения всех горутин
23	fmt.Println("Done") // Вывод сообщения о завершении
24
25    for i, v := range arr { // Итерация по массиву и вывод значений
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Приведенный выше код использует только waitgroup для одновременного создания 10 случайных целых чисел каждой goroutine и сохранения их в назначенном индексе. В этом коде waitgroup используется для ожидания завершения всех goroutine, а затем выводится Done. Таким образом, используя waitgroup, несколько goroutine могут одновременно выполнять задачи, сохранять данные без блокировок до завершения всех goroutine, а затем выполнять пакетную постобработку.

golang.org/x/sync/errgroup.ErrGroup

errgroup — это пакет, расширяющий sync.WaitGroup. В отличие от sync.WaitGroup, errgroup отменяет все goroutine и возвращает ошибку, если хотя бы одна из задач goroutine завершается с ошибкой.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background()) // Создание errgroup с контекстом
11    _ = ctx // Игнорирование переменной ctx
12
13    for i := 0; i < 10; i++ { // Цикл для запуска 10 горутин
14        i := i // Создание локальной копии i
15        g.Go(func() error { // Запуск горутины
16            if i == 5 { // Если i равно 5, возвращаем ошибку
17                return fmt.Errorf("error")
18            }
19            return nil // В противном случае возвращаем nil
20        })
21    }
22
23    if err := g.Wait(); err != nil { // Ожидание завершения всех горутин и проверка на ошибку
24        fmt.Println(err) // Вывод ошибки, если она есть
25    }
26}

Приведенный выше код использует errgroup для создания 10 goroutine и вызывает ошибку в 5-й goroutine. Я намеренно показал случай возникновения ошибки в пятой goroutine. Однако при реальном использовании errgroup для создания goroutine и обработки различных ошибок, возникающих в каждой goroutine, вы можете использовать его для выполнения различных последующих действий.

once

Инструмент для однократного выполнения кода. Соответствующий код можно запустить с помощью приведенных ниже конструкторов.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc просто гарантирует, что данная функция будет выполнена ровно один раз за все время работы программы.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() { // Создание функции once, которая будет выполнена только один раз
 7        println("Hello, World!") // Вывод сообщения
 8    })
 9
10    once() // Первый вызов
11    once() // Второй вызов (не будет выполнен)
12    once() // Третий вызов (не будет выполнен)
13    once() // Четвертый вызов (не будет выполнен)
14    once() // Пятый вызов (не будет выполнен)
15}

Приведенный выше код использует sync.OnceFunc для вывода Hello, World!. В этом коде sync.OnceFunc используется для создания функции once, и даже если функция once вызывается несколько раз, Hello, World! выводится только один раз.

OnceValue

OnceValue не просто выполняет данную функцию один раз за все время, но и сохраняет возвращаемое значение этой функции, чтобы при повторном вызове вернуть сохраненное значение.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0 // Инициализация счетчика
 7    once := sync.OnceValue(func() int { // Создание функции once, которая будет выполнена только один раз и вернет значение
 8        c += 1 // Увеличение счетчика
 9        return c // Возврат значения счетчика
10    })
11
12    println(once()) // Первый вызов, выведет 1
13    println(once()) // Второй вызов, выведет 1 (возвращает сохраненное значение)
14    println(once()) // Третий вызов, выведет 1
15    println(once()) // Четвертый вызов, выведет 1
16    println(once()) // Пятый вызов, выведет 1
17}

Приведенный выше код использует sync.OnceValue для увеличения переменной c на 1. В этом коде sync.OnceValue используется для создания функции once, и даже если функция once вызывается несколько раз, переменная c увеличивается только один раз и возвращает 1.

OnceValues

OnceValues работает так же, как OnceValue, но может возвращать несколько значений.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0 // Инициализация счетчика
 7    once := sync.OnceValues(func() (int, int) { // Создание функции once, которая будет выполнена только один раз и вернет два значения
 8        c += 1 // Увеличение счетчика
 9        return c, c // Возврат двух значений счетчика
10    })
11
12    a, b := once() // Первый вызов
13    println(a, b) // Выведет 1 1
14    a, b = once() // Второй вызов (возвращает сохраненные значения)
15    println(a, b) // Выведет 1 1
16    a, b = once() // Третий вызов
17    println(a, b) // Выведет 1 1
18    a, b = once() // Четвертый вызов
19    println(a, b) // Выведет 1 1
20    a, b = once() // Пятый вызов
21    println(a, b) // Выведет 1 1
22}

Приведенный выше код использует sync.OnceValues для увеличения переменной c на 1. В этом коде sync.OnceValues используется для создания функции once, и даже если функция once вызывается несколько раз, переменная c увеличивается только один раз и возвращает 1.

atomic

Пакет atomic предоставляет атомарные операции. Пакет atomic предоставляет такие методы, как Add, CompareAndSwap, Load, Store, Swap, но в последнее время рекомендуется использовать такие типы, как Int64, Uint64, Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{} // Создание WaitGroup
10    c := atomic.Int64{} // Создание атомарного счетчика
11
12    for i := 0; i < 100 ; i++ { // Цикл для запуска 100 горутин
13        wg.Add(1) // Увеличение счетчика WaitGroup
14        go func() { // Запуск горутины
15            defer wg.Done() // Уменьшение счетчика WaitGroup после завершения горутины
16            c.Add(1) // Атомарное увеличение счетчика
17        }()
18    }
19
20    wg.Wait() // Ожидание завершения всех горутин
21    println(c.Load()) // Вывод значения счетчика
22}

Это пример, который использовался ранее. Код атомарно увеличивает переменную c с использованием типа atomic.Int64. Методы Add и Load позволяют атомарно увеличивать и считывать переменную. Кроме того, метод Store может сохранять значения, метод Swap может заменять значения, а метод CompareAndSwap может сравнивать значения и заменять их, если они подходят.

cond

sync.Cond

Пакет cond предоставляет условные переменные. Пакет cond можно создать с помощью sync.Cond и он предоставляет методы Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{}) // Создание условной переменной с мьютексом
 9    ready := false // Флаг готовности
10
11    go func() { // Запуск горутины
12        c.L.Lock() // Захват мьютекса условной переменной
13        ready = true // Установка флага готовности в true
14        c.Signal() // Отправка сигнала одной ожидающей горутине
15        c.L.Unlock() // Освобождение мьютекса условной переменной
16    }()
17
18    c.L.Lock() // Захват мьютекса условной переменной
19    for !ready { // Цикл ожидания, пока ready не станет true
20        c.Wait() // Ожидание сигнала, освобождение мьютекса и повторный захват после получения сигнала
21    }
22    c.L.Unlock() // Освобождение мьютекса условной переменной
23
24    println("Ready!") // Вывод сообщения
25}

Приведенный выше код использует sync.Cond для ожидания, пока переменная ready не станет true. В этом коде sync.Cond используется для ожидания, пока переменная ready не станет true, а затем выводится Ready!. Таким образом, используя sync.Cond, вы можете заставить несколько goroutine ждать, пока не будет выполнено определенное условие.

Используя это, можно реализовать простую queue.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{} // Создание нового экземпляра Queue
23    q.Cond = sync.NewCond(&q.Mutex) // Инициализация Cond с мьютексом Queue
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock() // Захват мьютекса
29    defer q.Unlock() // Освобождение мьютекса после завершения функции
30
31    node := &Node[T]{Value: value} // Создание нового узла
32    if q.Len == 0 { // Если очередь пуста
33        q.Head = node // Устанавливаем новый узел как голову
34        q.Tail = node // Устанавливаем новый узел как хвост
35    } else { // Если очередь не пуста
36        q.Tail.Next = node // Добавляем новый узел после текущего хвоста
37        q.Tail = node // Обновляем хвост
38    }
39    q.Len++ // Увеличиваем длину очереди
40    q.Cond.Signal() // Отправляем сигнал одной ожидающей горутине
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock() // Захват мьютекса
45    defer q.Unlock() // Освобождение мьютекса после завершения функции
46
47    for q.Len == 0 { // Цикл ожидания, пока очередь не станет пустой
48        q.Cond.Wait() // Ожидание сигнала, освобождение мьютекса и повторный захват после получения сигнала
49    }
50
51    node := q.Head // Получаем головной узел
52    q.Head = q.Head.Next // Перемещаем голову на следующий узел
53    q.Len-- // Уменьшаем длину очереди
54    return node.Value // Возвращаем значение из извлеченного узла
55}

Используя sync.Cond таким образом, вы можете эффективно ждать и возобновлять работу, когда условие будет выполнено, вместо того чтобы использовать spin-lock, который потребляет много ресурсов CPU.

semaphore

golang.org/x/sync/semaphore.Semaphore

Пакет semaphore предоставляет семафоры. Семафор можно создать с помощью golang.org/x/sync/semaphore.Semaphore, и он предоставляет методы Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1) // Создание взвешенного семафора с весом 1
11
12    if s.TryAcquire(1) { // Попытка захватить 1 единицу веса
13        fmt.Println("Acquired!") // Вывод сообщения о захвате
14    } else {
15        fmt.Println("Not Acquired!") // Вывод сообщения о незахвате
16    }
17
18    s.Release(1) // Освобождение 1 единицы веса
19}

Приведенный выше код использует semaphore для создания семафора, а затем использует методы Acquire для получения семафора и Release для его освобождения. В этом коде я показал, как получить и освободить семафор с помощью semaphore.

В заключение

Я думаю, что основных сведений достаточно. Надеюсь, что на основе содержания этой статьи вы сможете понять, как управлять параллелизмом с помощью goroutine, и сможете применять это на практике. Надеюсь, эта статья была вам полезна. Спасибо.