GoSuda

Go 並行性スターターパック

By snowmerak
views ...

概要

簡単な紹介

Go言語には、多くの並行性管理のためのツールがあります。この記事では、その一部とコツを紹介します。

ゴルーチンとは?

goroutineは、Go言語でサポートされている新しい形式の並行性モデルです。一般的に、プログラムは複数のタスクを同時に実行するためにOSからOSスレッドを取得し、コア数に応じて並列にタスクを実行します。そして、より小さな単位の並行性を実現するためには、ユーザーランドでグリーンスレッドを生成し、1つのOSスレッド内で複数のグリーンスレッドが交代でタスクを実行するようにします。しかし、ゴルーチンの場合、このような形式のグリーンスレッドをさらに小さく効率的にしました。これらのゴルーチンはスレッドよりも使用するメモリが少なく、スレッドよりも迅速に生成および切り替えが可能です。

ゴルーチンを使用するには、単に go キーワードを使用するだけで済みます。これにより、プログラムを作成する過程で、同期コードを非同期コードとして直感的に実行できます。

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

このコードは、簡単に1秒間待機してから Hello, World! を出力する同期コードを非同期フローに変更します。今の例は簡単ですが、少し複雑なコードを同期コードから非同期コードに変更すると、コードの可読性、可視性、理解度が既存の async await や promise のような方式よりもさらに向上します。

ただし、多くの場合、このような同期コードを単純に非同期で呼び出すフローと fork & join のようなフロー(まるで分割統治法に似たフロー)を理解しない状態では、良くないゴルーチンコードが作られることもあります。このような場合に備えることができるいくつかの方法とテクニックを、この記事で紹介します。

並行性管理

context

最初の管理テクニックとして context が登場するのは意外かもしれません。しかし、Go言語では context は単純なキャンセル機能を超えて、全体の作業ツリーを管理する上で優れた役割を果たします。もしご存知ない方のために、簡単にこのパッケージを説明します。

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

上記のコードは、context を使用して、1秒後に Context is done! を出力するコードです。contextDone() メソッドを通じてキャンセル状態を確認でき、WithCancelWithTimeoutWithDeadlineWithValue などのメソッドを通じてさまざまなキャンセル方法を提供します。

簡単な例を作成してみましょう。もし皆さんが、何らかのデータを取得するために aggregator パターンを使用して、userpostcomment を取得するコードを作成すると仮定します。そして、すべてのリクエストが2秒以内に行われる必要がある場合、以下のように記述できます。

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

上記のコードは、2秒以内にすべてのデータを取得できない場合、Timeout! を出力し、すべてのデータを取得した場合は All data is fetched! を出力します。このような方法で context を使用すると、複数のゴルーチンが動作するコードでも、キャンセルとタイムアウトを簡単に管理できます。

これに関連するさまざまな context 関連の関数とメソッドは、godoc context で確認できます。簡単なものは学習して、便利に利用できるようになっていただければと思います。

channel

unbuffered channel

channel は、ゴルーチン間の通信のためのツールです。channelmake(chan T) で作成できます。このとき、T は、その channel が受け渡すデータの型です。channel<- でデータの送受信が可能で、closechannel を閉じることができます。

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

上記のコードは、channel を使用して1と2を出力するコードです。このコードでは、単純に channel に値を送受信することだけを示しています。しかし、channel はこれよりも多くの機能を提供します。まず、buffered channelunbuffered channel について見ていきましょう。始める前に、上に書かれた例は unbuffered channel であり、チャネルにデータを送信する動作とデータを受信する動作が同時に行われる必要があります。もしこれらの動作が同時に行われない場合、デッドロックが発生する可能性があります。

buffered channel

もし上記のコードが単純な出力ではなく、負荷の高い作業を行う2つのプロセスだったらどうでしょうか?2番目のプロセスが読み込んで処理を実行中に長時間ハングアップした場合、1番目のプロセスもその時間の間停止することになります。私たちはこのような状況を防ぐために buffered channel を使用できます。

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

上記のコードは、buffered channel を使用して1と2を出力するコードです。このコードでは、buffered channel を使用して、channel にデータを送信する動作とデータを受信する動作が同時に行われなくても良いようにしました。このようにチャネルにバッファを持たせることで、その長さ分の余裕が生まれ、後続の作業の影響によって発生する作業の遅延を防ぐことができます。

select

複数のチャネルを扱う際に、select 構文を使用すると、簡単に fan-in 構造を実装できます。

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

上記のコードは、周期的に1、2、3を送信する3つのチャネルを作成し、select を使用してチャネルから値を受信して出力するコードです。このような方法で select を使用すると、複数のチャネルから同時にデータを受信しながら、チャネルから値を受け取るごとに処理できます。

for range

channelfor range を使用して簡単にデータを受信できます。for range をチャネルに使用すると、そのチャネルにデータが追加されるたびに動作し、チャネルが閉じられるとループを終了します。

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

上記のコードは、channel を使用して1と2を出力するコードです。このコードでは、for range を使用してチャネルにデータが追加されるたびにデータを受信して出力します。そして、チャネルが閉じられるとループを終了します。

上に何度か書いたように、この構文は単純な同期手段にも使用できます。

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

上記のコードは、1秒待機してから Hello, World! を出力するコードです。このコードでは channel を使用して、同期コードを非同期コードに変更しました。このような方法で channel を使用すると、同期コードを非同期コードに簡単に変更し、join ポイントを設定できます。

etc

  1. nilチャネルにデータを送信または受信すると、無限ループに陥りデッドロックが発生する可能性があります。
  2. チャネルを閉じた後にデータを送信すると、panicが発生します。
  3. チャネルを特に閉じなくても、GCが回収する際にチャネルを閉じます。

mutex

spinlock

spinlock は、ループを回しながらロックを試み続ける同期方法です。Go言語ではポインターを使用して簡単にスピンロックを実装できます。

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

上記のコードは、spinlock パッケージを実装したコードです。このコードでは、sync/atomic パッケージを使用して SpinLock を実装しました。Lock メソッドでは atomic.CompareAndSwapUintptr を使用してロックを試み、Unlock メソッドでは atomic.StoreUintptr を使用してロックを解除します。この方式は、休むことなくロックを試みるため、ロックを取得するまでCPUを継続的に使用することになり、無限ループに陥る可能性があります。したがって、spinlock は単純な同期に使用するか、短時間のみ使用する場合に使用するのが適切です。

sync.Mutex

mutex はゴルーチン間の同期のためのツールです。sync パッケージが提供する mutex は、LockUnlockRLockRUnlock などのメソッドを提供します。mutexsync.Mutex で作成でき、sync.RWMutex で読み取り/書き込みロックを使用することもできます。

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

上記のコードでは、ほぼ同時に2つのゴルーチンが同じ count 変数にアクセスします。このとき、mutex を使用して count 変数にアクセスするコードを臨界領域にすると、count 変数への同時アクセスを防ぐことができます。そうすれば、このコードは何回実行しても同じく 2 を出力するようになります。

sync.RWMutex

sync.RWMutex は読み取りロックと書き込みロックを区別して使用できる mutex です。RLockRUnlock メソッドを使用して、読み取りロックをかけて解除できます。

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

上記のコードは、sync.RWMutex を使用して ConcurrentMap を実装したコードです。このコードでは、Get メソッドで読み取りロックをかけ、Set メソッドで書き込みロックをかけて data マップに安全にアクセスし、修正できるようにしています。読み取りロックが必要な理由は、単純な読み取り作業が多い場合、書き込みロックをかけずに読み取りロックだけをかけて、複数のゴルーチンが同時に読み取り作業を実行できるようにするためです。これにより、状態の変更がなく、書き込みロックをかける必要がない場合には、読み取りロックだけをかけて性能を向上させることができます。

fakelock

fakelocksync.Locker を実装する簡単なトリックです。この構造体は sync.Mutex と同じメソッドを提供しますが、実際には動作しません。

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

上記のコードは、fakelock パッケージを実装したコードです。このパッケージは sync.Locker を実装して LockUnlock メソッドを提供しますが、実際には何も動作しません。なぜこのようなコードが必要なのかは、機会があれば述べます。

waitgroup

sync.WaitGroup

sync.WaitGroup は、ゴルーチンの作業がすべて完了するまで待機するためのツールです。AddDoneWait メソッドを提供し、Add メソッドでゴルーチンの数を追加し、Done メソッドでゴルーチンの作業が完了したことを知らせます。そして、Wait メソッドですべてのゴルーチンの作業が完了するまで待機します。

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

上記のコードは、sync.WaitGroupを使用して100個のゴルーチンが同時にc変数に値を加算するコードです。このコードでは、sync.WaitGroupを使用してすべてのゴルーチンが終了するまで待機した後、c変数に加算された値を出力します。単純にいくつかの作業をfork & joinする場合、チャネルのみを使用しても十分ですが、大量の作業をfork & joinする場合は、sync.WaitGroupを使用することも良い選択肢です。

with slice

スライスと共に使用する場合、waitgroupはロックなしで優れた同時実行作業を管理するツールになり得ます。

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

上記のコードは、waitgroupのみを使用して各ゴルーチンが同時に10個のランダムな整数を生成し、割り当てられたインデックスに保存するコードです。このコードでは、waitgroupを使用してすべてのゴルーチンが終了するまで待機した後、Doneを出力します。このような方法でwaitgroupを使用すると、複数のゴルーチンが同時に作業を実行し、すべてのゴルーチンが終了するまでロックなしでデータを保存し、作業終了後に一括して後処理を実行できます。

golang.org/x/sync/errgroup.ErrGroup

errgroupは、sync.WaitGroupを拡張したパッケージです。errgroupsync.WaitGroupとは異なり、ゴルーチンの作業の1つでもエラーが発生すると、すべてのゴルーチンをキャンセルしてエラーを返します。

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

上記のコードは、errgroupを使用して10個のゴルーチンを生成し、5番目のゴルーチンでエラーを発生させるコードです。意図的に5番目のゴルーチンでエラーを発生させ、エラーが発生する場合を示しました。ただし、実際に使用する際には、errgroupを使用してゴルーチンを生成し、各ゴルーチンでエラーが発生する場合について、さまざまな後処理を行う方法で使用できます。

once

一度だけ実行される必要があるコードを実行するツールです。以下の生成子を通じて関連コードを実行できます。

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFuncは、単純に該当関数が全体にわたって1回だけ実行できるようにします。

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

上記のコードは、sync.OnceFuncを使用してHello, World!を出力するコードです。このコードでは、sync.OnceFuncを使用してonce関数を生成し、once関数を複数回呼び出してもHello, World!が1回だけ出力されます。

OnceValue

OnceValueは、単純に該当関数が全体にわたって1回だけ実行されるだけでなく、該当関数の戻り値を保存して、再度呼び出すときに保存された値を返します。

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

上記のコードは、sync.OnceValueを使用してc変数を1ずつ増加させるコードです。このコードでは、sync.OnceValueを使用してonce関数を生成し、once関数を複数回呼び出してもc変数が1回だけ増加した1を返します。

OnceValues

OnceValuesは、OnceValueと同じように動作しますが、複数の値を返すことができます。

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

上記のコードは、sync.OnceValuesを使用してc変数を1ずつ増加させるコードです。このコードでは、sync.OnceValuesを使用してonce関数を生成し、once関数を複数回呼び出してもc変数が1回だけ増加した1を返します。

atomic

atomicパッケージは、アトミックな演算を提供するパッケージです。atomicパッケージは、AddCompareAndSwapLoadStoreSwapなどのメソッドを提供しますが、最近ではInt64Uint64Pointerなどのタイプの使用が推奨されています。

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

以前使用された例です。atomic.Int64タイプを使用してc変数をアトミックに増加させるコードです。AddメソッドとLoadメソッドで、アトミックに変数を増加させ、変数を読み取ることができます。また、Storeメソッドで値を保存し、Swapメソッドで値を交換し、CompareAndSwapメソッドで値を比較して適合すれば交換できます。

cond

sync.Cond

condパッケージは、条件変数を提供するパッケージです。condパッケージはsync.Condで生成でき、WaitSignalBroadcastメソッドを提供します。

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

上記のコードは、sync.Condを使用してready変数がtrueになるまで待機するコードです。このコードでは、sync.Condを使用してready変数がtrueになるまで待機した後、Ready!を出力します。このような方法でsync.Condを使用すると、複数のゴルーチンが同時に特定の条件を満たすまで待機させることができます。

これを活用して、簡単なqueueを実装できます。

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

このようにsync.Condを活用すると、spin-lockで多くのCPU使用量を使用する代わりに、効率的に待機し、条件が満たされると再び動作できます。

semaphore

golang.org/x/sync/semaphore.Semaphore

semaphoreパッケージは、セマフォを提供するパッケージです。semaphoreパッケージは、golang.org/x/sync/semaphore.Semaphoreで生成でき、AcquireReleaseTryAcquireメソッドを提供します。

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

上記のコードは、semaphoreを使用してセマフォを生成し、セマフォを使用してAcquireメソッドでセマフォを取得し、Releaseメソッドでセマフォを解放するコードです。このコードでは、semaphoreを使用してセマフォを取得および解放する方法を示しました。

最後に

基本的な内容はここまでで十分だと思います。この記事の内容に基づいて、皆様がゴルーチンを使用して同時性を管理する方法を理解し、実際に使用できるようになることを願っています。この記事が皆様のお役に立てば幸いです。ありがとうございました。