Go 並行性スターターパック
概要
簡単な紹介
Go言語には、多くの並行性管理のためのツールがあります。この記事では、その一部とコツを紹介します。
ゴルーチンとは?
goroutineは、Go言語でサポートされている新しい形式の並行性モデルです。一般的に、プログラムは複数のタスクを同時に実行するためにOSからOSスレッドを取得し、コア数に応じて並列にタスクを実行します。そして、より小さな単位の並行性を実現するためには、ユーザーランドでグリーンスレッドを生成し、1つのOSスレッド内で複数のグリーンスレッドが交代でタスクを実行するようにします。しかし、ゴルーチンの場合、このような形式のグリーンスレッドをさらに小さく効率的にしました。これらのゴルーチンはスレッドよりも使用するメモリが少なく、スレッドよりも迅速に生成および切り替えが可能です。
ゴルーチンを使用するには、単に go
キーワードを使用するだけで済みます。これにより、プログラムを作成する過程で、同期コードを非同期コードとして直感的に実行できます。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
このコードは、簡単に1秒間待機してから Hello, World!
を出力する同期コードを非同期フローに変更します。今の例は簡単ですが、少し複雑なコードを同期コードから非同期コードに変更すると、コードの可読性、可視性、理解度が既存の async await や promise のような方式よりもさらに向上します。
ただし、多くの場合、このような同期コードを単純に非同期で呼び出すフローと fork & join
のようなフロー(まるで分割統治法に似たフロー)を理解しない状態では、良くないゴルーチンコードが作られることもあります。このような場合に備えることができるいくつかの方法とテクニックを、この記事で紹介します。
並行性管理
context
最初の管理テクニックとして context
が登場するのは意外かもしれません。しかし、Go言語では context
は単純なキャンセル機能を超えて、全体の作業ツリーを管理する上で優れた役割を果たします。もしご存知ない方のために、簡単にこのパッケージを説明します。
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
上記のコードは、context
を使用して、1秒後に Context is done!
を出力するコードです。context
は Done()
メソッドを通じてキャンセル状態を確認でき、WithCancel
、WithTimeout
、WithDeadline
、WithValue
などのメソッドを通じてさまざまなキャンセル方法を提供します。
簡単な例を作成してみましょう。もし皆さんが、何らかのデータを取得するために aggregator
パターンを使用して、user
、post
、comment
を取得するコードを作成すると仮定します。そして、すべてのリクエストが2秒以内に行われる必要がある場合、以下のように記述できます。
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
上記のコードは、2秒以内にすべてのデータを取得できない場合、Timeout!
を出力し、すべてのデータを取得した場合は All data is fetched!
を出力します。このような方法で context
を使用すると、複数のゴルーチンが動作するコードでも、キャンセルとタイムアウトを簡単に管理できます。
これに関連するさまざまな context 関連の関数とメソッドは、godoc context で確認できます。簡単なものは学習して、便利に利用できるようになっていただければと思います。
channel
unbuffered channel
channel
は、ゴルーチン間の通信のためのツールです。channel
は make(chan T)
で作成できます。このとき、T
は、その channel
が受け渡すデータの型です。channel
は <-
でデータの送受信が可能で、close
で channel
を閉じることができます。
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
上記のコードは、channel
を使用して1と2を出力するコードです。このコードでは、単純に channel
に値を送受信することだけを示しています。しかし、channel
はこれよりも多くの機能を提供します。まず、buffered channel
と unbuffered channel
について見ていきましょう。始める前に、上に書かれた例は unbuffered channel
であり、チャネルにデータを送信する動作とデータを受信する動作が同時に行われる必要があります。もしこれらの動作が同時に行われない場合、デッドロックが発生する可能性があります。
buffered channel
もし上記のコードが単純な出力ではなく、負荷の高い作業を行う2つのプロセスだったらどうでしょうか?2番目のプロセスが読み込んで処理を実行中に長時間ハングアップした場合、1番目のプロセスもその時間の間停止することになります。私たちはこのような状況を防ぐために buffered channel
を使用できます。
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
上記のコードは、buffered channel
を使用して1と2を出力するコードです。このコードでは、buffered channel
を使用して、channel
にデータを送信する動作とデータを受信する動作が同時に行われなくても良いようにしました。このようにチャネルにバッファを持たせることで、その長さ分の余裕が生まれ、後続の作業の影響によって発生する作業の遅延を防ぐことができます。
select
複数のチャネルを扱う際に、select
構文を使用すると、簡単に fan-in
構造を実装できます。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
上記のコードは、周期的に1、2、3を送信する3つのチャネルを作成し、select
を使用してチャネルから値を受信して出力するコードです。このような方法で select
を使用すると、複数のチャネルから同時にデータを受信しながら、チャネルから値を受け取るごとに処理できます。
for range
channel
は for range
を使用して簡単にデータを受信できます。for range
をチャネルに使用すると、そのチャネルにデータが追加されるたびに動作し、チャネルが閉じられるとループを終了します。
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
上記のコードは、channel
を使用して1と2を出力するコードです。このコードでは、for range
を使用してチャネルにデータが追加されるたびにデータを受信して出力します。そして、チャネルが閉じられるとループを終了します。
上に何度か書いたように、この構文は単純な同期手段にも使用できます。
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
上記のコードは、1秒待機してから Hello, World!
を出力するコードです。このコードでは channel
を使用して、同期コードを非同期コードに変更しました。このような方法で channel
を使用すると、同期コードを非同期コードに簡単に変更し、join
ポイントを設定できます。
etc
- nilチャネルにデータを送信または受信すると、無限ループに陥りデッドロックが発生する可能性があります。
- チャネルを閉じた後にデータを送信すると、panicが発生します。
- チャネルを特に閉じなくても、GCが回収する際にチャネルを閉じます。
mutex
spinlock
spinlock
は、ループを回しながらロックを試み続ける同期方法です。Go言語ではポインターを使用して簡単にスピンロックを実装できます。
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
上記のコードは、spinlock
パッケージを実装したコードです。このコードでは、sync/atomic
パッケージを使用して SpinLock
を実装しました。Lock
メソッドでは atomic.CompareAndSwapUintptr
を使用してロックを試み、Unlock
メソッドでは atomic.StoreUintptr
を使用してロックを解除します。この方式は、休むことなくロックを試みるため、ロックを取得するまでCPUを継続的に使用することになり、無限ループに陥る可能性があります。したがって、spinlock
は単純な同期に使用するか、短時間のみ使用する場合に使用するのが適切です。
sync.Mutex
mutex
はゴルーチン間の同期のためのツールです。sync
パッケージが提供する mutex
は、Lock
、Unlock
、RLock
、RUnlock
などのメソッドを提供します。mutex
は sync.Mutex
で作成でき、sync.RWMutex
で読み取り/書き込みロックを使用することもできます。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
上記のコードでは、ほぼ同時に2つのゴルーチンが同じ count
変数にアクセスします。このとき、mutex
を使用して count
変数にアクセスするコードを臨界領域にすると、count
変数への同時アクセスを防ぐことができます。そうすれば、このコードは何回実行しても同じく 2
を出力するようになります。
sync.RWMutex
sync.RWMutex
は読み取りロックと書き込みロックを区別して使用できる mutex
です。RLock
、RUnlock
メソッドを使用して、読み取りロックをかけて解除できます。
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
上記のコードは、sync.RWMutex
を使用して ConcurrentMap
を実装したコードです。このコードでは、Get
メソッドで読み取りロックをかけ、Set
メソッドで書き込みロックをかけて data
マップに安全にアクセスし、修正できるようにしています。読み取りロックが必要な理由は、単純な読み取り作業が多い場合、書き込みロックをかけずに読み取りロックだけをかけて、複数のゴルーチンが同時に読み取り作業を実行できるようにするためです。これにより、状態の変更がなく、書き込みロックをかける必要がない場合には、読み取りロックだけをかけて性能を向上させることができます。
fakelock
fakelock
は sync.Locker
を実装する簡単なトリックです。この構造体は sync.Mutex
と同じメソッドを提供しますが、実際には動作しません。
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
上記のコードは、fakelock
パッケージを実装したコードです。このパッケージは sync.Locker
を実装して Lock
、Unlock
メソッドを提供しますが、実際には何も動作しません。なぜこのようなコードが必要なのかは、機会があれば述べます。
waitgroup
sync.WaitGroup
sync.WaitGroup
は、ゴルーチンの作業がすべて完了するまで待機するためのツールです。Add
、Done
、Wait
メソッドを提供し、Add
メソッドでゴルーチンの数を追加し、Done
メソッドでゴルーチンの作業が完了したことを知らせます。そして、Wait
メソッドですべてのゴルーチンの作業が完了するまで待機します。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
上記のコードは、sync.WaitGroup
を使用して100個のゴルーチンが同時にc
変数に値を加算するコードです。このコードでは、sync.WaitGroup
を使用してすべてのゴルーチンが終了するまで待機した後、c
変数に加算された値を出力します。単純にいくつかの作業をfork & join
する場合、チャネルのみを使用しても十分ですが、大量の作業をfork & join
する場合は、sync.WaitGroup
を使用することも良い選択肢です。
with slice
スライスと共に使用する場合、waitgroup
はロックなしで優れた同時実行作業を管理するツールになり得ます。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
上記のコードは、waitgroup
のみを使用して各ゴルーチンが同時に10個のランダムな整数を生成し、割り当てられたインデックスに保存するコードです。このコードでは、waitgroup
を使用してすべてのゴルーチンが終了するまで待機した後、Done
を出力します。このような方法でwaitgroup
を使用すると、複数のゴルーチンが同時に作業を実行し、すべてのゴルーチンが終了するまでロックなしでデータを保存し、作業終了後に一括して後処理を実行できます。
golang.org/x/sync/errgroup.ErrGroup
errgroup
は、sync.WaitGroup
を拡張したパッケージです。errgroup
はsync.WaitGroup
とは異なり、ゴルーチンの作業の1つでもエラーが発生すると、すべてのゴルーチンをキャンセルしてエラーを返します。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
上記のコードは、errgroup
を使用して10個のゴルーチンを生成し、5番目のゴルーチンでエラーを発生させるコードです。意図的に5番目のゴルーチンでエラーを発生させ、エラーが発生する場合を示しました。ただし、実際に使用する際には、errgroup
を使用してゴルーチンを生成し、各ゴルーチンでエラーが発生する場合について、さまざまな後処理を行う方法で使用できます。
once
一度だけ実行される必要があるコードを実行するツールです。以下の生成子を通じて関連コードを実行できます。
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc
は、単純に該当関数が全体にわたって1回だけ実行できるようにします。
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
上記のコードは、sync.OnceFunc
を使用してHello, World!
を出力するコードです。このコードでは、sync.OnceFunc
を使用してonce
関数を生成し、once
関数を複数回呼び出してもHello, World!
が1回だけ出力されます。
OnceValue
OnceValue
は、単純に該当関数が全体にわたって1回だけ実行されるだけでなく、該当関数の戻り値を保存して、再度呼び出すときに保存された値を返します。
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
上記のコードは、sync.OnceValue
を使用してc
変数を1ずつ増加させるコードです。このコードでは、sync.OnceValue
を使用してonce
関数を生成し、once
関数を複数回呼び出してもc
変数が1回だけ増加した1を返します。
OnceValues
OnceValues
は、OnceValue
と同じように動作しますが、複数の値を返すことができます。
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
上記のコードは、sync.OnceValues
を使用してc
変数を1ずつ増加させるコードです。このコードでは、sync.OnceValues
を使用してonce
関数を生成し、once
関数を複数回呼び出してもc
変数が1回だけ増加した1を返します。
atomic
atomic
パッケージは、アトミックな演算を提供するパッケージです。atomic
パッケージは、Add
、CompareAndSwap
、Load
、Store
、Swap
などのメソッドを提供しますが、最近ではInt64
、Uint64
、Pointer
などのタイプの使用が推奨されています。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
以前使用された例です。atomic.Int64
タイプを使用してc
変数をアトミックに増加させるコードです。Add
メソッドとLoad
メソッドで、アトミックに変数を増加させ、変数を読み取ることができます。また、Store
メソッドで値を保存し、Swap
メソッドで値を交換し、CompareAndSwap
メソッドで値を比較して適合すれば交換できます。
cond
sync.Cond
cond
パッケージは、条件変数を提供するパッケージです。cond
パッケージはsync.Cond
で生成でき、Wait
、Signal
、Broadcast
メソッドを提供します。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
上記のコードは、sync.Cond
を使用してready
変数がtrue
になるまで待機するコードです。このコードでは、sync.Cond
を使用してready
変数がtrue
になるまで待機した後、Ready!
を出力します。このような方法でsync.Cond
を使用すると、複数のゴルーチンが同時に特定の条件を満たすまで待機させることができます。
これを活用して、簡単なqueue
を実装できます。
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
このようにsync.Cond
を活用すると、spin-lock
で多くのCPU使用量を使用する代わりに、効率的に待機し、条件が満たされると再び動作できます。
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphore
パッケージは、セマフォを提供するパッケージです。semaphore
パッケージは、golang.org/x/sync/semaphore.Semaphore
で生成でき、Acquire
、Release
、TryAcquire
メソッドを提供します。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
上記のコードは、semaphore
を使用してセマフォを生成し、セマフォを使用してAcquire
メソッドでセマフォを取得し、Release
メソッドでセマフォを解放するコードです。このコードでは、semaphore
を使用してセマフォを取得および解放する方法を示しました。
最後に
基本的な内容はここまでで十分だと思います。この記事の内容に基づいて、皆様がゴルーチンを使用して同時性を管理する方法を理解し、実際に使用できるようになることを願っています。この記事が皆様のお役に立てば幸いです。ありがとうございました。