Go 并发入门包
概述
简短介绍
Go 语言拥有许多用于并发管理的工具。本文将介绍其中的一些工具和技巧。
Goroutine?
goroutine 是 Go 语言支持的一种新型并发模型。通常,程序为了同时执行多个任务,会从操作系统获取操作系统线程,并以核心数量的并行方式执行任务。为了执行更小单位的并发,会在用户态创建绿色线程,以便在单个操作系统线程中运行多个绿色线程来执行任务。然而,goroutine 将这种形式的绿色线程做得更小、更高效。这些 goroutine 比线程占用更少的内存,并且可以比线程更快地创建和切换。
要使用 goroutine,只需使用 go 关键字即可。这使得在编写程序时,可以直观地将同步代码作为异步代码执行。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
这段代码简单地将一个等待 1 秒后输出 Hello, World! 的同步代码更改为异步流程。尽管现在的示例很简单,但如果将稍微复杂的同步代码更改为异步代码,代码的可读性、可见性和理解度将比现有的 async await 或 promise 等方式更好。
然而,在许多情况下,如果不能理解仅仅将同步代码异步调用的流程,以及诸如 fork & join 这样的流程(类似于分治法的流程),就会产生不好的 goroutine 代码。本文将介绍几种可以应对这种情况的方法和技巧。
并发管理
context
第一个管理技巧是 context,这可能有点出乎意料。但在 Go 语言中,context 不仅仅是一个简单的取消功能,它在管理整个任务树方面也发挥着卓越的作用。为了那些不了解的人,我将简单地介绍一下这个包。
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
上面的代码使用 context,在 1 秒后输出 Context is done!。context 可以通过 Done() 方法检查是否被取消,并通过 WithCancel、WithTimeout、WithDeadline、WithValue 等方法提供各种取消方式。
让我们创建一个简单的示例。假设您正在编写代码,使用 aggregator 模式来获取数据,例如获取 user、post 和 comment。如果所有请求都必须在 2 秒内完成,您可以这样编写代码:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
上面的代码如果在 2 秒内未能获取所有数据,则输出 Timeout!,如果获取了所有数据,则输出 All data is fetched!。通过这种方式使用 context,即使在多个 goroutine 运行的代码中,也可以轻松地管理取消和超时。
更多关于 context 的函数和方法可以在 godoc context 中查看。希望您能学习一些简单的用法并方便使用。
channel
unbuffered channel
channel 是用于 goroutine 之间通信的工具。可以使用 make(chan T) 创建 channel。其中,T 是该 channel 将要传递的数据的类型。channel 可以使用 <- 符号发送和接收数据,并可以使用 close 关闭 channel。
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
上面的代码使用 channel 输出 1 和 2。这段代码仅仅展示了向 channel 发送和接收值的简单用法。但是,channel 提供了更多的功能。首先,让我们了解一下 buffered channel 和 unbuffered channel。在开始之前,上面编写的示例是一个 unbuffered channel,这意味着向 channel 发送数据和接收数据的动作必须同时发生。如果这些动作没有同时发生,则可能会发生死锁。
buffered channel
如果上面的代码不是简单的输出,而是两个执行繁重任务的进程,会怎么样呢?如果第二个进程在读取和处理时出现长时间的停顿,那么第一个进程也会在该时间内停止。为了防止这种情况,我们可以使用 buffered channel。
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
上面的代码使用 buffered channel 输出 1 和 2。在这段代码中,我们使用 buffered channel,使得向 channel 发送数据和接收数据的动作不必同时发生。这样在 channel 中设置缓冲区后,会产生相应长度的余量,从而可以防止由于后续任务的影响而导致的任务延迟。
select
在处理多个 channel 时,可以使用 select 语法轻松实现 fan-in 结构。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
上面的代码创建了三个定期传递 1、2 和 3 的 channel,并使用 select 从 channel 接收值并输出。通过这种方式使用 select,可以在从多个 channel 同时接收数据的同时,按照从 channel 接收到的值的顺序进行处理。
for range
可以使用 for range 轻松地从 channel 接收数据。如果在 channel 上使用 for range,则每次向该 channel 添加数据时都会执行操作,并且当 channel 关闭时循环终止。
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
上面的代码使用 channel 输出 1 和 2。这段代码使用 for range,每次向 channel 添加数据时接收并输出数据。当 channel 关闭时循环终止。
正如上面多次编写的那样,这个语法也可以用作简单的同步手段。
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
上面的代码在等待 1 秒后输出 Hello, World!。在这段代码中,我们使用 channel 将同步代码更改为异步代码。通过这种方式使用 channel,可以轻松地将同步代码更改为异步代码,并设置 join 点。
etc
- 如果向 nil channel 发送或接收数据,可能会陷入无限循环并发生死锁。
- 如果关闭 channel 后再发送数据,会发生 panic。
- 即使不关闭 channel,GC 也会在回收时关闭 channel。
mutex
spinlock
spinlock 是一种通过循环不断尝试获取锁的同步方法。在 Go 语言中,可以使用指针轻松实现自旋锁。
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
上面的代码实现了 spinlock 包。这段代码使用 sync/atomic 包实现了 SpinLock。Lock 方法使用 atomic.CompareAndSwapUintptr 尝试获取锁,Unlock 方法使用 atomic.StoreUintptr 释放锁。这种方法会不断尝试获取锁,因此会持续占用 CPU 直到获取到锁,从而可能陷入无限循环。因此,spinlock 最好用于简单的同步,或者仅在短时间内使用。
sync.Mutex
mutex 是用于 goroutine 之间同步的工具。sync 包提供的 mutex 提供了 Lock、Unlock、RLock 和 RUnlock 等方法。可以使用 sync.Mutex 创建 mutex,也可以使用 sync.RWMutex 来使用读/写锁。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
在上面的代码中,两个 goroutine 几乎同时访问相同的 count 变量。此时,如果使用 mutex 将访问 count 变量的代码设为临界区,就可以防止对 count 变量的并发访问。那么,这段代码无论执行多少次都会输出 2。
sync.RWMutex
sync.RWMutex 是一个可以区分读锁和写锁的 mutex。可以使用 RLock 和 RUnlock 方法来获取和释放读锁。
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
上面的代码使用 sync.RWMutex 实现了一个 ConcurrentMap。这段代码在 Get 方法中获取读锁,在 Set 方法中获取写锁,从而安全地访问和修改 data map。之所以需要读锁,是因为在简单的读取操作较多的情况下,可以只获取读锁而不是写锁,从而允许多个 goroutine 同时执行读取操作。这样,在不需要更改状态的情况下,可以通过只获取读锁来提高性能。
fakelock
fakelock 是一个实现 sync.Locker 的简单技巧。这个结构体提供了与 sync.Mutex 相同的方法,但实际操作中什么都不做。
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
上面的代码实现了 fakelock 包。这个包通过实现 sync.Locker 提供了 Lock 和 Unlock 方法,但实际上没有任何操作。为什么需要这段代码,如果以后有机会我会详细解释。
waitgroup
sync.WaitGroup
sync.WaitGroup 是一个等待所有 goroutine 完成工作的工具。它提供 Add、Done 和 Wait 方法。Add 方法用于增加 goroutine 的数量,Done 方法用于通知 goroutine 的工作已完成,Wait 方法用于等待所有 goroutine 的工作完成。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
上述代码利用 sync.WaitGroup 来实现 100 个 Go 协程同时向 c 变量累加数值的功能。该代码使用 sync.WaitGroup 来等待所有 Go 协程执行完毕,随后输出 c 变量的累加值。对于少量 fork & join 操作,仅使用通道即可满足需求;然而,当需要 fork & join 大量操作时,采用 sync.WaitGroup 亦是一种良好的选择。
with slice
若与切片一同使用,waitgroup 可以作为一种无需锁机制的有效工具,用于管理并发执行的任务。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
上述代码仅使用 waitgroup,使得每个 Go 协程能够并发地生成 10 个随机整数,并将它们存储到各自对应的索引位置。在此代码中,使用 waitgroup 等待所有 Go 协程执行完毕后,输出 Done。通过这种方式使用 waitgroup,可以实现多个 Go 协程并发执行任务,并在无需锁机制的情况下存储数据,以及在所有任务完成后统一进行后续处理。
golang.org/x/sync/errgroup.ErrGroup
errgroup 是对 sync.WaitGroup 的扩展包。与 sync.WaitGroup 不同,errgroup 在任何一个 Go 协程执行出错时,会取消所有 Go 协程并返回错误信息。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
上述代码利用 errgroup 创建了 10 个 Go 协程,并在第 5 个 Go 协程中引发错误。此处有意地在第 5 个 Go 协程中引发错误,旨在展示发生错误的情况。在实际应用中,应使用 errgroup 创建 Go 协程,并针对每个 Go 协程可能产生的错误进行相应的后续处理。
once
用于执行仅需运行一次的代码的工具。可以通过以下构造函数执行相关代码。
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc 确保某个函数在整个程序生命周期中仅被执行一次。
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
上述代码使用 sync.OnceFunc 输出 Hello, World!。这段代码中,sync.OnceFunc 用于创建一个 once 函数,尽管多次调用 once 函数,Hello, World! 也只会被输出一次。
OnceValue
OnceValue 不仅确保函数在整个程序生命周期中仅被执行一次,还会存储该函数的返回值,并在后续调用时返回已存储的值。
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
上述代码使用 sync.OnceValue 来递增 c 变量的值。在此代码中,sync.OnceValue 创建了一个 once 函数,即使多次调用 once 函数,c 变量也只会被递增一次,并返回首次递增后的值 1。
OnceValues
OnceValues 的工作原理与 OnceValue 相同,但它能够返回多个值。
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
上述代码使用 sync.OnceValues 来递增 c 变量的值。在此代码中,sync.OnceValues 创建了一个 once 函数,即使多次调用 once 函数,c 变量也只会被递增一次,并返回首次递增后的值 1。
atomic
atomic 包提供原子操作。atomic 包提供了 Add、CompareAndSwap、Load、Store、Swap 等方法,但最近建议使用 Int64、Uint64、Pointer 等类型。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
这是之前使用过的示例。这段代码使用 atomic.Int64 类型原子性地递增 c 变量的值。Add 方法和 Load 方法用于原子性地递增和读取变量。此外,Store 方法用于存储值,Swap 方法用于交换值,而 CompareAndSwap 方法用于比较值并在匹配时进行交换。
cond
sync.Cond
cond 包提供条件变量。可以使用 sync.Cond 创建 cond 包,并提供 Wait、Signal、Broadcast 方法。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
上述代码使用 sync.Cond 等待 ready 变量变为 true。在此代码中,sync.Cond 用于等待 ready 变量变为 true,然后输出 Ready!。通过这种方式使用 sync.Cond,可以使多个 Go 协程在满足特定条件时同时等待。
利用这一点可以实现一个简单的 queue。
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
通过这种方式使用 sync.Cond,可以有效地等待,并在条件满足时再次运行,而无需像 spin-lock 那样消耗大量的 CPU 资源。
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphore 包提供信号量。可以使用 golang.org/x/sync/semaphore.Semaphore 创建 semaphore 包,并提供 Acquire、Release、TryAcquire 方法。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
上述代码使用 semaphore 创建信号量,并通过 Acquire 方法获取信号量,以及通过 Release 方法释放信号量。此代码展示了如何使用 semaphore 获取和释放信号量。
总结
基本内容到此为止。希望在本文的基础上,各位能够理解如何使用 Go 协程管理并发,并实际应用。如果本文对各位有所帮助,我将不胜荣幸。谢谢。