Go Concurrency Starter Pack
概要
簡潔な紹介
Go言語には、並行処理を管理するための多くのツールがあります。この記事では、その一部とテクニックを紹介します。
goroutine?
goroutineは、Go言語がサポートする新しい形式の並行処理モデルです。一般に、プログラムは同時に複数のタスクを実行するためにOSからOSスレッドを受け取り、コア数に応じて並行してタスクを実行します。そして、より小さな単位の並行処理を実行するためには、ユーザーランドでグリーンスレッドを生成し、一つのOSスレッド内で複数のグリーンスレッドが交代でタスクを実行するようにします。しかし、goroutineの場合、このような形式のグリーンスレッドをより小さく効率的に作成しました。これらのgoroutineは、スレッドよりも少ないメモリを使用し、スレッドよりも速く生成および交換できます。
goroutineを使用するには、単にgoキーワードを使用するだけで十分です。これにより、プログラムを作成する過程で、直感的に同期コードを非同期コードとして実行できるようになります。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch) // チャネルを遅延して閉じる
12 time.Sleep(1 * time.Second) // 1秒間スリープ
13 fmt.Println("Hello, World!") // "Hello, World!"を出力
14 }()
15
16 fmt.Println("Waiting for goroutine...") // "Waiting for goroutine..."を出力
17 for range ch {} // goroutineがチャネルを閉じるまで待機
18}
このコードは、1秒休止してからHello, World!を出力する同期コードを、非同期フローに簡単に変更します。現在の例は単純ですが、少し複雑なコードを同期コードから非同期コードに変更すると、コードの可読性、可視性、理解度が、従来のasync awaitやpromiseのような方式よりもさらに向上します。
ただし、多くの場合、このような同期コードを単に非同期で呼び出すフローと、fork & joinのようなフロー(分割統治に似たフロー)を理解していない状態では、良くないgoroutineコードが作成されることもあります。このような場合に備えるためのいくつかの方法と技法を、この記事で紹介します。
並行処理の管理
context
最初の管理手法としてcontextが登場するのは意外かもしれません。しかし、Go言語においてcontextは、単なるキャンセル機能を超えて、全体の作業ツリーを管理する上で卓越した役割を果たします。もしご存知ない方のために、簡単にこのパッケージを説明します。
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background()) // キャンセル可能なcontextを作成
5 defer cancel() // 関数終了時にcancelを呼び出す
6
7 go func() {
8 <-ctx.Done() // contextがキャンセルされるまで待機
9 fmt.Println("Context is done!") // "Context is done!"を出力
10 }()
11
12 time.Sleep(1 * time.Second) // 1秒間スリープ
13
14 cancel() // contextをキャンセル
15
16 time.Sleep(1 * time.Second) // 1秒間スリープ
17}
上記のコードは、contextを使用して、1秒後にContext is done!と出力するコードです。contextはDone()メソッドを通じてキャンセルの有無を確認でき、WithCancel、WithTimeout、WithDeadline、WithValueなどのメソッドを通じて多様なキャンセル方法を提供します。
簡単な例を作成してみましょう。もし皆さんが何らかのデータを取得するためにaggregatorパターンを使用し、user、post、commentを取得するコードを作成すると仮定します。そして、すべてのリクエストが2秒以内に完了する必要がある場合、次のように記述できます。
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second) // 2秒のタイムアウトを持つcontextを作成
5 defer cancel() // 関数終了時にcancelを呼び出す
6
7 ch := make(chan struct{}) // 構造体型のチャネルを作成
8 go func() {
9 defer close(ch) // 関数終了時にチャネルを閉じる
10 user := getUser(ctx) // ユーザーデータを取得
11 post := getPost(ctx) // 投稿データを取得
12 comment := getComment(ctx) // コメントデータを取得
13
14 fmt.Println(user, post, comment) // 取得したデータを出力
15 }()
16
17 select {
18 case <-ctx.Done(): // contextがキャンセルされた場合
19 fmt.Println("Timeout!") // "Timeout!"を出力
20 case <-ch: // チャネルからデータが受信された場合
21 fmt.Println("All data is fetched!") // "All data is fetched!"を出力
22 }
23}
上記のコードは、2秒以内にすべてのデータを取得できない場合、Timeout!と出力し、すべてのデータを取得できた場合、All data is fetched!と出力します。このようにcontextを使用すると、複数のgoroutineが動作するコードでもキャンセルとタイムアウトを簡単に管理できます。
これに関連する様々なcontext関連関数とメソッドは、godoc contextで確認できます。簡単なものは学習して、便利に利用できるようになっていただければ幸いです。
channel
unbuffered channel
channelはgoroutine間の通信のためのツールです。channelはmake(chan T)で生成できます。このとき、Tは当該channelが伝達するデータの型です。channelは<-でデータを送受信でき、closeでchannelを閉じることができます。
1package main
2
3func main() {
4 ch := make(chan int) // 整数型チャネルを作成
5 go func() {
6 ch <- 1 // チャネルに1を送信
7 ch <- 2 // チャネルに2を送信
8 close(ch) // チャネルを閉じる
9 }()
10
11 for i := range ch { // チャネルから値を受信し続ける
12 fmt.Println(i) // 受信した値を出力
13 }
14}
上記のコードは、channelを使用して1と2を出力するコードです。このコードでは、単純にchannelに値を送信し、受信することのみを示しています。しかし、channelはこれよりも多くの機能を提供します。まず、buffered channelとunbuffered channelについて見ていきましょう。始める前に、上記の例はunbuffered channelであり、チャネルにデータを送信する動作とデータを受信する動作が同時に行われる必要があります。もしこれらの動作が同時に行われない場合、デッドロックが発生する可能性があります。
buffered channel
もし上記のコードが単純な出力ではなく、重い処理を実行する2つのプロセスだったらどうでしょうか?2番目のプロセスが読み込み処理を実行中に長時間停止した場合、1番目のプロセスもその間停止することになります。このような状況を防ぐために、buffered channelを使用できます。
1package main
2
3func main() {
4 ch := make(chan int, 2) // バッファサイズ2の整数型チャネルを作成
5 go func() {
6 ch <- 1 // チャネルに1を送信
7 ch <- 2 // チャネルに2を送信
8 close(ch) // チャネルを閉じる
9 }()
10
11 for i := range ch { // チャネルから値を受信し続ける
12 fmt.Println(i) // 受信した値を出力
13 }
14}
上記のコードは、buffered channelを使用して1と2を出力するコードです。このコードでは、buffered channelを使用することで、channelにデータを送信する動作とデータを受信する動作が同時に行われなくてもよいようにしました。このようにチャネルにバッファを設けることで、その長さ分の余裕が生まれ、後続のタスクの影響による作業遅延を防ぐことができます。
select
複数のチャネルを扱う際、select構文を使用すると、簡単にfan-in構造を実装できます。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10) // バッファサイズ10の整数型チャネル1を作成
10 ch2 := make(chan int, 10) // バッファサイズ10の整数型チャネル2を作成
11 ch3 := make(chan int, 10) // バッファサイズ10の整数型チャネル3を作成
12
13 go func() {
14 for {
15 ch1 <- 1 // チャネル1に1を送信
16 time.Sleep(1 * time.Second) // 1秒間スリープ
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2 // チャネル2に2を送信
22 time.Sleep(2 * time.Second) // 2秒間スリープ
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3 // チャネル3に3を送信
28 time.Sleep(3 * time.Second) // 3秒間スリープ
29 }
30 }()
31
32 for i := 0; i < 3; i++ { // 3回ループ
33 select {
34 case v := <-ch1: // チャネル1から値を受信した場合
35 fmt.Println(v) // 受信した値を出力
36 case v := <-ch2: // チャネル2から値を受信した場合
37 fmt.Println(v) // 受信した値を出力
38 case v := <-ch3: // チャネル3から値を受信した場合
39 fmt.Println(v) // 受信した値を出力
40 }
41 }
42}
上記のコードは、周期的に1, 2, 3を送信する3つのチャネルを作成し、selectを使用してチャネルから値を受け取って出力するコードです。このようにselectを使用すると、複数のチャネルから同時にデータを受信し、チャネルから値を受け取った順に処理できます。
for range
channelはfor rangeを使用して簡単にデータを受信できます。for rangeをチャネルに使うと、そのチャネルにデータが追加されるたびに動作し、チャネルが閉じられるとループを終了します。
1package main
2
3func main() {
4 ch := make(chan int) // 整数型チャネルを作成
5 go func() {
6 ch <- 1 // チャネルに1を送信
7 ch <- 2 // チャネルに2を送信
8 close(ch) // チャネルを閉じる
9 }()
10
11 for i := range ch { // チャネルから値を受信し続ける
12 fmt.Println(i) // 受信した値を出力
13 }
14}
上記のコードは、channelを使用して1と2を出力するコードです。このコードでは、for rangeを使用して、チャネルにデータが追加されるたびにデータを受け取って出力します。そして、チャネルが閉じられるとループを終了します。
上記に何度か書いたように、この構文は単純な同期手段にも使用できます。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{}) // 構造体型のチャネルを作成
10 go func() {
11 defer close(ch) // 遅延してチャネルを閉じる
12 time.Sleep(1 * time.Second) // 1秒間スリープ
13 fmt.Println("Hello, World!") // "Hello, World!"を出力
14 }()
15
16 fmt.Println("Waiting for goroutine...") // "Waiting for goroutine..."を出力
17 for range ch {} // goroutineがチャネルを閉じるまで待機
18}
上記のコードは、1秒間休止してからHello, World!を出力するコードです。このコードでは、channelを使用して同期コードを非同期コードに変更しました。このようにchannelを使用すると、同期コードを非同期コードに簡単に変更し、join地点を設定できます。
etc
- nil channelにデータを送信または受信すると、無限ループに陥りデッドロックが発生する可能性があります。
- チャネルを閉じた後にデータを送信すると、panicが発生します。
- チャネルを明示的に閉じなくても、GCが回収する際にチャネルを閉じます。
mutex
spinlock
spinlockは、繰り返し処理を行いながらロックを試行する同期方法です。Go言語では、ポインタを使用することで簡単にスピンロックを実装できます。
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) { // ロックが0(アンロック状態)であれば1(ロック状態)に設定し、成功するまで繰り返す
14 runtime.Gosched() // CPUを他のgoroutineに譲る
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0) // ロックを0(アンロック状態)に設定
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{} // 新しいSpinLockを返す
24}
上記のコードはspinlockパッケージを実装したものです。このコードでは、sync/atomicパッケージを使用してSpinLockを実装しました。Lockメソッドではatomic.CompareAndSwapUintptrを使用してロックを試行し、Unlockメソッドではatomic.StoreUintptrを使用してロックを解除します。この方式は、休むことなくロックを試行するため、ロックを獲得するまでCPUを使い続け、無限ループに陥る可能性があります。したがって、spinlockは単純な同期や、短時間だけ使用する場合に使うのが良いでしょう。
sync.Mutex
mutexはgoroutine間の同期のためのツールです。syncパッケージが提供するmutexは、Lock、Unlock、RLock、RUnlockなどのメソッドを提供します。mutexはsync.Mutexで生成でき、sync.RWMutexで読み取り/書き込みロックを使用することもできます。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex // ミューテックスを宣言
9 var count int // カウンター変数を宣言
10
11 go func() {
12 mu.Lock() // ミューテックスをロック
13 count++ // カウンターをインクリメント
14 mu.Unlock() // ミューテックスをアンロック
15 }()
16
17 mu.Lock() // ミューテックスをロック
18 count++ // カウンターをインクリメント
19 mu.Unlock() // ミューテックスをアンロック
20
21 println(count) // カウンターの値を出力
22}
上記のコードでは、ほぼ同時に2つのgoroutineが同じcount変数にアクセスします。このとき、mutexを使用してcount変数にアクセスするコードをクリティカルセクションにすることで、count変数への同時アクセスを防ぐことができます。そうすれば、このコードは何度実行しても同じ2を出力します。
sync.RWMutex
sync.RWMutexは、読み取りロックと書き込みロックを区別して使用できるmutexです。RLock、RUnlockメソッドを使用して読み取りロックをかけたり解除したりできます。
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex // 読み書きミューテックスを埋め込む
9 data map[K]V // データマップ
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock() // 読み取りロック
14 defer m.RUnlock() // 関数終了時に読み取りロックを解除
15
16 value, ok := m.data[key] // マップから値を取得
17 return value, ok // 値と存在フラグを返す
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock() // 書き込みロック
22 defer m.Unlock() // 関数終了時に書き込みロックを解除
23
24 m.data[key] = value // マップに値を設定
25}
上記のコードは、sync.RWMutexを使用してConcurrentMapを実装したコードです。このコードでは、Getメソッドで読み取りロックをかけ、Setメソッドで書き込みロックをかけることで、dataマップに安全にアクセスし、修正することができます。読み取りロックが必要な理由は、単純な読み取り操作が多い場合、書き込みロックをかけずに読み取りロックのみをかけることで、複数のgoroutineが同時に読み取り操作を実行できるようにするためです。これにより、状態の変更が不要で書き込みロックをかける必要がない場合は、読み取りロックのみをかけることでパフォーマンスを向上させることができます。
fakelock
fakelockはsync.Lockerを実装する簡単なトリックです。この構造体はsync.Mutexと同じメソッドを提供しますが、実際の動作はしません。
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {} // 何もしないLockメソッド
6
7func (f *FakeLock) Unlock() {} // 何もしないUnlockメソッド
上記のコードはfakelockパッケージを実装したものです。このパッケージはsync.Lockerを実装し、Lock、Unlockメソッドを提供しますが、実際には何の動作もしません。なぜこのようなコードが必要なのかは、機会があれば記述します。
waitgroup
sync.WaitGroup
sync.WaitGroupは、goroutineの作業がすべて終了するまで待機するツールです。Add、Done、Waitメソッドを提供し、Addメソッドでgoroutineの数を追加し、Doneメソッドでgoroutineの作業が終了したことを通知します。そして、Waitメソッドですべてのgoroutineの作業が終了するまで待機します。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{} // WaitGroupを初期化
10 c := atomic.Int64{} // アトミックなInt64カウンターを初期化
11
12 for i := 0; i < 100 ; i++ { // 100回ループ
13 wg.Add(1) // WaitGroupのカウンターを1増やす
14 go func() {
15 defer wg.Done() // goroutine終了時にDoneを呼び出す
16 c.Add(1) // カウンターをアトミックに1増やす
17 }()
18 }
19
20 wg.Wait() // すべてのgoroutineが終了するまで待機
21 println(c.Load()) // カウンターの最終値を出力
22}
上記のコードは、sync.WaitGroupを使用して100個のgoroutineが同時にc変数に値を加算するコードです。このコードでは、sync.WaitGroupを使用してすべてのgoroutineが終了するまで待機した後、c変数に加算された値を出力します。単純にいくつかのタスクをfork & joinする場合にはチャネルのみでも十分ですが、大量のタスクをfork & joinする場合にはsync.WaitGroupを使用するのも良い選択肢です。
with slice
スライスと組み合わせて使用すると、waitgroupはロックなしで優れた並行実行タスク管理ツールとなり得ます。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup // WaitGroupを宣言
11 arr := [10]int{} // 10個の整数を格納する配列を宣言
12
13 for i := 0; i < 10; i++ { // 10回ループ
14 wg.Add(1) // WaitGroupのカウンターを1増やす
15 go func(id int) { // goroutineを起動
16 defer wg.Done() // goroutine終了時にDoneを呼び出す
17
18 arr[id] = rand.Intn(100) // 配列の指定されたインデックスに0から99のランダムな整数を代入
19 }(i) // ループ変数をgoroutineに渡す
20 }
21
22 wg.Wait() // すべてのgoroutineが終了するまで待機
23 fmt.Println("Done") // "Done"を出力
24
25 for i, v := range arr { // 配列の各要素をループ
26 fmt.Printf("arr[%d] = %d\n", i, v) // インデックスと値を出力
27 }
28}
上記のコードは、waitgroupのみを使用して、各goroutineが同時に10個のランダムな整数を生成し、割り当てられたインデックスに保存するコードです。このコードでは、waitgroupを使用してすべてのgoroutineが終了するまで待機した後、Doneを出力します。このようにwaitgroupを使用すると、複数のgoroutineが同時にタスクを実行し、すべてのgoroutineが終了するまでロックなしでデータを保存し、タスク終了後に一括して後処理を行うことができます。
golang.org/x/sync/errgroup.ErrGroup
errgroupはsync.WaitGroupを拡張したパッケージです。errgroupはsync.WaitGroupとは異なり、goroutineの作業中にエラーが発生した場合、すべてのgoroutineをキャンセルし、エラーを返します。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background()) // errgroupとcontextを初期化
11 _ = ctx // ctxは使用されないが、慣例として保持
12
13 for i := 0; i < 10; i++ { // 10回ループ
14 i := i // ループ変数をシャドウイングしてgoroutine内で正しくキャプチャ
15 g.Go(func() error { // goroutineを起動
16 if i == 5 { // iが5の場合
17 return fmt.Errorf("error") // エラーを返す
18 }
19 return nil // エラーがなければnilを返す
20 })
21 }
22
23 if err := g.Wait(); err != nil { // すべてのgoroutineが終了するまで待機し、エラーがあれば
24 fmt.Println(err) // エラーを出力
25 }
26}
上記のコードは、errgroupを使用して10個のgoroutineを生成し、5番目のgoroutineでエラーを発生させるコードです。意図的に5番目のgoroutineでエラーを発生させ、エラーが発生するケースを示しました。ただし、実際に使用する際には、errgroupを使用してgoroutineを生成し、各goroutineでエラーが発生した場合に、様々な後処理を行う方法で使用します。
once
一度だけ実行されるべきコードを実行するツールです。以下のコンストラクタを通じて関連コードを実行できます。
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFuncは、単にその関数が全体を通して一度だけ実行されるようにします。
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() { // 一度だけ実行される関数を定義
7 println("Hello, World!") // "Hello, World!"を出力
8 })
9
10 once() // 関数を呼び出す
11 once() // 関数を呼び出す(実行されない)
12 once() // 関数を呼び出す(実行されない)
13 once() // 関数を呼び出す(実行されない)
14 once() // 関数を呼び出す(実行されない)
15}
上記のコードは、sync.OnceFuncを使用してHello, World!を出力するコードです。このコードでは、sync.OnceFuncを使用してonce関数を生成し、once関数を複数回呼び出してもHello, World!が一度だけ出力されます。
OnceValue
OnceValueは、単にその関数が全体を通して一度だけ実行されるだけでなく、その関数の戻り値を保存し、再度呼び出されたときに保存された値を返します。
1package main
2
3import "sync"
4
5func main() {
6 c := 0 // カウンター変数を初期化
7 once := sync.OnceValue(func() int { // 一度だけ実行され、値を返す関数を定義
8 c += 1 // カウンターをインクリメント
9 return c // カウンターの値を返す
10 })
11
12 println(once()) // 関数を呼び出し、戻り値を出力
13 println(once()) // 関数を呼び出し、戻り値を出力(初回呼び出しの結果が返される)
14 println(once()) // 関数を呼び出し、戻り値を出力(初回呼び出しの結果が返される)
15 println(once()) // 関数を呼び出し、戻り値を出力(初回呼び出しの結果が返される)
16 println(once()) // 関数を呼び出し、戻り値を出力(初回呼び出しの結果が返される)
17}
上記のコードは、sync.OnceValueを使用してc変数を1ずつ増加させるコードです。このコードでは、sync.OnceValueを使用してonce関数を生成し、once関数を複数回呼び出しても、c変数が一度だけ増加した1を返します。
OnceValues
OnceValuesはOnceValueと同様に動作しますが、複数の値を返すことができます。
1package main
2
3import "sync"
4
5func main() {
6 c := 0 // カウンター変数を初期化
7 once := sync.OnceValues(func() (int, int) { // 一度だけ実行され、複数の値を返す関数を定義
8 c += 1 // カウンターをインクリメント
9 return c, c // カウンターの値を2つ返す
10 })
11
12 a, b := once() // 関数を呼び出し、2つの戻り値を受け取る
13 println(a, b) // 戻り値を出力
14 a, b = once() // 関数を呼び出し、2つの戻り値を受け取る(初回呼び出しの結果が返される)
15 println(a, b) // 戻り値を出力
16 a, b = once() // 関数を呼び出し、2つの戻り値を受け取る(初回呼び出しの結果が返される)
17 println(a, b) // 戻り値を出力
18 a, b = once() // 関数を呼び出し、2つの戻り値を受け取る(初回呼び出しの結果が返される)
19 println(a, b) // 戻り値を出力
20 a, b = once() // 関数を呼び出し、2つの戻り値を受け取る(初回呼び出しの結果が返される)
21 println(a, b) // 戻り値を出力
22}
上記のコードは、sync.OnceValuesを使用してc変数を1ずつ増加させるコードです。このコードでは、sync.OnceValuesを使用してonce関数を生成し、once関数を複数回呼び出しても、c変数が一度だけ増加した1を返します。
atomic
atomicパッケージは、アトミックな操作を提供するパッケージです。atomicパッケージはAdd、CompareAndSwap、Load、Store、Swapなどのメソッドを提供しますが、最近ではInt64、Uint64、Pointerなどの型の使用が推奨されています。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{} // WaitGroupを初期化
10 c := atomic.Int64{} // アトミックなInt64カウンターを初期化
11
12 for i := 0; i < 100 ; i++ { // 100回ループ
13 wg.Add(1) // WaitGroupのカウンターを1増やす
14 go func() {
15 defer wg.Done() // goroutine終了時にDoneを呼び出す
16 c.Add(1) // カウンターをアトミックに1増やす
17 }()
18 }
19
20 wg.Wait() // すべてのgoroutineが終了するまで待機
21 println(c.Load()) // カウンターの最終値を出力
22}
先ほど使用された例です。atomic.Int64型を使用してc変数をアトミックに増加させるコードです。AddメソッドとLoadメソッドで変数をアトミックに増加させ、変数を読み込むことができます。また、Storeメソッドで値を保存し、Swapメソッドで値を交換し、CompareAndSwapメソッドで値を比較後に適合すれば交換することができます。
cond
sync.Cond
condパッケージは条件変数を提供するパッケージです。condパッケージはsync.Condで生成でき、Wait、Signal、Broadcastメソッドを提供します。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{}) // Mutexを基盤とするCondを作成
9 ready := false // 準備完了フラグ
10
11 go func() {
12 c.L.Lock() // CondのMutexをロック
13 ready = true // 準備完了フラグをtrueに設定
14 c.Signal() // 待機中のgoroutineにシグナルを送る
15 c.L.Unlock() // CondのMutexをアンロック
16 }()
17
18 c.L.Lock() // CondのMutexをロック
19 for !ready { // readyがtrueになるまでループ
20 c.Wait() // Condで待機し、Mutexを一時的に解放
21 }
22 c.L.Unlock() // CondのMutexをアンロック
23
24 println("Ready!") // "Ready!"を出力
25}
上記のコードは、sync.Condを使用してready変数がtrueになるまで待機するコードです。このコードでは、sync.Condを使用してready変数がtrueになるまで待機した後、Ready!と出力します。このようにsync.Condを使用すると、複数のgoroutineが同時に特定の条件を満たすまで待機させることができます。
これを活用して、簡単なqueueを実装できます。
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex // ミューテックス
15 Cond *sync.Cond // 条件変数
16 Head *Node[T] // 先頭ノード
17 Tail *Node[T] // 末尾ノード
18 Len int // キューの長さ
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{} // 新しいキューを作成
23 q.Cond = sync.NewCond(&q.Mutex) // キューのミューテックスを基盤とする条件変数を作成
24 return q // キューを返す
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock() // キューのミューテックスをロック
29 defer q.Unlock() // 関数終了時にミューテックスをアンロック
30
31 node := &Node[T]{Value: value} // 新しいノードを作成
32 if q.Len == 0 { // キューが空の場合
33 q.Head = node // 新しいノードを先頭に設定
34 q.Tail = node // 新しいノードを末尾に設定
35 } else { // キューが空でない場合
36 q.Tail.Next = node // 現在の末尾ノードの次を新しいノードに設定
37 q.Tail = node // 新しいノードを末尾に設定
38 }
39 q.Len++ // キューの長さを増やす
40 q.Cond.Signal() // 待機中のgoroutineにシグナルを送る
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock() // キューのミューテックスをロック
45 defer q.Unlock() // 関数終了時にミューテックスをアンロック
46
47 for q.Len == 0 { // キューが空の場合
48 q.Cond.Wait() // 条件変数で待機し、ミューテックスを一時的に解放
49 }
50
51 node := q.Head // 先頭ノードを取得
52 q.Head = q.Head.Next // 先頭ノードを次のノードに更新
53 q.Len-- // キューの長さを減らす
54 return node.Value // 先頭ノードの値を返す
55}
このようにsync.Condを活用すると、spin-lockで多くのCPU使用量を消費する代わりに、効率的に待機し、条件が満たされたら再び動作させることができます。
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphoreパッケージはセマフォを提供するパッケージです。semaphoreパッケージはgolang.org/x/sync/semaphore.Semaphoreで生成でき、Acquire、Release、TryAcquireメソッドを提供します。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1) // 重み1の新しいセマフォを作成
11
12 if s.TryAcquire(1) { // 重み1のトークンを試行的に取得
13 fmt.Println("Acquired!") // 取得成功の場合
14 } else {
15 fmt.Println("Not Acquired!") // 取得失敗の場合
16 }
17
18 s.Release(1) // 重み1のトークンを解放
19}
上記のコードは、semaphoreを使用してセマフォを生成し、セマフォを使用してAcquireメソッドでセマフォを獲得し、Releaseメソッドでセマフォを解放するコードです。このコードでは、semaphoreを使用してセマフォを獲得し、解放する方法を示しました。
終わりに
基本的な内容はここまであれば十分だと思います。この記事の内容に基づいて、皆さんがgoroutineを使用して並行処理を管理する方法を理解し、実際に使用できるようになることを願っています。この記事が皆さんのお役に立てれば幸いです。ありがとうございました。