Go Concurrency Starter Pack
Overview
Brief Introduction
The Go language has numerous tools for managing concurrency. In this article, we will introduce some of them along with several tricks.
Goroutines?
A goroutine is Go's lightweight unit of concurrent execution. Typically, programs receive OS threads from the operating system to perform multiple tasks concurrently, executing tasks in parallel up to the number of cores. For finer-grained concurrency, user-level green threads are created, allowing several green threads to run on a single OS thread. Goroutines take this idea further: they are scheduled by the Go runtime onto OS threads, use less memory than threads, and can be created and switched more quickly than threads.
To start a goroutine, simply use the go keyword. This lets you run ordinary synchronous code asynchronously in an intuitive way.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		time.Sleep(1 * time.Second)
		fmt.Println("Hello, World!")
	}()

	fmt.Println("Waiting for goroutine...")
	for range ch {}
}
This code turns a synchronous routine that pauses for 1 second and prints Hello, World! into an asynchronous flow. The example is simple, but when more complex code is converted from synchronous to asynchronous, it often stays more readable and easier to follow than with approaches such as async/await or promises.
However, if you simply call synchronous code asynchronously without understanding flows such as fork & join (a flow similar to divide and conquer), it is easy to write poor goroutine code. This article introduces several methods and techniques for handling such cases.
Concurrency Management
context
It might be surprising that context appears as the first management technique. In Go, however, context goes beyond simple cancellation and is an excellent way to manage an entire tree of work. For those who are not familiar with it, here is a brief explanation of the package.
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		// Blocks until the context is cancelled.
		<-ctx.Done()
		fmt.Println("Context is done!")
	}()

	time.Sleep(1 * time.Second)

	cancel()

	// Give the goroutine time to print before main exits.
	time.Sleep(1 * time.Second)
}
The above code uses context to print Context is done! after 1 second. A context reports cancellation through its Done() method. The package offers WithCancel, WithTimeout, and WithDeadline for different ways of cancelling, plus WithValue for carrying request-scoped values.
Let's create a simple example. Suppose you are writing code that fetches user, post, and comment using the aggregator pattern to retrieve some data. If all requests must be completed within 2 seconds, you can write the code as follows:
package main

import (
	"context"
	"fmt"
	"time"
)

// getUser, getPost, and getComment are assumed to be defined elsewhere;
// each takes the context and returns the fetched data.

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ch := make(chan struct{})
	go func() {
		defer close(ch)
		user := getUser(ctx)
		post := getPost(ctx)
		comment := getComment(ctx)

		fmt.Println(user, post, comment)
	}()

	select {
	case <-ctx.Done():
		fmt.Println("Timeout!")
	case <-ch:
		fmt.Println("All data is fetched!")
	}
}
The code above prints Timeout! if the data cannot all be fetched within 2 seconds, and prints All data is fetched! otherwise. Note that the timeout only stops the work early if getUser, getPost, and getComment themselves watch ctx. By using context in this way, you can easily manage cancellation and timeouts even in code where multiple goroutines are operating.
The remaining context-related functions and methods can be found at godoc context. Learning the basics there will let you use them comfortably.
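For a timeout to actually stop work, the called functions have to observe the context. The following is a minimal sketch of that idea, not the article's real getUser: fetch and demo are hypothetical names, and the "work" is simulated with a timer.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetch is a hypothetical stand-in for getUser, getPost, or getComment.
// It pretends the work takes `cost` and gives up as soon as ctx is done.
func fetch(ctx context.Context, name string, cost time.Duration) (string, error) {
	select {
	case <-time.After(cost):
		return name + " data", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo runs one fast and one slow fetch under a shared 100ms budget.
func demo() (first string, timedOut bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	first, _ = fetch(ctx, "user", 10*time.Millisecond)
	_, err := fetch(ctx, "post", time.Second)
	return first, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demo()) // user data true
}
```

The slow call returns as soon as the deadline passes instead of running for the full second, which is the behaviour the aggregator example relies on.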
channel
unbuffered channel
A channel is a tool for communication between goroutines. A channel can be created with make(chan T). Here, T is the type of data the channel will transfer. Data can be sent and received using <-, and a channel can be closed using close.
package main

import "fmt"

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1
		ch <- 2
		close(ch)
	}()

	for i := range ch {
		fmt.Println(i)
	}
}
The above code uses a channel to print 1 and 2. This code simply demonstrates sending and receiving values through the channel. However, channels offer more than that. First, let's examine buffered and unbuffered channels. The example above uses an unbuffered channel, which means a send and the matching receive must happen at the same time: each side blocks until the other is ready. If the other side never arrives, the goroutine blocks forever, and if every goroutine ends up blocked the runtime reports a deadlock.
buffered channel
What if the above code were not simply printing but consisted of two processes performing heavy tasks? If the second process reads a value and then gets stuck for a long time while processing it, the first process is also stopped for that duration. We can use a buffered channel to prevent such situations.
package main

import "fmt"

func main() {
	// The second argument is the buffer capacity.
	ch := make(chan int, 2)
	go func() {
		ch <- 1
		ch <- 2
		close(ch)
	}()

	for i := range ch {
		fmt.Println(i)
	}
}
The above code uses a buffered channel to print 1 and 2. With a buffered channel, sending and receiving no longer have to occur simultaneously: a send only blocks when the buffer is full. The buffer creates a margin equal to its capacity, which can prevent delays caused by slow downstream tasks.
select
When dealing with multiple channels, you can easily implement a fan-in structure using the select statement.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan int, 10)
	ch2 := make(chan int, 10)
	ch3 := make(chan int, 10)

	go func() {
		for {
			ch1 <- 1
			time.Sleep(1 * time.Second)
		}
	}()
	go func() {
		for {
			ch2 <- 2
			time.Sleep(2 * time.Second)
		}
	}()
	go func() {
		for {
			ch3 <- 3
			time.Sleep(3 * time.Second)
		}
	}()

	for i := 0; i < 3; i++ {
		select {
		case v := <-ch1:
			fmt.Println(v)
		case v := <-ch2:
			fmt.Println(v)
		case v := <-ch3:
			fmt.Println(v)
		}
	}
}
The code above creates three channels that periodically send 1, 2, and 3, and uses select to receive and print three values in total. select waits on several channels at once and runs the case of whichever channel is ready; if more than one is ready, it picks one at random. This lets you process values as they arrive from any of the channels.
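A fan-in usually has to keep reading until every input is closed. One way to sketch this is a loop that sets a finished channel to nil, which disables its case. The names produce, merge, and sum below are hypothetical helpers for this illustration.

```go
package main

import "fmt"

// produce is a hypothetical helper: it sends vals on a new channel and closes it.
func produce(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// merge drains both channels with select and returns once both are closed.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // a nil channel blocks forever, so this case is disabled
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

// sum adds up the merged values; the arrival order is not deterministic.
func sum(xs []int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

func main() {
	got := merge(produce(1, 2, 3), produce(10, 20))
	fmt.Println(len(got), sum(got)) // 5 36
}
```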
for range
You can easily receive data from a channel using for range. When for range is used with a channel, the loop body runs each time a value arrives on that channel, and the loop ends when the channel is closed.
package main

import "fmt"

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1
		ch <- 2
		close(ch)
	}()

	for i := range ch {
		fmt.Println(i)
	}
}
The above code uses a channel to print 1 and 2. It uses for range to receive and print each value as it arrives, and the loop ends when the channel is closed.
As shown several times above, this syntax can also be used as a simple synchronization method.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		time.Sleep(1 * time.Second)
		fmt.Println("Hello, World!")
	}()

	fmt.Println("Waiting for goroutine...")
	// Nothing is ever sent; the loop ends when the goroutine closes ch.
	for range ch {}
}
The above code prints Hello, World! after a 1-second pause. It uses a channel to run synchronous code asynchronously and then wait for it. By using a channel in this way, you can easily change synchronous code into asynchronous code and set the join point.
etc
- Sending to or receiving from a nil channel blocks forever, which can result in a deadlock.
- Sending data to a closed channel will cause a panic.
- A channel does not have to be closed to be freed: the GC collects an unreachable channel whether or not it was closed. Closing is only needed to tell receivers that no more values will arrive.
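The edge cases in the list above can be checked with a short program. This is only a sketch with hypothetical helper names; it uses recover and a select with default so that the panic and the blocking receive can be observed without crashing or hanging.

```go
package main

import "fmt"

// recvClosed shows that receiving from a closed channel never blocks:
// it yields the zero value and ok == false.
func recvClosed() (int, bool) {
	ch := make(chan int)
	close(ch)
	v, ok := <-ch
	return v, ok
}

// sendClosed reports whether sending on a closed channel panicked.
func sendClosed() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics: send on closed channel
	return false
}

// nilBlocks reports whether a receive on a nil channel would block.
func nilBlocks() bool {
	var ch chan int // nil channel
	select {
	case <-ch:
		return false
	default:
		return true
	}
}

func main() {
	fmt.Println(recvClosed()) // 0 false
	fmt.Println(sendClosed()) // true
	fmt.Println(nilBlocks())  // true
}
```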
mutex
spinlock
A spinlock is a synchronization method that repeatedly attempts to acquire a lock. In Go, a spinlock can easily be implemented with the atomic operations in sync/atomic.
package spinlock

import (
	"runtime"
	"sync/atomic"
)

type SpinLock struct {
	lock uintptr
}

func (s *SpinLock) Lock() {
	for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
		runtime.Gosched()
	}
}

func (s *SpinLock) Unlock() {
	atomic.StoreUintptr(&s.lock, 0)
}

func NewSpinLock() *SpinLock {
	return &SpinLock{}
}
The above code implements the spinlock package using sync/atomic. The Lock method uses atomic.CompareAndSwapUintptr to attempt to acquire the lock, and the Unlock method uses atomic.StoreUintptr to release it. Lock keeps retrying until it succeeds, yielding the processor with runtime.Gosched between attempts. It still consumes CPU while waiting, and it loops forever if the lock is never released. Consequently, a spinlock is best used for simple synchronization where the lock is held only for a short period.
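As a usage sketch, the lock can protect a plain counter that several goroutines increment. The SpinLock type is repeated here so the example is self-contained; countWithSpinLock is a hypothetical helper, and sync.WaitGroup is used only to wait for the workers.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type SpinLock struct{ lock uintptr }

func (s *SpinLock) Lock() {
	for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
		runtime.Gosched()
	}
}

func (s *SpinLock) Unlock() { atomic.StoreUintptr(&s.lock, 0) }

// countWithSpinLock increments a shared counter from several goroutines,
// holding the spinlock only for the duration of each increment.
func countWithSpinLock(workers, perWorker int) int {
	var (
		lock  SpinLock
		wg    sync.WaitGroup
		count int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				lock.Lock()
				count++
				lock.Unlock()
			}
		}()
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(countWithSpinLock(8, 1000)) // 8000
}
```

The critical section is a single increment, which is the kind of very short hold time a spinlock is suited to.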
sync.Mutex
A mutex is a tool for synchronizing goroutines. sync.Mutex provides the Lock and Unlock methods. sync.RWMutex, a read/write lock, additionally provides RLock and RUnlock. The zero value of either type is an unlocked mutex, so a declaration such as var mu sync.Mutex is enough to create one.
package main

import (
	"sync"
)

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup
	var count int

	wg.Add(1)
	go func() {
		defer wg.Done()
		mu.Lock()
		count++
		mu.Unlock()
	}()

	mu.Lock()
	count++
	mu.Unlock()

	// Wait for the goroutine; otherwise main could print before it runs.
	wg.Wait()
	println(count)
}
In the above code, two goroutines access the same count variable almost simultaneously. By making the code that accesses count a critical section with a mutex, simultaneous access to the variable is prevented. The mutex alone does not make main wait for the other goroutine, so the code also waits with a sync.WaitGroup before printing. Thus, no matter how many times this code is executed, it will always print 2.
sync.RWMutex
sync.RWMutex is a mutex that allows read locks and write locks to be used separately. Read locks can be acquired and released using the RLock and RUnlock methods.
package cmap

import (
	"sync"
)

type ConcurrentMap[K comparable, V any] struct {
	sync.RWMutex
	data map[K]V
}

// New returns an empty map. Without it the zero-value data map is nil
// and Set would panic.
func New[K comparable, V any]() *ConcurrentMap[K, V] {
	return &ConcurrentMap[K, V]{data: make(map[K]V)}
}

func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
	m.RLock()
	defer m.RUnlock()

	value, ok := m.data[key]
	return value, ok
}

func (m *ConcurrentMap[K, V]) Set(key K, value V) {
	m.Lock()
	defer m.Unlock()

	m.data[key] = value
}
The above code implements ConcurrentMap using sync.RWMutex. It uses a read lock in the Get method and a write lock in the Set method to safely access and modify the data map. A read lock lets multiple goroutines read at the same time, while a write lock is exclusive. When reads greatly outnumber writes, taking only a read lock wherever the state is not modified improves performance.
fakelock
fakelock is a simple trick that implements sync.Locker. This struct provides the same Lock and Unlock methods as sync.Mutex but does not actually perform any actions.
package fakelock

type FakeLock struct{}

func (f *FakeLock) Lock() {}

func (f *FakeLock) Unlock() {}
The above code implements the fakelock package. This package implements sync.Locker, providing Lock and Unlock methods, but in reality, it does nothing. The reason why this code is needed will be described when the opportunity arises.
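The article leaves the motivation for later. One plausible use, offered here purely as an assumption, is a type that accepts any sync.Locker so that callers who never share it between goroutines can skip real locking. Counter and NewCounter below are hypothetical names for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type FakeLock struct{}

func (f *FakeLock) Lock()   {}
func (f *FakeLock) Unlock() {}

// Counter is a hypothetical type whose locking strategy is injected.
type Counter struct {
	mu sync.Locker
	n  int
}

func NewCounter(mu sync.Locker) *Counter { return &Counter{mu: mu} }

// Inc increments the counter under whatever lock was supplied.
func (c *Counter) Inc() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
	return c.n
}

func main() {
	safe := NewCounter(&sync.Mutex{}) // may be shared between goroutines
	local := NewCounter(&FakeLock{})  // must stay within one goroutine
	safe.Inc()
	local.Inc()
	fmt.Println(safe.Inc(), local.Inc()) // 2 2
}
```

The same Counter code serves both cases. The trade-off is that a FakeLock-backed value is not safe for concurrent use, and nothing enforces that.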
waitgroup
sync.WaitGroup
sync.WaitGroup is a tool that waits until all goroutine tasks are finished. It provides Add, Done, and Wait methods. Add increases the count of goroutines to wait for, Done signals that one goroutine's task is complete, and Wait blocks until every registered goroutine has finished.
package main

import (
	"sync"
	"sync/atomic"
)

func main() {
	wg := sync.WaitGroup{}
	c := atomic.Int64{}

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add(1)
		}()
	}

	wg.Wait()
	println(c.Load())
}
The code above uses sync.WaitGroup so that 100 goroutines can concurrently increment the variable c, and waits for all of them to finish before printing the accumulated value. Channels are enough for a simple fork & join, but sync.WaitGroup is a better fit when a large number of tasks must be forked and joined.
with slice
When used in conjunction with slices, waitgroup can function as an excellent tool for managing concurrent execution tasks without the need for locks.
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	arr := [10]int{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// Each goroutine writes only to its own index.
			arr[id] = rand.Intn(100)
		}(i)
	}

	wg.Wait()
	fmt.Println("Done")

	for i, v := range arr {
		fmt.Printf("arr[%d] = %d\n", i, v)
	}
}
The code above uses only a waitgroup: each of 10 goroutines generates a random integer and stores it at its assigned index. Because no two goroutines write to the same element, no lock is needed. waitgroup is used to wait until all goroutines are finished, and then Done is printed. Using waitgroup in this manner allows multiple goroutines to perform tasks concurrently, to store data without locks, and to perform batch post-processing after all the tasks are complete.
golang.org/x/sync/errgroup.Group
errgroup is a package that builds on sync.WaitGroup. Unlike sync.WaitGroup, a Group created with errgroup.WithContext cancels its context as soon as any goroutine returns a non-nil error, and Wait returns the first such error. Goroutines that watch that context can then stop early.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	_ = ctx

	for i := 0; i < 10; i++ {
		i := i
		g.Go(func() error {
			if i == 5 {
				return fmt.Errorf("error")
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println(err)
	}
}
The code above starts 10 goroutines with errgroup and deliberately returns an error from the one where i is 5, to demonstrate the error path. In real code, errgroup lets you start goroutines and run whatever post-processing is needed when one of them fails.
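To make the "first error cancels the rest" behaviour concrete without the external module, here is a simplified standard-library imitation. It is a sketch of the idea, not the errgroup API: runAll, demo, and errBoom are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errBoom = errors.New("boom")

// runAll starts every task in its own goroutine. The first non-nil error
// cancels the shared context and is the error that runAll returns.
func runAll(parent context.Context, tasks ...func(ctx context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(ctx context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // tell the sibling tasks to stop
				})
			}
		}(task)
	}
	wg.Wait()
	return firstErr
}

// demo runs one failing task next to two tasks that wait for cancellation.
func demo() error {
	failing := func(ctx context.Context) error { return errBoom }
	waiting := func(ctx context.Context) error {
		<-ctx.Done() // returns only once a sibling has failed
		return ctx.Err()
	}
	return runAll(context.Background(), waiting, failing, waiting)
}

func main() {
	fmt.Println(errors.Is(demo(), errBoom)) // true
}
```

The waiting tasks would block forever without cancellation, so the program finishing at all shows that the failure was propagated through the context.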
once
A tool for executing code that should only be run once. The sync package provides the following wrapper functions (added in Go 1.21) for this purpose.
func OnceFunc(f func()) func()
func OnceValue[T any](f func() T) func() T
func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc simply ensures that a specific function is executed only once throughout the entire execution.
package main

import "sync"

func main() {
	once := sync.OnceFunc(func() {
		println("Hello, World!")
	})

	once()
	once()
	once()
	once()
	once()
}
The code above uses sync.OnceFunc to print Hello, World!. sync.OnceFunc creates the once function, and even if once is called multiple times, Hello, World! is printed only once.
OnceValue
OnceValue not only ensures that the function is executed only once throughout the entire execution, but also stores the return value of the function and returns the stored value when called again.
package main

import "sync"

func main() {
	c := 0
	once := sync.OnceValue(func() int {
		c += 1
		return c
	})

	println(once())
	println(once())
	println(once())
	println(once())
	println(once())
}
The code above uses sync.OnceValue to increment the c variable by 1. sync.OnceValue creates the once function, and even if once is called multiple times, c is incremented only once and every call returns 1.
OnceValues
OnceValues operates in the same manner as OnceValue, but it can return multiple values.
package main

import "sync"

func main() {
	c := 0
	once := sync.OnceValues(func() (int, int) {
		c += 1
		return c, c
	})

	a, b := once()
	println(a, b)
	a, b = once()
	println(a, b)
	a, b = once()
	println(a, b)
	a, b = once()
	println(a, b)
	a, b = once()
	println(a, b)
}
The code above uses sync.OnceValues to increment the c variable by 1. sync.OnceValues creates the once function, and even if once is called multiple times, c is incremented only once and every call returns the pair 1, 1.
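The examples above call the wrapped function from a single goroutine, but the guarantee also holds under concurrency. The following sketch assumes Go 1.21 or later; loadOnce and the "config" value are hypothetical, standing in for an expensive initialization.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// loadOnce calls a OnceValue-wrapped initializer from n goroutines and
// returns how many times the initializer actually ran.
func loadOnce(n int) int64 {
	var runs atomic.Int64
	load := sync.OnceValue(func() string {
		runs.Add(1)
		return "config" // hypothetical expensive result
	})

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = load() // every caller receives the same cached value
		}()
	}
	wg.Wait()
	return runs.Load()
}

func main() {
	fmt.Println(loadOnce(100)) // 1
}
```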
atomic
The atomic package (sync/atomic) provides atomic operations. It offers functions for the Add, CompareAndSwap, Load, Store, and Swap operations, but since Go 1.19 the use of types such as Int64, Uint64, and Pointer is recommended.
package main

import (
	"sync"
	"sync/atomic"
)

func main() {
	wg := sync.WaitGroup{}
	c := atomic.Int64{}

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add(1)
		}()
	}

	wg.Wait()
	println(c.Load())
}
This is an example that was used earlier. This code uses the atomic.Int64 type to atomically increment the c variable. The Add and Load methods atomically increment and read the variable. In addition, the Store method stores a value, the Swap method stores a new value and returns the old one, and the CompareAndSwap method replaces the value only if it still matches an expected value.
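CompareAndSwap is typically used in a retry loop. As an illustration, the sketch below keeps a running maximum without a mutex; storeMax and maxOf are hypothetical helpers, and the atomic.Int64 type requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax raises m to v unless m already holds a value at least as large.
func storeMax(m *atomic.Int64, v int64) {
	for {
		cur := m.Load()
		if v <= cur {
			return
		}
		if m.CompareAndSwap(cur, v) {
			return
		}
		// Another goroutine changed m between Load and CompareAndSwap; retry.
	}
}

// maxOf offers the values 1..n concurrently and returns the recorded maximum.
func maxOf(n int) int64 {
	var m atomic.Int64
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			storeMax(&m, v)
		}(int64(i))
	}
	wg.Wait()
	return m.Load()
}

func main() {
	fmt.Println(maxOf(100)) // 100
}
```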
cond
sync.Cond
sync.Cond is a type that provides condition variables. A Cond is created with sync.NewCond, and it provides the Wait, Signal, and Broadcast methods.
package main

import (
	"sync"
)

func main() {
	c := sync.NewCond(&sync.Mutex{})
	ready := false

	go func() {
		c.L.Lock()
		ready = true
		c.Signal()
		c.L.Unlock()
	}()

	c.L.Lock()
	for !ready {
		c.Wait()
	}
	c.L.Unlock()

	println("Ready!")
}
The code above uses sync.Cond to wait until the ready variable becomes true, and then prints Ready!. Wait releases the lock while the goroutine sleeps and reacquires it before returning, which is why the condition is checked in a loop while holding the lock. Using sync.Cond in this way lets one or more goroutines sleep until a specific condition is satisfied.
This can be used to implement a simple queue.
package queue

import (
	"sync"
)

type Node[T any] struct {
	Value T
	Next  *Node[T]
}

type Queue[T any] struct {
	sync.Mutex
	Cond *sync.Cond
	Head *Node[T]
	Tail *Node[T]
	Len  int
}

func New[T any]() *Queue[T] {
	q := &Queue[T]{}
	q.Cond = sync.NewCond(&q.Mutex)
	return q
}

// Push appends a value and wakes one goroutine blocked in Pop.
func (q *Queue[T]) Push(value T) {
	q.Lock()
	defer q.Unlock()

	node := &Node[T]{Value: value}
	if q.Len == 0 {
		q.Head = node
		q.Tail = node
	} else {
		q.Tail.Next = node
		q.Tail = node
	}
	q.Len++
	q.Cond.Signal()
}

// Pop blocks until the queue is non-empty, then removes the head.
func (q *Queue[T]) Pop() T {
	q.Lock()
	defer q.Unlock()

	for q.Len == 0 {
		q.Cond.Wait()
	}

	node := q.Head
	q.Head = q.Head.Next
	q.Len--
	return node.Value
}
By using sync.Cond in this way, you can efficiently wait and resume operation when a condition is met, instead of using a spinlock, which uses a lot of CPU resources.
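The examples in this section wake a single waiter with Signal. When several goroutines wait for the same condition, Broadcast wakes all of them. The sketch below uses a hypothetical helper, releaseAll, to show this.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll starts n goroutines that wait for ready to become true,
// wakes them with one Broadcast, and returns how many of them resumed.
func releaseAll(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false
	resumed := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready { // re-check the condition after every wake-up
				cond.Wait()
			}
			resumed++
			mu.Unlock()
		}()
	}

	mu.Lock()
	ready = true
	cond.Broadcast() // Signal would wake at most one waiter
	mu.Unlock()

	wg.Wait()
	return resumed
}

func main() {
	fmt.Println(releaseAll(5)) // 5
}
```

A goroutine that starts after the Broadcast simply sees ready already set and never waits, which is why the condition is stored in a variable rather than implied by the wake-up itself.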
semaphore
golang.org/x/sync/semaphore.Weighted
The semaphore package provides a weighted semaphore. A semaphore is created with semaphore.NewWeighted, which returns a *semaphore.Weighted, and it provides the Acquire, Release, and TryAcquire methods.
package main

import (
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	s := semaphore.NewWeighted(1)

	if s.TryAcquire(1) {
		// Release only what was actually acquired; releasing more panics.
		defer s.Release(1)
		fmt.Println("Acquired!")
	} else {
		fmt.Println("Not Acquired!")
	}
}
The code above creates a semaphore using the semaphore package, attempts to acquire it without blocking using the TryAcquire method, and releases it using the Release method. Acquire works the same way but takes a context and blocks until the requested weight is available or the context is done.
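A common use of a semaphore is to cap how many goroutines work at the same time. To keep the example free of external modules, the sketch below swaps in a buffered channel as a counting semaphore instead of semaphore.Weighted; peakConcurrency is a hypothetical helper and the sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs `tasks` goroutines but lets at most `limit` of them
// work at once, and returns the highest concurrency actually observed.
func peakConcurrency(tasks, limit int) int64 {
	sem := make(chan struct{}, limit) // buffered channel as a counting semaphore
	var running, peak atomic.Int64
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while all slots are taken
			defer func() { <-sem }() // release

			n := running.Add(1)
			for { // record the new peak if n is larger
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(20, 3) <= 3) // true
}
```

The channel version has no weights and no context support, which is what semaphore.Weighted adds.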
Conclusion
This covers the fundamental concepts. Hopefully this article has given you an understanding of how to manage concurrency with goroutines that you can apply in practice. Thank you.