GoSuda

Go 并发入门包

By snowmerak
views ...

概述

简短介绍

Go 语言拥有许多用于并发管理的工具。本文将介绍其中的一些工具和技巧。

Goroutine?

goroutine 是 Go 语言支持的一种新型并发模型。通常,程序为了同时执行多个任务,会从操作系统获取操作系统线程,并以核心数量的并行方式执行任务。为了执行更小单位的并发,会在用户态创建绿色线程,以便在单个操作系统线程中运行多个绿色线程来执行任务。然而,goroutine 将这种形式的绿色线程做得更小、更高效。这些 goroutine 比线程占用更少的内存,并且可以比线程更快地创建和切换。

要使用 goroutine,只需使用 go 关键字即可。这使得在编写程序时,可以直观地将同步代码作为异步代码执行。

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

这段代码简单地将一个等待 1 秒后输出 Hello, World! 的同步代码更改为异步流程。尽管现在的示例很简单,但如果将稍微复杂的同步代码更改为异步代码,代码的可读性、可见性和理解度将比现有的 async await 或 promise 等方式更好。

然而,在许多情况下,如果不能理解仅仅将同步代码异步调用的流程,以及诸如 fork & join 这样的流程(类似于分治法的流程),就会产生不好的 goroutine 代码。本文将介绍几种可以应对这种情况的方法和技巧。

并发管理

context

第一个管理技巧是 context,这可能有点出乎意料。但在 Go 语言中,context 不仅仅是一个简单的取消功能,它在管理整个任务树方面也发挥着卓越的作用。为了那些不了解的人,我将简单地介绍一下这个包。

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

上面的代码使用 context,在 1 秒后输出 Context is done!context 可以通过 Done() 方法检查是否被取消,并通过 WithCancelWithTimeoutWithDeadlineWithValue 等方法提供各种取消方式。

让我们创建一个简单的示例。假设您正在编写代码,使用 aggregator 模式来获取数据,例如获取 userpostcomment。如果所有请求都必须在 2 秒内完成,您可以这样编写代码:

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

上面的代码如果在 2 秒内未能获取所有数据,则输出 Timeout!,如果获取了所有数据,则输出 All data is fetched!。通过这种方式使用 context,即使在多个 goroutine 运行的代码中,也可以轻松地管理取消和超时。

更多关于 context 的函数和方法可以在 godoc context 中查看。希望您能学习一些简单的用法并方便使用。

channel

unbuffered channel

channel 是用于 goroutine 之间通信的工具。可以使用 make(chan T) 创建 channel。其中,T 是该 channel 将要传递的数据的类型。channel 可以使用 <- 符号发送和接收数据,并可以使用 close 关闭 channel

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

上面的代码使用 channel 输出 1 和 2。这段代码仅仅展示了向 channel 发送和接收值的简单用法。但是,channel 提供了更多的功能。首先,让我们了解一下 buffered channelunbuffered channel。在开始之前,上面编写的示例是一个 unbuffered channel,这意味着向 channel 发送数据和接收数据的动作必须同时发生。如果这些动作没有同时发生,则可能会发生死锁。

buffered channel

如果上面的代码不是简单的输出,而是两个执行繁重任务的进程,会怎么样呢?如果第二个进程在读取和处理时出现长时间的停顿,那么第一个进程也会在该时间内停止。为了防止这种情况,我们可以使用 buffered channel

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

上面的代码使用 buffered channel 输出 1 和 2。在这段代码中,我们使用 buffered channel,使得向 channel 发送数据和接收数据的动作不必同时发生。这样在 channel 中设置缓冲区后,会产生相应长度的余量,从而可以防止由于后续任务的影响而导致的任务延迟。

select

在处理多个 channel 时,可以使用 select 语法轻松实现 fan-in 结构。

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

上面的代码创建了三个定期传递 1、2 和 3 的 channel,并使用 select 从 channel 接收值并输出。通过这种方式使用 select,可以在从多个 channel 同时接收数据的同时,按照从 channel 接收到的值的顺序进行处理。

for range

可以使用 for range 轻松地从 channel 接收数据。如果在 channel 上使用 for range,则每次向该 channel 添加数据时都会执行操作,并且当 channel 关闭时循环终止。

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

上面的代码使用 channel 输出 1 和 2。这段代码使用 for range,每次向 channel 添加数据时接收并输出数据。当 channel 关闭时循环终止。

正如上面多次编写的那样,这个语法也可以用作简单的同步手段。

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

上面的代码在等待 1 秒后输出 Hello, World!。在这段代码中,我们使用 channel 将同步代码更改为异步代码。通过这种方式使用 channel,可以轻松地将同步代码更改为异步代码,并设置 join 点。

etc

  1. 如果向 nil channel 发送或接收数据,可能会陷入无限循环并发生死锁。
  2. 如果关闭 channel 后再发送数据,会发生 panic。
  3. 即使不关闭 channel,GC 也会在回收时关闭 channel。

mutex

spinlock

spinlock 是一种通过循环不断尝试获取锁的同步方法。在 Go 语言中,可以使用指针轻松实现自旋锁。

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

上面的代码实现了 spinlock 包。这段代码使用 sync/atomic 包实现了 SpinLockLock 方法使用 atomic.CompareAndSwapUintptr 尝试获取锁,Unlock 方法使用 atomic.StoreUintptr 释放锁。这种方法会不断尝试获取锁,因此会持续占用 CPU 直到获取到锁,从而可能陷入无限循环。因此,spinlock 最好用于简单的同步,或者仅在短时间内使用。

sync.Mutex

mutex 是用于 goroutine 之间同步的工具。sync 包提供的 mutex 提供了 LockUnlockRLockRUnlock 等方法。可以使用 sync.Mutex 创建 mutex,也可以使用 sync.RWMutex 来使用读/写锁。

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

在上面的代码中,两个 goroutine 几乎同时访问相同的 count 变量。此时,如果使用 mutex 将访问 count 变量的代码设为临界区,就可以防止对 count 变量的并发访问。那么,这段代码无论执行多少次都会输出 2

sync.RWMutex

sync.RWMutex 是一个可以区分读锁和写锁的 mutex。可以使用 RLockRUnlock 方法来获取和释放读锁。

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

上面的代码使用 sync.RWMutex 实现了一个 ConcurrentMap。这段代码在 Get 方法中获取读锁,在 Set 方法中获取写锁,从而安全地访问和修改 data map。之所以需要读锁,是因为在简单的读取操作较多的情况下,可以只获取读锁而不是写锁,从而允许多个 goroutine 同时执行读取操作。这样,在不需要更改状态的情况下,可以通过只获取读锁来提高性能。

fakelock

fakelock 是一个实现 sync.Locker 的简单技巧。这个结构体提供了与 sync.Mutex 相同的方法,但实际操作中什么都不做。

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

上面的代码实现了 fakelock 包。这个包通过实现 sync.Locker 提供了 LockUnlock 方法,但实际上没有任何操作。为什么需要这段代码,如果以后有机会我会详细解释。

waitgroup

sync.WaitGroup

sync.WaitGroup 是一个等待所有 goroutine 完成工作的工具。它提供 AddDoneWait 方法。Add 方法用于增加 goroutine 的数量,Done 方法用于通知 goroutine 的工作已完成,Wait 方法用于等待所有 goroutine 的工作完成。

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

上述代码利用 sync.WaitGroup 来实现 100 个 Go 协程同时向 c 变量累加数值的功能。该代码使用 sync.WaitGroup 来等待所有 Go 协程执行完毕,随后输出 c 变量的累加值。对于少量 fork & join 操作,仅使用通道即可满足需求;然而,当需要 fork & join 大量操作时,采用 sync.WaitGroup 亦是一种良好的选择。

with slice

若与切片一同使用,waitgroup 可以作为一种无需锁机制的有效工具,用于管理并发执行的任务。

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

上述代码仅使用 waitgroup,使得每个 Go 协程能够并发地生成 10 个随机整数,并将它们存储到各自对应的索引位置。在此代码中,使用 waitgroup 等待所有 Go 协程执行完毕后,输出 Done。通过这种方式使用 waitgroup,可以实现多个 Go 协程并发执行任务,并在无需锁机制的情况下存储数据,以及在所有任务完成后统一进行后续处理。

golang.org/x/sync/errgroup.ErrGroup

errgroup 是对 sync.WaitGroup 的扩展包。与 sync.WaitGroup 不同,errgroup 在任何一个 Go 协程执行出错时,会取消所有 Go 协程并返回错误信息。

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

上述代码利用 errgroup 创建了 10 个 Go 协程,并在第 5 个 Go 协程中引发错误。此处有意地在第 5 个 Go 协程中引发错误,旨在展示发生错误的情况。在实际应用中,应使用 errgroup 创建 Go 协程,并针对每个 Go 协程可能产生的错误进行相应的后续处理。

once

用于执行仅需运行一次的代码的工具。可以通过以下构造函数执行相关代码。

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc 确保某个函数在整个程序生命周期中仅被执行一次。

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

上述代码使用 sync.OnceFunc 输出 Hello, World!。这段代码中,sync.OnceFunc 用于创建一个 once 函数,尽管多次调用 once 函数,Hello, World! 也只会被输出一次。

OnceValue

OnceValue 不仅确保函数在整个程序生命周期中仅被执行一次,还会存储该函数的返回值,并在后续调用时返回已存储的值。

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

上述代码使用 sync.OnceValue 来递增 c 变量的值。在此代码中,sync.OnceValue 创建了一个 once 函数,即使多次调用 once 函数,c 变量也只会被递增一次,并返回首次递增后的值 1。

OnceValues

OnceValues 的工作原理与 OnceValue 相同,但它能够返回多个值。

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

上述代码使用 sync.OnceValues 来递增 c 变量的值。在此代码中,sync.OnceValues 创建了一个 once 函数,即使多次调用 once 函数,c 变量也只会被递增一次,并返回首次递增后的值 1。

atomic

atomic 包提供原子操作。atomic 包提供了 AddCompareAndSwapLoadStoreSwap 等方法,但最近建议使用 Int64Uint64Pointer 等类型。

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

这是之前使用过的示例。这段代码使用 atomic.Int64 类型原子性地递增 c 变量的值。Add 方法和 Load 方法用于原子性地递增和读取变量。此外,Store 方法用于存储值,Swap 方法用于交换值,而 CompareAndSwap 方法用于比较值并在匹配时进行交换。

cond

sync.Cond

cond 包提供条件变量。可以使用 sync.Cond 创建 cond 包,并提供 WaitSignalBroadcast 方法。

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

上述代码使用 sync.Cond 等待 ready 变量变为 true。在此代码中,sync.Cond 用于等待 ready 变量变为 true,然后输出 Ready!。通过这种方式使用 sync.Cond,可以使多个 Go 协程在满足特定条件时同时等待。

利用这一点可以实现一个简单的 queue

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

通过这种方式使用 sync.Cond,可以有效地等待,并在条件满足时再次运行,而无需像 spin-lock 那样消耗大量的 CPU 资源。

semaphore

golang.org/x/sync/semaphore.Semaphore

semaphore 包提供信号量。可以使用 golang.org/x/sync/semaphore.Semaphore 创建 semaphore 包,并提供 AcquireReleaseTryAcquire 方法。

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

上述代码使用 semaphore 创建信号量,并通过 Acquire 方法获取信号量,以及通过 Release 方法释放信号量。此代码展示了如何使用 semaphore 获取和释放信号量。

总结

基本内容到此为止。希望在本文的基础上,各位能够理解如何使用 Go 协程管理并发,并实际应用。如果本文对各位有所帮助,我将不胜荣幸。谢谢。