Go Concurrency Starter Pack
Обзор
Краткое введение
Язык Go предоставляет множество инструментов для управления параллелизмом. В этой статье мы представим некоторые из них и связанные с ними приемы.
Горутина?
goroutine — это новая модель параллелизма, поддерживаемая языком Go. Обычно программа получает OS threads от ОС для одновременного выполнения нескольких задач и выполняет задачи параллельно в количестве, соответствующем числу ядер. Для обеспечения параллелизма меньшего масштаба создаются green threads в userland, и несколько green threads выполняют задачи в рамках одного OS thread. Однако в случае goroutine такие green threads сделаны еще меньше и эффективнее. Такие goroutine используют меньше памяти, чем threads, и могут создаваться и заменяться быстрее, чем threads.
Для использования goroutine достаточно применить ключевое слово go. Это позволяет интуитивно запускать синхронный код как асинхронный в процессе написания программы.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch) // Откладываем закрытие канала до завершения горутины
12 time.Sleep(1 * time.Second) // Задержка на 1 секунду
13 fmt.Println("Hello, World!") // Вывод сообщения
14 }()
15
16 fmt.Println("Waiting for goroutine...") // Вывод сообщения
17 for range ch {} // Ожидание закрытия канала горутиной
18}
Этот код просто изменяет синхронный код, который ждет 1 секунду, а затем выводит Hello, World!, на асинхронный поток. Хотя этот пример прост, при изменении более сложного синхронного кода на асинхронный читабельность, наглядность и понятность кода становятся лучше, чем при использовании таких подходов, как async/await или Promise.
Однако во многих случаях, если не понимать поток вызовов синхронного кода в асинхронном режиме и такие потоки, как fork & join (подобные разделяй и властвуй), может быть написан неоптимальный код goroutine. В этой статье мы представим несколько методов и приемов, которые помогут справиться с такими ситуациями.
Управление параллелизмом
context
Появление context в качестве первого метода управления может показаться неожиданным. Однако в языке Go context играет выдающуюся роль в управлении всем деревом задач, выходя за рамки простой функции отмены. Для тех, кто не знаком с этим, я кратко объясню этот пакет.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background()) // Создание контекста с функцией отмены
5 defer cancel() // Отложенный вызов функции отмены
6
7 go func() {
8 <-ctx.Done() // Ожидание сигнала отмены контекста
9 fmt.Println("Context is done!") // Вывод сообщения после отмены контекста
10 }()
11
12 time.Sleep(1 * time.Second) // Задержка на 1 секунду
13
14 cancel() // Отмена контекста
15
16 time.Sleep(1 * time.Second) // Задержка на 1 секунду
17}
Приведенный выше код использует context для вывода Context is done! через 1 секунду. context позволяет проверить состояние отмены с помощью метода Done() и предоставляет различные методы отмены, такие как WithCancel, WithTimeout, WithDeadline, WithValue.
Давайте создадим простой пример. Предположим, вы пишете код для получения user, post, comment с использованием паттерна aggregator для извлечения данных. И если все запросы должны быть выполнены в течение 2 секунд, вы можете написать это следующим образом.
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second) // Создание контекста с таймаутом в 2 секунды
5 defer cancel() // Отложенный вызов функции отмены
6
7 ch := make(chan struct{}) // Создание канала для синхронизации
8 go func() {
9 defer close(ch) // Отложенное закрытие канала
10 user := getUser(ctx) // Получение пользователя
11 post := getPost(ctx) // Получение поста
12 comment := getComment(ctx) // Получение комментария
13
14 fmt.Println(user, post, comment) // Вывод полученных данных
15 }()
16
17 select {
18 case <-ctx.Done(): // Если контекст отменен (таймаут)
19 fmt.Println("Timeout!") // Вывод сообщения о таймауте
20 case <-ch: // Если все данные получены
21 fmt.Println("All data is fetched!") // Вывод сообщения об успешном получении данных
22 }
23}
Приведенный выше код выводит Timeout!, если все данные не получены в течение 2 секунд, и All data is fetched!, если все данные получены. Таким образом, используя context, вы можете легко управлять отменой и таймаутами даже в коде, где работают несколько goroutine.
Различные функции и методы, связанные с context, доступны по ссылке godoc context. Я надеюсь, что вы сможете легко изучить и использовать их.
channel
unbuffered channel
channel — это инструмент для связи между goroutine. channel можно создать с помощью make(chan T). В этом случае T — это тип данных, которые будет передавать channel. Данные можно отправлять и получать через channel с помощью <-, а channel можно закрыть с помощью close.
1package main
2
3func main() {
4 ch := make(chan int) // Создание небуферизованного канала типа int
5 go func() {
6 ch <- 1 // Отправка 1 в канал
7 ch <- 2 // Отправка 2 в канал
8 close(ch) // Закрытие канала
9 }()
10
11 for i := range ch { // Итерация по значениям из канала
12 fmt.Println(i) // Вывод каждого значения
13 }
14}
Приведенный выше код использует channel для вывода 1 и 2. В этом коде показана только отправка и получение значений из channel. Однако channel предоставляет гораздо больше функций. Сначала мы рассмотрим buffered channel и unbuffered channel. Прежде чем начать, приведенный выше пример является unbuffered channel, и действия по отправке данных в канал и получению данных из него должны происходить одновременно. Если эти действия не происходят одновременно, может возникнуть deadlock.
buffered channel
Что, если приведенный выше код — это не просто вывод, а два процесса, выполняющие тяжелую работу? Если второй процесс застрянет на длительное время при чтении и обработке, первый процесс также остановится на это время. Мы можем использовать buffered channel, чтобы предотвратить такую ситуацию.
1package main
2
3func main() {
4 ch := make(chan int, 2) // Создание буферизованного канала типа int с буфером размером 2
5 go func() {
6 ch <- 1 // Отправка 1 в канал
7 ch <- 2 // Отправка 2 в канал
8 close(ch) // Закрытие канала
9 }()
10
11 for i := range ch { // Итерация по значениям из канала
12 fmt.Println(i) // Вывод каждого значения
13 }
14}
Приведенный выше код использует buffered channel для вывода 1 и 2. В этом коде buffered channel используется для того, чтобы действия по отправке данных в канал и получению данных из него не обязательно происходили одновременно. Добавление буфера в канал создает запас по длине, что позволяет предотвратить задержки в работе, вызванные влиянием последующих задач.
select
При работе с несколькими каналами оператор select позволяет легко реализовать структуру fan-in.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10) // Создание буферизованного канала ch1
10 ch2 := make(chan int, 10) // Создание буферизованного канала ch2
11 ch3 := make(chan int, 10) // Создание буферизованного канала ch3
12
13 go func() { // Горутина для ch1
14 for {
15 ch1 <- 1 // Отправка 1 в ch1
16 time.Sleep(1 * time.Second) // Задержка на 1 секунду
17 }
18 }()
19 go func() { // Горутина для ch2
20 for {
21 ch2 <- 2 // Отправка 2 в ch2
22 time.Sleep(2 * time.Second) // Задержка на 2 секунды
23 }
24 }()
25 go func() { // Горутина для ch3
26 for {
27 ch3 <- 3 // Отправка 3 в ch3
28 time.Sleep(3 * time.Second) // Задержка на 3 секунды
29 }
30 }()
31
32 for i := 0; i < 3; i++ { // Цикл для получения 3 значений
33 select {
34 case v := <-ch1: // Получение значения из ch1
35 fmt.Println(v) // Вывод значения
36 case v := <-ch2: // Получение значения из ch2
37 fmt.Println(v) // Вывод значения
38 case v := <-ch3: // Получение значения из ch3
39 fmt.Println(v) // Вывод значения
40 }
41 }
42}
Приведенный выше код создает 3 канала, которые периодически передают 1, 2, 3, и выводит значения, полученные из каналов с помощью select. Таким образом, используя select, вы можете одновременно получать данные из нескольких каналов и обрабатывать их по мере поступления.
for range
channel может легко получать данные с помощью for range. Если for range используется для канала, он будет работать каждый раз, когда данные добавляются в этот канал, и завершит цикл, когда канал будет закрыт.
1package main
2
3func main() {
4 ch := make(chan int) // Создание канала типа int
5 go func() {
6 ch <- 1 // Отправка 1 в канал
7 ch <- 2 // Отправка 2 в канал
8 close(ch) // Закрытие канала
9 }()
10
11 for i := range ch { // Итерация по значениям из канала
12 fmt.Println(i) // Вывод каждого значения
13 }
14}
Приведенный выше код использует channel для вывода 1 и 2. В этом коде for range используется для получения и вывода данных каждый раз, когда данные добавляются в канал. А когда канал закрывается, цикл завершается.
Как упоминалось выше несколько раз, этот синтаксис можно использовать и для простой синхронизации.
1package main
2
3func main() {
4 ch := make(chan struct{}) // Создание канала для синхронизации
5 go func() {
6 defer close(ch) // Отложенное закрытие канала
7 time.Sleep(1 * time.Second) // Задержка на 1 секунду
8 fmt.Println("Hello, World!") // Вывод сообщения
9 }()
10
11 fmt.Println("Waiting for goroutine...") // Вывод сообщения
12 for range ch {} // Ожидание закрытия канала
13}
Приведенный выше код ждет 1 секунду, а затем выводит Hello, World!. В этом коде channel используется для преобразования синхронного кода в асинхронный. Таким образом, используя channel, вы можете легко преобразовать синхронный код в асинхронный и установить точки join.
etc
- Если данные отправляются или получаются из nil channel, может возникнуть deadlock из-за бесконечного цикла.
- Отправка данных в закрытый канал вызывает panic.
- Канал закрывается сборщиком мусора, даже если он не закрыт явным образом.
mutex
spinlock
spinlock — это метод синхронизации, который постоянно пытается получить блокировку в цикле. В языке Go можно легко реализовать spinlock с помощью указателей.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) { // Попытка установить блокировку, пока не удастся
14 runtime.Gosched() // Уступает выполнение другим горутинам
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0) // Снимает блокировку
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{} // Создает и возвращает новый SpinLock
24}
Приведенный выше код реализует пакет spinlock. В этом коде SpinLock реализован с использованием пакета sync/atomic. Метод Lock пытается получить блокировку с помощью atomic.CompareAndSwapUintptr, а метод Unlock снимает блокировку с помощью atomic.StoreUintptr. Этот подход постоянно пытается получить блокировку, поэтому он непрерывно использует CPU до получения блокировки, что может привести к бесконечному циклу. Следовательно, spinlock рекомендуется использовать для простой синхронизации или в течение короткого времени.
sync.Mutex
mutex — это инструмент для синхронизации между goroutine. mutex, предоставляемый пакетом sync, предлагает методы, такие как Lock, Unlock, RLock, RUnlock. mutex можно создать с помощью sync.Mutex, а также использовать блокировку чтения/записи с помощью sync.RWMutex.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex // Объявление мьютекса
9 var count int // Объявление переменной счетчика
10
11 go func() { // Запуск горутины
12 mu.Lock() // Захват мьютекса
13 count++ // Увеличение счетчика
14 mu.Unlock() // Освобождение мьютекса
15 }()
16
17 mu.Lock() // Захват мьютекса
18 count++ // Увеличение счетчика
19 mu.Unlock() // Освобождение мьютекса
20
21 println(count) // Вывод значения счетчика
22}
В приведенном выше коде две goroutine почти одновременно обращаются к одной и той же переменной count. В этом случае, если мы сделаем код, обращающийся к переменной count, критической секцией с помощью mutex, мы сможем предотвратить одновременный доступ к переменной count. Тогда этот код будет выводить 2 каждый раз, независимо от того, сколько раз он будет выполнен.
sync.RWMutex
sync.RWMutex — это mutex, который позволяет использовать блокировки чтения и записи отдельно. Методы RLock и RUnlock используются для установки и снятия блокировки чтения.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex // Встраивание RWMutex для обеспечения конкурентного доступа
9 data map[K]V // Карта для хранения данных
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock() // Захват блокировки чтения
14 defer m.RUnlock() // Освобождение блокировки чтения после завершения функции
15
16 value, ok := m.data[key] // Получение значения из карты
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock() // Захват блокировки записи
22 defer m.Unlock() // Освобождение блокировки записи после завершения функции
23
24 m.data[key] = value // Установка значения в карту
25}
Приведенный выше код реализует ConcurrentMap с использованием sync.RWMutex. В этом коде метод Get устанавливает блокировку чтения, а метод Set устанавливает блокировку записи, что позволяет безопасно получать доступ и изменять карту data. Блокировка чтения необходима для того, чтобы, когда есть много операций чтения, несколько goroutine могли одновременно выполнять операции чтения, не устанавливая блокировку записи, а только блокировку чтения. Таким образом, производительность может быть улучшена за счет использования только блокировки чтения, когда нет изменения состояния, и нет необходимости в блокировке записи.
fakelock
fakelock — это простой трюк, реализующий sync.Locker. Эта структура предоставляет те же методы, что и sync.Mutex, но фактически не выполняет никаких действий.
1package fakelock
2
3type FakeLock struct{} // Объявление пустой структуры FakeLock
4
5func (f *FakeLock) Lock() {} // Метод Lock, не выполняющий никаких действий
6
7func (f *FakeLock) Unlock() {} // Метод Unlock, не выполняющий никаких действий
Приведенный выше код реализует пакет fakelock. Этот пакет реализует sync.Locker, предоставляя методы Lock и Unlock, но фактически не выполняет никаких действий. Зачем нужен такой код, будет рассказано, если представится возможность.
waitgroup
sync.WaitGroup
sync.WaitGroup — это инструмент для ожидания завершения всех задач goroutine. Он предоставляет методы Add, Done, Wait. Метод Add добавляет количество goroutine, метод Done сигнализирует о завершении задачи goroutine. А метод Wait ждет завершения всех задач goroutine.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{} // Создание WaitGroup
10 c := atomic.Int64{} // Создание атомарного счетчика
11
12 for i := 0; i < 100 ; i++ { // Цикл для запуска 100 горутин
13 wg.Add(1) // Увеличение счетчика WaitGroup
14 go func() { // Запуск горутины
15 defer wg.Done() // Уменьшение счетчика WaitGroup после завершения горутины
16 c.Add(1) // Атомарное увеличение счетчика
17 }()
18 }
19
20 wg.Wait() // Ожидание завершения всех горутин
21 println(c.Load()) // Вывод значения счетчика
22}
Приведенный выше код использует sync.WaitGroup для того, чтобы 100 goroutine одновременно увеличивали значение переменной c. В этом коде sync.WaitGroup используется для ожидания завершения всех goroutine, а затем выводится значение, добавленное к переменной c. Для fork & join нескольких задач достаточно использовать только каналы, но для fork & join большого количества задач sync.WaitGroup также является хорошим выбором.
with slice
При использовании со слайсами waitgroup может стать отличным инструментом для управления параллельными задачами без блокировок.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup // Объявление WaitGroup
11 arr := [10]int{} // Объявление массива из 10 целых чисел
12
13 for i := 0; i < 10; i++ { // Цикл для запуска 10 горутин
14 wg.Add(1) // Увеличение счетчика WaitGroup
15 go func(id int) { // Запуск горутины с передачей id
16 defer wg.Done() // Уменьшение счетчика WaitGroup после завершения горутины
17
18 arr[id] = rand.Intn(100) // Запись случайного числа в массив по индексу id
19 }(i)
20 }
21
22 wg.Wait() // Ожидание завершения всех горутин
23 fmt.Println("Done") // Вывод сообщения о завершении
24
25 for i, v := range arr { // Итерация по массиву и вывод значений
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Приведенный выше код использует только waitgroup для одновременного создания 10 случайных целых чисел каждой goroutine и сохранения их в назначенном индексе. В этом коде waitgroup используется для ожидания завершения всех goroutine, а затем выводится Done. Таким образом, используя waitgroup, несколько goroutine могут одновременно выполнять задачи, сохранять данные без блокировок до завершения всех goroutine, а затем выполнять пакетную постобработку.
golang.org/x/sync/errgroup.ErrGroup
errgroup — это пакет, расширяющий sync.WaitGroup. В отличие от sync.WaitGroup, errgroup отменяет все goroutine и возвращает ошибку, если хотя бы одна из задач goroutine завершается с ошибкой.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background()) // Создание errgroup с контекстом
11 _ = ctx // Игнорирование переменной ctx
12
13 for i := 0; i < 10; i++ { // Цикл для запуска 10 горутин
14 i := i // Создание локальной копии i
15 g.Go(func() error { // Запуск горутины
16 if i == 5 { // Если i равно 5, возвращаем ошибку
17 return fmt.Errorf("error")
18 }
19 return nil // В противном случае возвращаем nil
20 })
21 }
22
23 if err := g.Wait(); err != nil { // Ожидание завершения всех горутин и проверка на ошибку
24 fmt.Println(err) // Вывод ошибки, если она есть
25 }
26}
Приведенный выше код использует errgroup для создания 10 goroutine и вызывает ошибку в 5-й goroutine. Я намеренно показал случай возникновения ошибки в пятой goroutine. Однако при реальном использовании errgroup для создания goroutine и обработки различных ошибок, возникающих в каждой goroutine, вы можете использовать его для выполнения различных последующих действий.
once
Инструмент для однократного выполнения кода. Соответствующий код можно запустить с помощью приведенных ниже конструкторов.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc просто гарантирует, что данная функция будет выполнена ровно один раз за все время работы программы.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() { // Создание функции once, которая будет выполнена только один раз
7 println("Hello, World!") // Вывод сообщения
8 })
9
10 once() // Первый вызов
11 once() // Второй вызов (не будет выполнен)
12 once() // Третий вызов (не будет выполнен)
13 once() // Четвертый вызов (не будет выполнен)
14 once() // Пятый вызов (не будет выполнен)
15}
Приведенный выше код использует sync.OnceFunc для вывода Hello, World!. В этом коде sync.OnceFunc используется для создания функции once, и даже если функция once вызывается несколько раз, Hello, World! выводится только один раз.
OnceValue
OnceValue не просто выполняет данную функцию один раз за все время, но и сохраняет возвращаемое значение этой функции, чтобы при повторном вызове вернуть сохраненное значение.
1package main
2
3import "sync"
4
5func main() {
6 c := 0 // Инициализация счетчика
7 once := sync.OnceValue(func() int { // Создание функции once, которая будет выполнена только один раз и вернет значение
8 c += 1 // Увеличение счетчика
9 return c // Возврат значения счетчика
10 })
11
12 println(once()) // Первый вызов, выведет 1
13 println(once()) // Второй вызов, выведет 1 (возвращает сохраненное значение)
14 println(once()) // Третий вызов, выведет 1
15 println(once()) // Четвертый вызов, выведет 1
16 println(once()) // Пятый вызов, выведет 1
17}
Приведенный выше код использует sync.OnceValue для увеличения переменной c на 1. В этом коде sync.OnceValue используется для создания функции once, и даже если функция once вызывается несколько раз, переменная c увеличивается только один раз и возвращает 1.
OnceValues
OnceValues работает так же, как OnceValue, но может возвращать несколько значений.
1package main
2
3import "sync"
4
5func main() {
6 c := 0 // Инициализация счетчика
7 once := sync.OnceValues(func() (int, int) { // Создание функции once, которая будет выполнена только один раз и вернет два значения
8 c += 1 // Увеличение счетчика
9 return c, c // Возврат двух значений счетчика
10 })
11
12 a, b := once() // Первый вызов
13 println(a, b) // Выведет 1 1
14 a, b = once() // Второй вызов (возвращает сохраненные значения)
15 println(a, b) // Выведет 1 1
16 a, b = once() // Третий вызов
17 println(a, b) // Выведет 1 1
18 a, b = once() // Четвертый вызов
19 println(a, b) // Выведет 1 1
20 a, b = once() // Пятый вызов
21 println(a, b) // Выведет 1 1
22}
Приведенный выше код использует sync.OnceValues для увеличения переменной c на 1. В этом коде sync.OnceValues используется для создания функции once, и даже если функция once вызывается несколько раз, переменная c увеличивается только один раз и возвращает 1.
atomic
Пакет atomic предоставляет атомарные операции. Пакет atomic предоставляет такие методы, как Add, CompareAndSwap, Load, Store, Swap, но в последнее время рекомендуется использовать такие типы, как Int64, Uint64, Pointer.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{} // Создание WaitGroup
10 c := atomic.Int64{} // Создание атомарного счетчика
11
12 for i := 0; i < 100 ; i++ { // Цикл для запуска 100 горутин
13 wg.Add(1) // Увеличение счетчика WaitGroup
14 go func() { // Запуск горутины
15 defer wg.Done() // Уменьшение счетчика WaitGroup после завершения горутины
16 c.Add(1) // Атомарное увеличение счетчика
17 }()
18 }
19
20 wg.Wait() // Ожидание завершения всех горутин
21 println(c.Load()) // Вывод значения счетчика
22}
Это пример, который использовался ранее. Код атомарно увеличивает переменную c с использованием типа atomic.Int64. Методы Add и Load позволяют атомарно увеличивать и считывать переменную. Кроме того, метод Store может сохранять значения, метод Swap может заменять значения, а метод CompareAndSwap может сравнивать значения и заменять их, если они подходят.
cond
sync.Cond
Пакет cond предоставляет условные переменные. Пакет cond можно создать с помощью sync.Cond и он предоставляет методы Wait, Signal, Broadcast.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{}) // Создание условной переменной с мьютексом
9 ready := false // Флаг готовности
10
11 go func() { // Запуск горутины
12 c.L.Lock() // Захват мьютекса условной переменной
13 ready = true // Установка флага готовности в true
14 c.Signal() // Отправка сигнала одной ожидающей горутине
15 c.L.Unlock() // Освобождение мьютекса условной переменной
16 }()
17
18 c.L.Lock() // Захват мьютекса условной переменной
19 for !ready { // Цикл ожидания, пока ready не станет true
20 c.Wait() // Ожидание сигнала, освобождение мьютекса и повторный захват после получения сигнала
21 }
22 c.L.Unlock() // Освобождение мьютекса условной переменной
23
24 println("Ready!") // Вывод сообщения
25}
Приведенный выше код использует sync.Cond для ожидания, пока переменная ready не станет true. В этом коде sync.Cond используется для ожидания, пока переменная ready не станет true, а затем выводится Ready!. Таким образом, используя sync.Cond, вы можете заставить несколько goroutine ждать, пока не будет выполнено определенное условие.
Используя это, можно реализовать простую queue.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{} // Создание нового экземпляра Queue
23 q.Cond = sync.NewCond(&q.Mutex) // Инициализация Cond с мьютексом Queue
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock() // Захват мьютекса
29 defer q.Unlock() // Освобождение мьютекса после завершения функции
30
31 node := &Node[T]{Value: value} // Создание нового узла
32 if q.Len == 0 { // Если очередь пуста
33 q.Head = node // Устанавливаем новый узел как голову
34 q.Tail = node // Устанавливаем новый узел как хвост
35 } else { // Если очередь не пуста
36 q.Tail.Next = node // Добавляем новый узел после текущего хвоста
37 q.Tail = node // Обновляем хвост
38 }
39 q.Len++ // Увеличиваем длину очереди
40 q.Cond.Signal() // Отправляем сигнал одной ожидающей горутине
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock() // Захват мьютекса
45 defer q.Unlock() // Освобождение мьютекса после завершения функции
46
47 for q.Len == 0 { // Цикл ожидания, пока очередь не станет пустой
48 q.Cond.Wait() // Ожидание сигнала, освобождение мьютекса и повторный захват после получения сигнала
49 }
50
51 node := q.Head // Получаем головной узел
52 q.Head = q.Head.Next // Перемещаем голову на следующий узел
53 q.Len-- // Уменьшаем длину очереди
54 return node.Value // Возвращаем значение из извлеченного узла
55}
Используя sync.Cond таким образом, вы можете эффективно ждать и возобновлять работу, когда условие будет выполнено, вместо того чтобы использовать spin-lock, который потребляет много ресурсов CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
Пакет semaphore предоставляет семафоры. Семафор можно создать с помощью golang.org/x/sync/semaphore.Semaphore, и он предоставляет методы Acquire, Release, TryAcquire.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1) // Создание взвешенного семафора с весом 1
11
12 if s.TryAcquire(1) { // Попытка захватить 1 единицу веса
13 fmt.Println("Acquired!") // Вывод сообщения о захвате
14 } else {
15 fmt.Println("Not Acquired!") // Вывод сообщения о незахвате
16 }
17
18 s.Release(1) // Освобождение 1 единицы веса
19}
Приведенный выше код использует semaphore для создания семафора, а затем использует методы Acquire для получения семафора и Release для его освобождения. В этом коде я показал, как получить и освободить семафор с помощью semaphore.
В заключение
Я думаю, что основных сведений достаточно. Надеюсь, что на основе содержания этой статьи вы сможете понять, как управлять параллелизмом с помощью goroutine, и сможете применять это на практике. Надеюсь, эта статья была вам полезна. Спасибо.