Go 并发入门套件
概述
简短介绍
Go语言提供了多种并发管理工具。本文将介绍其中一部分工具及其技巧。
Goroutine?
Goroutine是Go语言支持的一种新型并发模型。通常情况下,程序会从操作系统获取OS线程以同时执行多个任务,并根据核心数量并行执行这些任务。为了实现更小粒度的并发,用户空间会创建Green Thread(绿色线程),让多个绿色线程在一个OS线程内轮流执行任务。然而,Goroutine使这种形式的绿色线程更加小巧高效。Goroutine比线程使用更少的内存,并且可以比线程更快地创建和切换。
要使用Goroutine,只需使用go关键字即可。这使得在编写程序时,能够直观地将同步代码作为异步代码执行。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch) // 关闭channel
12 time.Sleep(1 * time.Second) // 等待1秒
13 fmt.Println("Hello, World!") // 打印"Hello, World!"
14 }()
15
16 fmt.Println("Waiting for goroutine...") // 打印"Waiting for goroutine..."
17 for range ch {} // 等待channel关闭
18}
这段代码简单地将一个等待1秒并输出“Hello, World!”的同步代码转换为异步流程。尽管此示例很简单,但将一些复杂的同步代码转换为异步代码时,其代码的可读性、可见性和理解性会比传统的async/await或Promise等方式更好。
然而,在许多情况下,如果未能理解这种简单地将同步代码异步调用的流程以及fork & join(类似于分治法)的流程,可能会编写出不理想的Goroutine代码。本文将介绍几种方法和技巧,以应对这些情况。
并发管理
context
将context作为第一项管理技术可能会出乎意料。然而,在Go语言中,context不仅仅提供了取消功能,它在管理整个任务树方面也发挥着卓越的作用。对于不熟悉此包的读者,我们将简要解释其功能。
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background()) // 创建一个可取消的context
5 defer cancel() // 确保在函数退出时调用cancel
6
7 go func() {
8 <-ctx.Done() // 等待context被取消
9 fmt.Println("Context is done!") // 打印"Context is done!"
10 }()
11
12 time.Sleep(1 * time.Second) // 等待1秒
13
14 cancel() // 取消context
15
16 time.Sleep(1 * time.Second) // 再次等待1秒
17}
上述代码使用context,在1秒后输出“Context is done!”。context可以通过Done()方法检查是否已取消,并通过WithCancel、WithTimeout、WithDeadline、WithValue等方法提供多种取消方式。
我们来创建一个简单的示例。假设您为了获取某些数据,使用aggregator模式编写代码来获取user、post和comment。如果所有请求都必须在2秒内完成,您可以按如下方式编写:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second) // 创建一个2秒超时的context
5 defer cancel() // 确保在函数退出时调用cancel
6
7 ch := make(chan struct{}) // 创建一个channel
8 go func() {
9 defer close(ch) // 确保在函数退出时关闭channel
10 user := getUser(ctx) // 获取用户数据
11 post := getPost(ctx) // 获取帖子数据
12 comment := getComment(ctx) // 获取评论数据
13
14 fmt.Println(user, post, comment) // 打印用户、帖子和评论数据
15 }()
16
17 select {
18 case <-ctx.Done(): // 如果context被取消(超时)
19 fmt.Println("Timeout!") // 打印"Timeout!"
20 case <-ch: // 如果channel关闭(所有数据获取完成)
21 fmt.Println("All data is fetched!") // 打印"All data is fetched!"
22 }
23}
上述代码在2秒内未能获取所有数据时会输出“Timeout!”,如果所有数据都已获取则输出“All data is fetched!”。通过这种方式使用context,即使在多个Goroutine运行的代码中,也能轻松管理取消和超时。
与此相关的各种context函数和方法可在godoc context中查阅。希望您能通过学习简单地利用它们。
channel
unbuffered channel
channel是Goroutine之间通信的工具。channel可以通过make(chan T)创建。其中,T是该channel将传递的数据类型。channel可以通过<-发送和接收数据,并通过close关闭channel。
1package main
2
3func main() {
4 ch := make(chan int) // 创建一个int类型的无缓冲channel
5 go func() {
6 ch <- 1 // 发送1到channel
7 ch <- 2 // 发送2到channel
8 close(ch) // 关闭channel
9 }()
10
11 for i := range ch { // 遍历channel接收数据
12 fmt.Println(i) // 打印接收到的数据
13 }
14}
上述代码使用channel输出1和2。这段代码仅展示了向channel发送和接收值。然而,channel提供了更多功能。首先,我们将了解buffered channel和unbuffered channel。在此之前,上述示例是unbuffered channel,即向通道发送数据和从通道接收数据的行为必须同时发生。如果这些行为不同时发生,可能会导致死锁。
buffered channel
如果上述代码不是简单的输出,而是两个执行繁重任务的进程呢?如果第二个进程在读取和处理过程中长时间挂起,那么第一个进程也会在此期间停止。为了防止这种情况发生,我们可以使用buffered channel。
1package main
2
3func main() {
4 ch := make(chan int, 2) // 创建一个int类型,容量为2的缓冲channel
5 go func() {
6 ch <- 1 // 发送1到channel
7 ch <- 2 // 发送2到channel
8 close(ch) // 关闭channel
9 }()
10
11 for i := range ch { // 遍历channel接收数据
12 fmt.Println(i) // 打印接收到的数据
13 }
14}
上述代码使用buffered channel输出1和2。这段代码使用buffered channel,使得向channel发送数据和从channel接收数据的行为不必同时发生。通过为通道设置缓冲区,可以获得相应长度的余量,从而防止因后续任务影响而导致的任务延迟。
select
在处理多个通道时,使用select语句可以轻松实现fan-in结构。
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10) // 创建一个容量为10的int类型channel
10 ch2 := make(chan int, 10) // 创建一个容量为10的int类型channel
11 ch3 := make(chan int, 10) // 创建一个容量为10的int类型channel
12
13 go func() {
14 for {
15 ch1 <- 1 // 每秒发送1到ch1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2 // 每2秒发送2到ch2
22 time.Sleep(2 * time.Second)
23 }
24
25 }()
26 go func() {
27 for {
28 ch3 <- 3 // 每3秒发送3到ch3
29 time.Sleep(3 * time.Second)
30 }
31 }()
32
33 for i := 0; i < 3; i++ { // 循环3次
34 select {
35 case v := <-ch1: // 从ch1接收数据
36 fmt.Println(v) // 打印数据
37 case v := <-ch2: // 从ch2接收数据
38 fmt.Println(v) // 打印数据
39 case v := <-ch3: // 从ch3接收数据
40 fmt.Println(v) // 打印数据
41 }
42 }
43}
上述代码创建了3个周期性发送1、2、3的通道,并使用select从通道接收值并输出。通过这种方式使用select,可以在同时从多个通道接收数据的同时,按接收顺序处理通道中的值。
for range
channel可以使用for range轻松接收数据。当for range用于通道时,它会在每次通道添加数据时执行,并在通道关闭时终止循环。
1package main
2
3func main() {
4 ch := make(chan int) // 创建一个int类型的channel
5 go func() {
6 ch <- 1 // 发送1到channel
7 ch <- 2 // 发送2到channel
8 close(ch) // 关闭channel
9 }()
10
11 for i := range ch { // 遍历channel接收数据
12 fmt.Println(i) // 打印接收到的数据
13 }
14}
上述代码使用channel输出1和2。此代码使用for range在每次向通道添加数据时接收并输出数据。当通道关闭时,循环终止。
如上文所述,此语法也可用于简单的同步机制。
1package main
2
3func main() {
4 ch := make(chan struct{}) // 创建一个struct{}类型的channel
5 go func() {
6 defer close(ch) // 确保在函数退出时关闭channel
7 time.Sleep(1 * time.Second) // 等待1秒
8 fmt.Println("Hello, World!") // 打印"Hello, World!"
9 }()
10
11 fmt.Println("Waiting for goroutine...") // 打印"Waiting for goroutine..."
12 for range ch {} // 等待channel关闭
13}
上述代码等待1秒后输出“Hello, World!”。此代码使用channel将同步代码转换为异步代码。通过这种方式使用channel,可以轻松地将同步代码转换为异步代码,并设置join点。
etc
- 向nil
channel发送或接收数据可能导致死锁,因为会陷入无限循环。 - 在关闭
channel后发送数据会导致panic。 - 即使不显式关闭
channel,GC也会在回收时关闭它。
mutex
spinlock
spinlock是一种通过循环不断尝试获取锁的同步方法。在Go语言中,可以使用指针轻松实现自旋锁。
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) { // 循环尝试将lock从0设置为1
14 runtime.Gosched() // 让出CPU
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0) // 将lock设置为0以释放锁
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{} // 返回一个新的SpinLock实例
24}
上述代码实现了spinlock包。此代码使用sync/atomic包实现了SpinLock。Lock方法使用atomic.CompareAndSwapUintptr尝试获取锁,Unlock方法使用atomic.StoreUintptr释放锁。这种方式会不间断地尝试获取锁,因此在获取锁之前会持续占用CPU,可能导致无限循环。因此,spinlock建议用于简单的同步或仅在短时间内使用。
sync.Mutex
mutex是Goroutine之间同步的工具。sync包提供的mutex提供了Lock、Unlock、RLock、RUnlock等方法。mutex可以通过sync.Mutex创建,也可以使用sync.RWMutex进行读/写锁。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex // 声明一个互斥锁
9 var count int // 声明一个整数计数器
10
11 go func() {
12 mu.Lock() // 获取锁
13 count++ // 增加计数器
14 mu.Unlock() // 释放锁
15 }()
16
17 mu.Lock() // 获取锁
18 count++ // 增加计数器
19 mu.Unlock() // 释放锁
20
21 println(count) // 打印计数器值
22}
上述代码中,两个Goroutine几乎同时访问相同的count变量。此时,通过使用mutex将访问count变量的代码设置为临界区,可以阻止对count变量的并发访问。这样,无论执行多少次,这段代码都会输出相同的2。
sync.RWMutex
sync.RWMutex是一个可以区分读锁和写锁的mutex。可以使用RLock和RUnlock方法来获取和释放读锁。
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock() // 获取读锁
14 defer m.RUnlock() // 确保在函数退出时释放读锁
15
16 value, ok := m.data[key] // 从map中获取值
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock() // 获取写锁
22 defer m.Unlock() // 确保在函数退出时释放写锁
23
24 m.data[key] = value // 向map中设置值
25}
上述代码使用sync.RWMutex实现了ConcurrentMap。此代码在Get方法中获取读锁,在Set方法中获取写锁,从而安全地访问和修改data映射。需要读锁的原因是,在大量读取操作的情况下,无需获取写锁,只需获取读锁即可允许多个Goroutine同时执行读取操作。通过这种方式,在状态无需更改而不需要获取写锁的情况下,仅获取读锁即可提高性能。
fakelock
fakelock是实现sync.Locker的一个简单技巧。这个结构体提供了与sync.Mutex相同的方法,但实际上不执行任何操作。
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {} // 空实现
6
7func (f *FakeLock) Unlock() {} // 空实现
上述代码实现了fakelock包。此包实现了sync.Locker并提供了Lock和Unlock方法,但实际上不执行任何操作。为什么需要这样的代码,如果有机会我会详细说明。
waitgroup
sync.WaitGroup
sync.WaitGroup是一个等待Goroutine所有任务完成的工具。它提供了Add、Done、Wait方法。Add方法用于增加Goroutine的数量,Done方法用于通知Goroutine任务已完成。Wait方法则等待所有Goroutine任务完成。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{} // 创建一个WaitGroup
10 c := atomic.Int64{} // 创建一个原子计数器
11
12 for i := 0; i < 100 ; i++ { // 循环100次
13 wg.Add(1) // 增加WaitGroup计数
14 go func() {
15 defer wg.Done() // 确保在Goroutine退出时减少WaitGroup计数
16 c.Add(1) // 原子增加计数器
17 }()
18 }
19
20 wg.Wait() // 等待所有Goroutine完成
21 println(c.Load()) // 打印计数器的最终值
22}
上述代码使用sync.WaitGroup让100个Goroutine同时向变量c加值。此代码使用sync.WaitGroup等待所有Goroutine完成,然后输出变量c的最终值。对于少数任务的fork & join,仅使用通道就足够了;但对于大量任务的fork & join,使用sync.WaitGroup也是一个不错的选择。
with slice
与切片一起使用时,waitgroup可以成为一个优秀的无需锁的并发执行任务管理工具。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup // 创建一个WaitGroup
11 arr := [10]int{} // 创建一个包含10个整数的数组
12
13 for i := 0; i < 10; i++ { // 循环10次
14 wg.Add(1) // 增加WaitGroup计数
15 go func(id int) { // 启动一个Goroutine
16 defer wg.Done() // 确保在Goroutine退出时减少WaitGroup计数
17
18 arr[id] = rand.Intn(100) // 在指定索引处存储一个随机整数
19 }(i) // 将循环变量i作为参数传递给Goroutine
20 }
21
22 wg.Wait() // 等待所有Goroutine完成
23 fmt.Println("Done") // 打印"Done"
24
25 for i, v := range arr { // 遍历数组
26 fmt.Printf("arr[%d] = %d\n", i, v) // 打印数组元素及其索引
27 }
28}
上述代码仅使用waitgroup,让每个Goroutine同时生成10个随机整数,并将其存储在分配的索引中。此代码使用waitgroup等待所有Goroutine完成,然后输出Done。通过这种方式使用waitgroup,可以使多个Goroutine同时执行任务,无需锁地存储数据,直到所有Goroutine完成,并在任务结束后统一进行后处理。
golang.org/x/sync/errgroup.ErrGroup
errgroup是sync.WaitGroup的扩展包。与sync.WaitGroup不同,errgroup在任何一个Goroutine任务发生错误时,会取消所有Goroutine并返回错误。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background()) // 创建一个带有context的errgroup
11 _ = ctx // 忽略ctx变量,因为它在此示例中未直接使用
12
13 for i := 0; i < 10; i++ { // 循环10次
14 i := i // 捕获循环变量i
15 g.Go(func() error { // 启动一个Goroutine
16 if i == 5 { // 如果i等于5
17 return fmt.Errorf("error") // 返回一个错误
18 }
19 return nil // 否则返回nil
20 })
21 }
22
23 if err := g.Wait(); err != nil { // 等待所有Goroutine完成,并检查是否有错误
24 fmt.Println(err) // 打印错误
25 }
26}
上述代码使用errgroup创建10个Goroutine,并在第5个Goroutine中引发错误。我们故意在第五个Goroutine中引发错误,以展示错误发生的情况。然而,在实际使用中,您可以使用errgroup创建Goroutine,并针对每个Goroutine中发生的错误进行各种后续处理。
once
一个用于确保代码只执行一次的工具。可以通过以下构造函数执行相关代码。
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc仅确保该函数在整个生命周期中只执行一次。
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() { // 创建一个OnceFunc
7 println("Hello, World!") // 打印"Hello, World!"
8 })
9
10 once() // 调用once函数
11 once() // 再次调用once函数
12 once() // 再次调用once函数
13 once() // 再次调用once函数
14 once() // 再次调用once函数
15}
上述代码使用sync.OnceFunc输出“Hello, World!”。此代码使用sync.OnceFunc创建once函数,即使多次调用once函数,Hello, World!也只输出一次。
OnceValue
OnceValue不仅确保该函数在整个生命周期中只执行一次,还会存储该函数的返回值,并在再次调用时返回存储的值。
1package main
2
3import "sync"
4
5func main() {
6 c := 0 // 声明一个计数器
7 once := sync.OnceValue(func() int { // 创建一个OnceValue
8 c += 1 // 增加计数器
9 return c // 返回计数器值
10 })
11
12 println(once()) // 调用once函数并打印返回值
13 println(once()) // 再次调用once函数并打印返回值
14 println(once()) // 再次调用once函数并打印返回值
15 println(once()) // 再次调用once函数并打印返回值
16 println(once()) // 再次调用once函数并打印返回值
17}
上述代码使用sync.OnceValue将变量c递增1。此代码使用sync.OnceValue创建once函数,即使多次调用once函数,变量c也只递增一次并返回1。
OnceValues
OnceValues与OnceValue功能相同,但可以返回多个值。
1package main
2
3import "sync"
4
5func main() {
6 c := 0 // 声明一个计数器
7 once := sync.OnceValues(func() (int, int) { // 创建一个OnceValues
8 c += 1 // 增加计数器
9 return c, c // 返回两个计数器值
10 })
11
12 a, b := once() // 调用once函数并接收两个返回值
13 println(a, b) // 打印返回值
14 a, b = once() // 再次调用once函数并接收两个返回值
15 println(a, b) // 打印返回值
16 a, b = once() // 再次调用once函数并接收两个返回值
17 println(a, b) // 打印返回值
18 a, b = once() // 再次调用once函数并接收两个返回值
19 println(a, b) // 打印返回值
20 a, b = once() // 再次调用once函数并接收两个返回值
21 println(a, b) // 打印返回值
22}
上述代码使用sync.OnceValues将变量c递增1。此代码使用sync.OnceValues创建once函数,即使多次调用once函数,变量c也只递增一次并返回1。
atomic
atomic包提供了原子操作。atomic包提供了Add、CompareAndSwap、Load、Store、Swap等方法,但最近推荐使用Int64、Uint64、Pointer等类型。
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{} // 创建一个WaitGroup
10 c := atomic.Int64{} // 创建一个原子Int64计数器
11
12 for i := 0; i < 100 ; i++ { // 循环100次
13 wg.Add(1) // 增加WaitGroup计数
14 go func() {
15 defer wg.Done() // 确保在Goroutine退出时减少WaitGroup计数
16 c.Add(1) // 原子增加计数器
17 }()
18 }
19
20 wg.Wait() // 等待所有Goroutine完成
21 println(c.Load()) // 打印计数器的最终值
22}
这是之前用过的例子。它使用atomic.Int64类型原子地增加变量c。通过Add方法和Load方法,可以原子地增加变量并读取变量。此外,还可以通过Store方法存储值,通过Swap方法交换值,并通过CompareAndSwap方法比较值后在适当情况下进行交换。
cond
sync.Cond
cond包提供了条件变量。cond包可以通过sync.Cond创建,并提供了Wait、Signal、Broadcast方法。
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{}) // 创建一个条件变量,关联一个互斥锁
9 ready := false // 声明一个布尔变量表示是否准备就绪
10
11 go func() {
12 c.L.Lock() // 获取条件变量关联的锁
13 ready = true // 设置ready为true
14 c.Signal() // 发送信号,唤醒一个等待的Goroutine
15 c.L.Unlock() // 释放条件变量关联的锁
16 }()
17
18 c.L.Lock() // 获取条件变量关联的锁
19 for !ready { // 循环等待ready为true
20 c.Wait() // 释放锁并等待信号,被唤醒后重新获取锁
21 }
22 c.L.Unlock() // 释放条件变量关联的锁
23
24 println("Ready!") // 打印"Ready!"
25}
上述代码使用sync.Cond等待ready变量变为true。此代码使用sync.Cond等待ready变量变为true后,输出“Ready!”。通过这种方式使用sync.Cond,可以使多个Goroutine在满足特定条件之前等待。
可以利用它来实现一个简单的queue。
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex // 嵌入互斥锁
15 Cond *sync.Cond // 条件变量
16 Head *Node[T] // 队列头部
17 Tail *Node[T] // 队列尾部
18 Len int // 队列长度
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{} // 创建一个空的Queue
23 q.Cond = sync.NewCond(&q.Mutex) // 创建条件变量并关联队列的互斥锁
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock() // 获取队列锁
29 defer q.Unlock() // 确保在函数退出时释放队列锁
30
31 node := &Node[T]{Value: value} // 创建新节点
32 if q.Len == 0 { // 如果队列为空
33 q.Head = node // 新节点成为头部
34 q.Tail = node // 新节点成为尾部
35 } else {
36 q.Tail.Next = node // 将新节点添加到尾部
37 q.Tail = node // 更新尾部
38 }
39 q.Len++ // 增加队列长度
40 q.Cond.Signal() // 发送信号,通知有新元素入队
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock() // 获取队列锁
45 defer q.Unlock() // 确保在函数退出时释放队列锁
46
47 for q.Len == 0 { // 如果队列为空
48 q.Cond.Wait() // 等待信号,直到队列不为空
49 }
50
51 node := q.Head // 获取头部节点
52 q.Head = q.Head.Next // 更新头部
53 q.Len-- // 减少队列长度
54 return node.Value // 返回头部节点的值
55}
通过sync.Cond,可以有效地等待,并在条件满足时重新启动,而不是使用spin-lock消耗大量CPU资源。
semaphore
golang.org/x/sync/semaphore.Semaphore
semaphore包提供了信号量。semaphore包可以通过golang.org/x/sync/semaphore.Semaphore创建,并提供了Acquire、Release、TryAcquire方法。
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1) // 创建一个加权信号量,权重为1
11
12 if s.TryAcquire(1) { // 尝试获取权重为1的信号量
13 fmt.Println("Acquired!") // 如果成功获取,打印"Acquired!"
14 } else {
15 fmt.Println("Not Acquired!") // 如果未能获取,打印"Not Acquired!"
16 }
17
18 s.Release(1) // 释放权重为1的信号量
19}
上述代码使用semaphore创建一个信号量,并使用Acquire方法获取信号量,使用Release方法释放信号量。此代码展示了如何使用semaphore获取和释放信号量。
总结
基本内容就到此为止。希望通过本文的内容,您能理解并实际运用Goroutine进行并发管理。希望本文能对您有所帮助。谢谢。