GoSuda

Go Concurrency Starter Pack

By snowmerak

Overview

Brief Introduction

The Go language has numerous tools for managing concurrency. In this article, we will introduce some of them along with several tricks.

Goroutines?

A goroutine is a lightweight thread of execution managed by the Go runtime. Typically, a program obtains OS threads from the operating system to perform multiple tasks concurrently, and at most as many of them as there are CPU cores can actually run in parallel. For finer-grained concurrency, some runtimes create user-level green threads, so that several green threads share a single OS thread. Goroutines take this idea further: the Go scheduler multiplexes many goroutines onto a small number of OS threads, and each goroutine starts with a stack of only a few kilobytes that grows as needed. As a result, goroutines use far less memory than OS threads and can be created and switched much more cheaply.

To start a goroutine, simply put the go keyword in front of a function call. This makes it intuitive to turn synchronous code into asynchronous code while writing a program.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{})
    go func() {
        defer close(ch)
        time.Sleep(1 * time.Second)
        fmt.Println("Hello, World!")
    }()

    fmt.Println("Waiting for goroutine...")
    for range ch {}
}
```

This code takes synchronous code that pauses for 1 second and prints Hello, World!, and runs it asynchronously. The main goroutine waits by ranging over ch, a loop that ends when the goroutine closes the channel. The example is simple, but when more complex code is moved from synchronous to asynchronous execution in the same way, it tends to stay more readable and easier to follow than with traditional approaches such as async/await or promises, because the code inside the goroutine is still written in ordinary sequential style.

However, if you simply call synchronous code asynchronously without understanding the overall flow, including patterns such as fork & join (a flow similar to divide and conquer), it is easy to end up with poorly structured goroutine code. This article introduces several methods and techniques to help avoid such cases.
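As a minimal sketch of fork & join using only goroutines and a channel, assume the work is summing a slice: the slice is split into chunks (fork), each chunk is summed in its own goroutine, and the partial sums are collected from a channel (join). The function name parallelSum and the chunking scheme are illustrative choices, not part of the original article.

```go
package main

import "fmt"

// parallelSum splits nums into roughly equal chunks, sums each chunk in its
// own goroutine (fork), and adds up the partial results (join).
func parallelSum(nums []int, chunks int) int {
    if chunks < 1 {
        chunks = 1
    }
    size := (len(nums) + chunks - 1) / chunks
    results := make(chan int, chunks) // buffered so workers never block on send

    started := 0
    for start := 0; start < len(nums); start += size {
        end := start + size
        if end > len(nums) {
            end = len(nums)
        }
        started++
        go func(part []int) {
            total := 0
            for _, n := range part {
                total += n
            }
            results <- total
        }(nums[start:end])
    }

    // Join: receive exactly one partial sum per started goroutine.
    sum := 0
    for i := 0; i < started; i++ {
        sum += <-results
    }
    return sum
}

func main() {
    nums := make([]int, 100)
    for i := range nums {
        nums[i] = i + 1
    }
    fmt.Println(parallelSum(nums, 4)) // prints 5050
}
```

The join step knows exactly how many results to expect, so it needs no extra synchronization beyond the channel itself.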

Concurrency Management

context

It may be surprising that context comes first among the management techniques. However, in Go, context goes beyond simple cancellation: it carries cancellation signals, deadlines, and request-scoped values down an entire tree of calls and goroutines, which makes it an excellent tool for managing the whole work tree. For readers who are not familiar with it, here is a brief explanation of the package.

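```go
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        <-ctx.Done() // blocks until the context is cancelled
        fmt.Println("Context is done!")
    }()

    time.Sleep(1 * time.Second)

    cancel() // closes ctx.Done(), releasing the goroutine above

    // Give the goroutine time to print before main returns.
    time.Sleep(1 * time.Second)
}
```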

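The above code cancels the context after 1 second, at which point the goroutine waiting on ctx.Done() prints Context is done!. The Done() method returns a channel that is closed when the context is cancelled, and the package offers several ways to derive new contexts: WithCancel for manual cancellation, WithTimeout and WithDeadline for cancellation after a duration or at a point in time, and WithValue for attaching request-scoped values (WithValue adds no cancellation of its own).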

Let's look at a simple example. Suppose you are writing code that uses the aggregator pattern to fetch a user, a post, and a comment. If all requests must complete within 2 seconds, you can write the code as follows:

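```go
package main

import (
    "context"
    "fmt"
    "time"
)

// Hypothetical stand-ins for real data sources, so the example compiles.
func getUser(ctx context.Context) string    { return "user" }
func getPost(ctx context.Context) string    { return "post" }
func getComment(ctx context.Context) string { return "comment" }

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    ch := make(chan struct{})
    go func() {
        defer close(ch)
        user := getUser(ctx)
        post := getPost(ctx)
        comment := getComment(ctx)

        fmt.Println(user, post, comment)
    }()

    select {
    case <-ctx.Done():
        fmt.Println("Timeout!")
    case <-ch:
        fmt.Println("All data is fetched!")
    }
}
```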

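The code above prints Timeout! if the data cannot be fetched within 2 seconds, and prints All data is fetched! otherwise. Note that the timeout only stops main from waiting; the fetch functions receive ctx and are expected to check it themselves (for example by selecting on ctx.Done()) so that they also stop working once the deadline passes. Used this way, context makes it easy to manage cancellation and timeouts even in code where many goroutines are running.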

The full set of context functions and methods is documented in the context package on godoc. Once the basics are familiar, context becomes comfortable to use everywhere.

channel

unbuffered channel

A channel is a tool for communication between goroutines. A channel can be created with make(chan T). Here, T is the type of data the channel will transfer. Data can be sent and received using <-, and a channel can be closed using close.

```go
package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 1
        ch <- 2
        close(ch)
    }()

    for i := range ch {
        fmt.Println(i)
    }
}
```

The above code uses a channel to print 1 and 2. It simply demonstrates sending and receiving values through a channel, but channels offer more than that. First, let's compare unbuffered and buffered channels. The example above uses an unbuffered channel: a send blocks until another goroutine is ready to receive, and a receive blocks until a value is sent, so the sender and the receiver must meet at the same moment. If the other side never arrives, the blocked goroutine waits forever, and if every goroutine ends up blocked this way, the runtime aborts with a deadlock error.
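The blocking behavior of an unbuffered send can be observed safely with a select that has a default case. In this sketch, trySend is a hypothetical helper that attempts a send without blocking.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
    select {
    case ch <- v:
        return true
    default: // taken when the send cannot proceed immediately
        return false
    }
}

func main() {
    unbuffered := make(chan int)
    // Nobody is receiving, so a plain `unbuffered <- 1` would block forever.
    fmt.Println(trySend(unbuffered, 1)) // false

    done := make(chan struct{})
    go func() {
        defer close(done)
        fmt.Println("received", <-unbuffered)
    }()
    unbuffered <- 1 // blocks until the goroutine above is ready to receive
    <-done
}
```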

buffered channel

What if the code above were not simply printing, but were two stages each doing heavy work? With an unbuffered channel, if the receiving stage takes a long time to process a value, the sending stage cannot hand off its next value and stalls for that whole time. A buffered channel can prevent this.

```go
package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    go func() {
        ch <- 1
        ch <- 2
        close(ch)
    }()

    for i := range ch {
        fmt.Println(i)
    }
}
```

The above code uses a buffered channel to print 1 and 2. Because the channel has a buffer, sending and receiving no longer have to happen at the same moment: a send succeeds immediately as long as the buffer has free space, and blocks only when the buffer is full. The buffer capacity (2 here) gives the sender that much slack, which keeps a slow downstream stage from immediately stalling the upstream one.
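A small sketch of how capacity limits sends: sendUpTo (a hypothetical helper) keeps sending without blocking until the buffer is full, and len and cap report how much of the buffer is in use.

```go
package main

import "fmt"

// sendUpTo tries n non-blocking sends and returns how many the channel accepted.
func sendUpTo(ch chan int, n int) int {
    sent := 0
    for i := 0; i < n; i++ {
        select {
        case ch <- i:
            sent++
        default:
            return sent // buffer full and no receiver waiting
        }
    }
    return sent
}

func main() {
    ch := make(chan int, 2)
    fmt.Println(sendUpTo(ch, 5))  // 2: only the buffer's capacity fits
    fmt.Println(len(ch), cap(ch)) // 2 2
    <-ch                          // a receive frees one slot
    fmt.Println(sendUpTo(ch, 5))  // 1
}
```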

select

When dealing with multiple channels, you can easily implement a fan-in structure using the select statement.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int, 10)
    ch2 := make(chan int, 10)
    ch3 := make(chan int, 10)

    go func() {
        for {
            ch1 <- 1
            time.Sleep(1 * time.Second)
        }
    }()
    go func() {
        for {
            ch2 <- 2
            time.Sleep(2 * time.Second)
        }
    }()
    go func() {
        for {
            ch3 <- 3
            time.Sleep(3 * time.Second)
        }
    }()

    for i := 0; i < 3; i++ {
        select {
        case v := <-ch1:
            fmt.Println(v)
        case v := <-ch2:
            fmt.Println(v)
        case v := <-ch3:
            fmt.Println(v)
        }
    }
}
```

The code above starts three goroutines that periodically send 1, 2, and 3 on three separate channels, and uses select to receive from those channels and print the values. select waits on all of its cases at once and runs whichever one is ready first; if several are ready at the same time, it picks one at random. This lets you handle values from multiple channels as soon as they arrive.
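A common fan-in helper also has to notice when its inputs are closed. The sketch below (merge and produce are hypothetical helpers) uses the fact that receiving from a nil channel blocks forever: once an input is closed, it is set to nil so that its select case is effectively disabled.

```go
package main

import "fmt"

// merge fans in two channels into one. It keeps selecting until both inputs
// are closed, then closes the output.
func merge(a, b <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for a != nil || b != nil {
            select {
            case v, ok := <-a:
                if !ok {
                    a = nil // a nil channel is never ready, so this case is disabled
                    continue
                }
                out <- v
            case v, ok := <-b:
                if !ok {
                    b = nil
                    continue
                }
                out <- v
            }
        }
    }()
    return out
}

// produce sends the given values on a new channel and then closes it.
func produce(values ...int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for _, v := range values {
            ch <- v
        }
    }()
    return ch
}

func main() {
    sum := 0
    for v := range merge(produce(1, 2, 3), produce(10, 20)) {
        sum += v
    }
    fmt.Println(sum) // 36
}
```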

for range

You can easily receive data from a channel using for range. When for range is used with a channel, the loop body runs each time a value is received, and the loop ends once the channel is closed and all buffered values have been received.

```go
package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 1
        ch <- 2
        close(ch)
    }()

    for i := range ch {
        fmt.Println(i)
    }
}
```

The above code uses a channel to print 1 and 2. This code uses for range to receive and print data whenever data is added to the channel. The loop ends when the channel is closed.

As in the first example of this article, this syntax can also serve as a simple synchronization method.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{})
    go func() {
        defer close(ch)
        time.Sleep(1 * time.Second)
        fmt.Println("Hello, World!")
    }()

    fmt.Println("Waiting for goroutine...")
    for range ch {}
}
```

The above code prints Hello, World! after a 1-second pause. It uses a channel to run synchronous code asynchronously, and the for range loop over ch acts as the join point: it finishes only after the goroutine closes the channel.

etc

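  1. Sending to or receiving from a nil channel blocks forever; if no other goroutine can make progress, the program fails with a deadlock.
  2. Sending data to a closed channel (or closing an already closed channel) causes a panic.
  3. It is not necessary to close every channel explicitly; an unreachable channel is garbage-collected whether or not it is closed. The GC never closes a channel, though, so you must close it yourself when receivers rely on that signal, for example to end a for range loop.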
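The behavior of closed channels can be checked directly. This sketch shows that buffered values are still delivered after close, that a drained closed channel yields the zero value with ok == false, and that a send on a closed channel panics (sendOnClosed is a hypothetical helper that recovers the panic).

```go
package main

import "fmt"

// sendOnClosed reports whether sending on a closed channel panicked.
func sendOnClosed() (panicked bool) {
    defer func() {
        if recover() != nil {
            panicked = true
        }
    }()
    ch := make(chan int, 1)
    close(ch)
    ch <- 1 // panics: send on closed channel
    return false
}

func main() {
    ch := make(chan int, 1)
    ch <- 42
    close(ch)

    // Buffered values are still delivered after close...
    v, ok := <-ch
    fmt.Println(v, ok) // 42 true
    // ...and once drained, receives return the zero value and ok == false.
    v, ok = <-ch
    fmt.Println(v, ok) // 0 false

    fmt.Println(sendOnClosed()) // true
}
```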

mutex

spinlock

A spinlock is a synchronization method that repeatedly tries to acquire a lock until it succeeds. In Go, a spinlock can easily be implemented with the atomic operations in sync/atomic.

```go
package spinlock

import (
    "runtime"
    "sync/atomic"
)

type SpinLock struct {
    lock uintptr
}

func (s *SpinLock) Lock() {
    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
        runtime.Gosched()
    }
}

func (s *SpinLock) Unlock() {
    atomic.StoreUintptr(&s.lock, 0)
}

func NewSpinLock() *SpinLock {
    return &SpinLock{}
}
```

The above code implements a spinlock package using sync/atomic. The Lock method tries to acquire the lock with atomic.CompareAndSwapUintptr, which changes lock from 0 to 1 only if it is currently 0, and the Unlock method releases it with atomic.StoreUintptr. Lock keeps retrying until it succeeds, calling runtime.Gosched() between attempts to yield to other goroutines, but it never sleeps; it therefore keeps consuming CPU until the lock is obtained, and spins forever if the holder never releases it. Consequently, a spinlock is best suited to simple synchronization where the lock is held only for very short periods.
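As a usage sketch, here is a variant of the SpinLock above built on atomic.Bool (available since Go 1.19) guarding a shared counter. The type boolSpinLock and the helper countWithSpinLock are illustrative names, not part of the original package.

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

// boolSpinLock is a variant of SpinLock that stores its state in an atomic.Bool.
type boolSpinLock struct {
    locked atomic.Bool
}

func (l *boolSpinLock) Lock() {
    // Spin until locked flips from false to true; yield between attempts.
    for !l.locked.CompareAndSwap(false, true) {
        runtime.Gosched()
    }
}

func (l *boolSpinLock) Unlock() {
    l.locked.Store(false)
}

// countWithSpinLock increments a shared counter from n goroutines.
func countWithSpinLock(n int) int {
    var (
        lock  boolSpinLock
        wg    sync.WaitGroup
        count int
    )
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            lock.Lock()
            count++ // critical section: one goroutine at a time
            lock.Unlock()
        }()
    }
    wg.Wait()
    return count
}

func main() {
    fmt.Println(countWithSpinLock(100)) // 100
}
```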

sync.Mutex

A mutex is a tool for synchronizing goroutines. sync.Mutex provides the Lock and Unlock methods (plus TryLock), and its zero value is an unlocked mutex that is ready to use. For separate read and write locks, sync.RWMutex additionally provides RLock and RUnlock.

```go
package main

import (
    "sync"
)

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup
    var count int

    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        count++
        mu.Unlock()
    }()

    mu.Lock()
    count++
    mu.Unlock()

    // Wait for the goroutine so that both increments have happened.
    wg.Wait()
    println(count)
}
```

In the above code, two goroutines access the same count variable almost simultaneously. Making the code that touches count a critical section with a mutex prevents simultaneous access, so neither increment is lost. The mutex alone does not make main wait for the goroutine, however, so the program also needs a join point such as sync.WaitGroup before printing; with one in place, it prints 2 no matter how many times it is run.
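In practice a mutex usually lives next to the data it protects, and defer is used so the lock is released on every return path. A minimal sketch, with Counter as a hypothetical example type:

```go
package main

import (
    "fmt"
    "sync"
)

// Counter keeps its mutex next to the data it protects.
type Counter struct {
    mu    sync.Mutex
    count int
}

// Inc uses defer so the lock is released even on an early return or panic.
func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    var c Counter // the zero value of sync.Mutex is an unlocked mutex
    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc()
        }()
    }
    wg.Wait()
    fmt.Println(c.Value()) // 50
}
```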

sync.RWMutex

sync.RWMutex is a mutex that allows read locks and write locks to be used separately. Read locks can be acquired and released using the RLock and RUnlock methods.

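```go
package cmap

import (
    "sync"
)

// ConcurrentMap guards a plain map with a sync.RWMutex.
type ConcurrentMap[K comparable, V any] struct {
    sync.RWMutex
    data map[K]V
}

// New returns an initialized map; writing to a nil map would panic.
func New[K comparable, V any]() *ConcurrentMap[K, V] {
    return &ConcurrentMap[K, V]{data: make(map[K]V)}
}

func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
    m.RLock() // many readers may hold the read lock at once
    defer m.RUnlock()

    value, ok := m.data[key]
    return value, ok
}

func (m *ConcurrentMap[K, V]) Set(key K, value V) {
    m.Lock() // exclusive: blocks readers and other writers
    defer m.Unlock()

    m.data[key] = value
}
```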

The above code implements ConcurrentMap using sync.RWMutex. The Get method takes a read lock and the Set method takes a write lock, so the data map can be read and modified safely. Read locks exist so that, when most operations are simple reads, many goroutines can hold the lock at the same time instead of queuing for an exclusive lock. Because reads do not change state, taking only a read lock for them improves throughput, while a write lock still excludes all readers and other writers.
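The difference between the two kinds of lock can be observed with TryRLock and TryLock (added in Go 1.18). The Go documentation notes that correct uses of the Try methods are rare; they are used here only because they never block, which makes the demonstration deterministic. The helper name is hypothetical.

```go
package main

import (
    "fmt"
    "sync"
)

// readersShareWritersWait reports which lock attempts succeed while one read lock is held.
func readersShareWritersWait() (secondReader, writer bool) {
    var mu sync.RWMutex
    mu.RLock()
    defer mu.RUnlock()

    secondReader = mu.TryRLock() // another reader may join
    if secondReader {
        mu.RUnlock()
    }
    writer = mu.TryLock() // a writer must wait for all readers to leave
    if writer {
        mu.Unlock()
    }
    return secondReader, writer
}

func main() {
    fmt.Println(readersShareWritersWait()) // true false
}
```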

fakelock

fakelock is a simple trick that implements sync.Locker. This struct provides the same methods as sync.Mutex but does not actually perform any actions.

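```go
package fakelock

import "sync"

// FakeLock satisfies sync.Locker but performs no locking at all.
type FakeLock struct{}

// Compile-time check that *FakeLock implements sync.Locker.
var _ sync.Locker = (*FakeLock)(nil)

func (f *FakeLock) Lock() {}

func (f *FakeLock) Unlock() {}
```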

The above code implements the fakelock package. This package implements sync.Locker, providing Lock and Unlock methods, but in reality, it does nothing. The reason why this code is needed will be described when the opportunity arises.
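One common reason for a no-op lock is offered here as a guess rather than the author's stated reason: a type that accepts any sync.Locker can use a real mutex when it is shared between goroutines and a fake one when it is confined to a single goroutine, which avoids the locking cost. Counter and NewCounter are hypothetical names.

```go
package main

import (
    "fmt"
    "sync"
)

// FakeLock mirrors the fakelock package: it satisfies sync.Locker and does nothing.
type FakeLock struct{}

func (f *FakeLock) Lock()   {}
func (f *FakeLock) Unlock() {}

// Counter is a hypothetical type whose locking strategy is chosen by the caller.
type Counter struct {
    mu sync.Locker
    n  int
}

func NewCounter(mu sync.Locker) *Counter { return &Counter{mu: mu} }

func (c *Counter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.n++
}

func main() {
    shared := NewCounter(&sync.Mutex{}) // safe to share across goroutines
    local := NewCounter(&FakeLock{})    // single goroutine only: no locking overhead

    shared.Inc()
    local.Inc()
    fmt.Println(shared.n, local.n) // 1 1
}
```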

waitgroup

sync.WaitGroup

sync.WaitGroup waits until a collection of goroutines has finished. It provides the Add, Done, and Wait methods: Add increases the counter of goroutines to wait for, Done decrements it when a goroutine's task is complete, and Wait blocks until the counter reaches zero. Call Add before starting the goroutine, not inside it, so that Wait cannot run before the counter has been incremented.

```go
package main

import (
    "sync"
    "sync/atomic"
)

func main() {
    wg := sync.WaitGroup{}
    c := atomic.Int64{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Add(1)
        }()
    }

    wg.Wait()
    println(c.Load())
}
```

The code above uses sync.WaitGroup to let 100 goroutines increment the variable c concurrently, and waits for all of them to finish before printing the accumulated value (100). A channel is enough for a simple fork & join with a single goroutine, but when there are many fork & join tasks, sync.WaitGroup is usually the better choice.

with slice

When combined with a slice (or an array), a WaitGroup is an excellent tool for managing concurrent tasks without locks: each goroutine writes only to its own element, so no two goroutines touch the same memory.

```go
package main

import (
    "fmt"
    "math/rand"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    arr := [10]int{}

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            // Each goroutine writes only to its own index, so no lock is needed.
            arr[id] = rand.Intn(100)
        }(i)
    }

    wg.Wait() // every write to arr happens before Wait returns
    fmt.Println("Done")

    for i, v := range arr {
        fmt.Printf("arr[%d] = %d\n", i, v)
    }
}
```

The code above uses only a WaitGroup: each of 10 goroutines generates a random integer and stores it at its assigned index. The WaitGroup waits until all goroutines finish, and then Done and the results are printed. Used this way, a WaitGroup lets multiple goroutines work concurrently, store their results without locks, and hand everything to a batch post-processing step once all tasks are complete.
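The same pattern works with a slice, as long as the slice is allocated to its final length up front; appending from several goroutines would be a data race. A minimal sketch with a hypothetical squares helper:

```go
package main

import (
    "fmt"
    "sync"
)

// squares computes i*i for i in [0, n) with one goroutine per index.
// Each goroutine writes only to its own element, so no lock is needed;
// wg.Wait makes all the writes visible to the caller.
func squares(n int) []int {
    out := make([]int, n) // allocate up front: appending concurrently would race
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            out[idx] = idx * idx
        }(i)
    }
    wg.Wait()
    return out
}

func main() {
    fmt.Println(squares(5)) // [0 1 4 9 16]
}
```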

golang.org/x/sync/errgroup.Group

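errgroup extends the idea of sync.WaitGroup with error handling. Group.Wait returns the first non-nil error returned by any of the goroutines, and a group created with errgroup.WithContext also cancels its derived context as soon as one goroutine fails. The other goroutines are not stopped forcibly, so they should watch that context and return early.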

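```go
package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    // ctx is cancelled as soon as any function passed to g.Go returns an error.
    g, ctx := errgroup.WithContext(context.Background())
    _ = ctx // unused here; real tasks would watch ctx.Done()

    for i := 0; i < 10; i++ {
        i := i // per-iteration copy (needed before Go 1.22)
        g.Go(func() error {
            if i == 5 {
                return fmt.Errorf("error")
            }
            return nil
        })
    }

    // Wait blocks until all goroutines finish and returns the first error.
    if err := g.Wait(); err != nil {
        fmt.Println(err)
    }
}
```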

The code above starts 10 goroutines with errgroup and makes the one with i == 5 (the sixth goroutine) return an error on purpose, to demonstrate what happens when an error occurs: g.Wait() returns that error and it is printed. In real code, errgroup can be used to start goroutines and to handle post-processing, such as cancelling the remaining work and reporting the error, when any of them fails.

once

The sync package provides tools for code that must run only once. Besides the classic sync.Once type, since Go 1.21 the following functions wrap a function so that it runs only once:

```go
func OnceFunc(f func()) func()
func OnceValue[T any](f func() T) func() T
func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
```

OnceFunc

OnceFunc returns a function that runs f only the first time it is called; later calls do nothing. The returned function may be called concurrently from multiple goroutines.

```go
package main

import "sync"

func main() {
    once := sync.OnceFunc(func() {
        println("Hello, World!")
    })

    once()
    once()
    once()
    once()
    once()
}
```

The code presented above uses sync.OnceFunc to print Hello, World!. In this code, sync.OnceFunc is used to create the once function, and even if the once function is called multiple times, Hello, World! is printed only once.
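Before Go 1.21, the same effect was achieved with the sync.Once type directly, which is still common for lazy initialization. In this sketch, loadConfig, getConfig, and the config contents are hypothetical; the atomic counter only exists to show that the initializer runs once even when many goroutines race to call it.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var (
    initOnce sync.Once
    calls    atomic.Int32 // counts how often the initializer really runs
    config   map[string]string
)

// loadConfig is a hypothetical lazy initializer.
func loadConfig() {
    calls.Add(1)
    config = map[string]string{"env": "dev"}
}

// getConfig runs loadConfig at most once, even under concurrent calls.
func getConfig() map[string]string {
    initOnce.Do(loadConfig)
    return config
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = getConfig()["env"]
        }()
    }
    wg.Wait()
    fmt.Println(calls.Load()) // 1
}
```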

OnceValue

OnceValue not only ensures that the function is executed only once throughout the entire execution, but also stores the return value of the function and returns the stored value when called again.

```go
package main

import "sync"

func main() {
    c := 0
    once := sync.OnceValue(func() int {
        c += 1
        return c
    })

    println(once())
    println(once())
    println(once())
    println(once())
    println(once())
}
```

The code above uses sync.OnceValue to increment the c variable by 1. In this code, sync.OnceValue is used to create the once function, and even if the once function is called multiple times, the c variable is incremented only once, and the value 1 is returned.

OnceValues

OnceValues operates in the same manner as OnceValue, but it can return multiple values.

```go
package main

import "sync"

func main() {
    c := 0
    once := sync.OnceValues(func() (int, int) {
        c += 1
        return c, c
    })

    a, b := once()
    println(a, b)
    a, b = once()
    println(a, b)
    a, b = once()
    println(a, b)
    a, b = once()
    println(a, b)
    a, b = once()
    println(a, b)
}
```

The code above uses sync.OnceValues in the same way: no matter how many times once is called, c is incremented only once, and every call returns the pair (1, 1).
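A typical use of OnceValues is a setup step that returns a value and an error, since both results are cached. In this sketch the hypothetical helper cachedAtoi parses a string only on its first call, and the calls counter shows how many times the parse really ran.

```go
package main

import (
    "fmt"
    "strconv"
    "sync"
)

// cachedAtoi returns a function that parses s on its first call only.
// Later calls return the same (value, error) pair.
func cachedAtoi(s string, calls *int) func() (int, error) {
    return sync.OnceValues(func() (int, error) {
        *calls++
        return strconv.Atoi(s)
    })
}

func main() {
    calls := 0
    port := cachedAtoi("8080", &calls)
    for i := 0; i < 3; i++ {
        v, err := port()
        fmt.Println(v, err) // 8080 <nil>
    }
    fmt.Println("parsed", calls, "time(s)") // parsed 1 time(s)

    bad := cachedAtoi("not-a-number", &calls)
    _, err1 := bad()
    _, err2 := bad()
    fmt.Println(err1 == err2) // true: the error is cached too
}
```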

atomic

The atomic package provides atomic operations. It offers functions such as AddInt64, CompareAndSwapInt64, LoadInt64, StoreInt64, and SwapInt64, but since Go 1.19 the typed wrappers such as atomic.Int64, atomic.Uint64, and atomic.Pointer[T] are recommended, because their values can only be accessed through atomic methods.

```go
package main

import (
    "sync"
    "sync/atomic"
)

func main() {
    wg := sync.WaitGroup{}
    c := atomic.Int64{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Add(1)
        }()
    }

    wg.Wait()
    println(c.Load())
}
```

This is an example that was used earlier. This code uses the atomic.Int64 type to atomically increment the c variable. The Add method and Load method can be used to atomically increment the variable and read the variable. In addition, the Store method can be used to store a value, the Swap method can be used to swap a value, and the CompareAndSwap method can be used to compare a value and swap it if it matches.
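CompareAndSwap is typically used in a retry loop: read the current value, compute a new one, and swap only if nobody changed it in the meantime. This sketch keeps a running maximum; storeMax and concurrentMax are hypothetical helpers, and the sketch assumes non-negative inputs because the counter starts at 0.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// storeMax raises m to v if v is larger, retrying when another goroutine
// changed m between our Load and CompareAndSwap.
func storeMax(m *atomic.Int64, v int64) {
    for {
        cur := m.Load()
        if v <= cur || m.CompareAndSwap(cur, v) {
            return
        }
    }
}

// concurrentMax feeds every value to storeMax from its own goroutine.
func concurrentMax(values []int64) int64 {
    var m atomic.Int64 // starts at 0, so this sketch assumes non-negative values
    var wg sync.WaitGroup
    for _, v := range values {
        wg.Add(1)
        go func(v int64) {
            defer wg.Done()
            storeMax(&m, v)
        }(v)
    }
    wg.Wait()
    return m.Load()
}

func main() {
    fmt.Println(concurrentMax([]int64{3, 17, 5, 11})) // 17
}
```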

cond

sync.Cond

sync.Cond provides condition variables. A Cond is created with sync.NewCond from a sync.Locker (usually a *sync.Mutex), which is then available as its L field, and it provides the Wait, Signal, and Broadcast methods.

```go
package main

import (
    "sync"
)

func main() {
    c := sync.NewCond(&sync.Mutex{})
    ready := false

    go func() {
        c.L.Lock()
        ready = true
        c.Signal()
        c.L.Unlock()
    }()

    c.L.Lock()
    for !ready {
        c.Wait()
    }
    c.L.Unlock()

    println("Ready!")
}
```

The code above uses sync.Cond to wait until the ready variable becomes true and then prints Ready!. Wait atomically unlocks c.L and suspends the goroutine, then locks c.L again before returning, which is why the condition is re-checked in a loop. Signal wakes one waiting goroutine and Broadcast wakes all of them, so sync.Cond lets one or many goroutines wait efficiently until a specific condition is satisfied.

This can be used to implement a simple queue.

```go
package queue

import (
    "sync"
)

type Node[T any] struct {
    Value T
    Next  *Node[T]
}

type Queue[T any] struct {
    sync.Mutex
    Cond *sync.Cond
    Head *Node[T]
    Tail *Node[T]
    Len  int
}

func New[T any]() *Queue[T] {
    q := &Queue[T]{}
    q.Cond = sync.NewCond(&q.Mutex)
    return q
}

func (q *Queue[T]) Push(value T) {
    q.Lock()
    defer q.Unlock()

    node := &Node[T]{Value: value}
    if q.Len == 0 {
        q.Head = node
        q.Tail = node
    } else {
        q.Tail.Next = node
        q.Tail = node
    }
    q.Len++
    q.Cond.Signal() // wake one goroutine blocked in Pop
}

func (q *Queue[T]) Pop() T {
    q.Lock()
    defer q.Unlock()

    // Wait releases the lock while blocked and reacquires it before returning.
    for q.Len == 0 {
        q.Cond.Wait()
    }

    node := q.Head
    q.Head = node.Next
    if q.Head == nil {
        q.Tail = nil // the queue is now empty
    }
    q.Len--
    return node.Value
}
```

By using sync.Cond in this way, you can efficiently wait and resume operation when a condition is met, instead of using spin-lock, which uses a lot of CPU resources.
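The examples above use Signal, which wakes a single waiter. The following sketch shows Broadcast, which wakes every goroutine waiting on the Cond; releaseAll is a hypothetical helper that returns how many waiters observed the condition.

```go
package main

import (
    "fmt"
    "sync"
)

// releaseAll starts n waiters on the same condition and wakes them all at
// once with Broadcast. It returns how many waiters observed the condition.
func releaseAll(n int) int {
    var (
        mu    sync.Mutex
        cond  = sync.NewCond(&mu)
        ready bool
        woke  int
        wg    sync.WaitGroup
    )
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            for !ready { // re-check: Wait may return before the condition holds
                cond.Wait()
            }
            woke++
            mu.Unlock()
        }()
    }

    mu.Lock()
    ready = true
    cond.Broadcast() // Signal would wake only one waiter
    mu.Unlock()

    wg.Wait()
    return woke
}

func main() {
    fmt.Println(releaseAll(5)) // 5
}
```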

semaphore

golang.org/x/sync/semaphore.Weighted

The golang.org/x/sync/semaphore package provides a weighted semaphore. It is created with semaphore.NewWeighted(n), where n is the total weight available, and it provides the Acquire, Release, and TryAcquire methods.

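```go
package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx := context.Background()
    s := semaphore.NewWeighted(1) // at most 1 unit may be held at a time

    // Acquire blocks until the weight is available or ctx is done.
    if err := s.Acquire(ctx, 1); err != nil {
        fmt.Println("Acquire failed:", err)
        return
    }
    fmt.Println("Acquired!")

    // TryAcquire never blocks; it fails here because the only unit is held.
    if s.TryAcquire(1) {
        fmt.Println("Acquired again!")
        s.Release(1)
    } else {
        fmt.Println("Not Acquired!")
    }

    // Release only what was actually acquired.
    s.Release(1)
}
```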

The code above creates a semaphore with a total weight of 1 using semaphore.NewWeighted. Acquire blocks until the requested weight is available or the context is done, TryAcquire tries to acquire without blocking and reports whether it succeeded, and Release returns the weight. Release panics if it releases more than is held, so call it only after a successful acquisition.
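When every task has weight 1 and you prefer to stay within the standard library, a buffered channel is a common alternative to semaphore.Weighted. This is a swapped-in technique, not part of the semaphore package: the channel's capacity is the limit, a send acquires a slot, and a receive releases it. The helper runLimited is hypothetical and also tracks the peak concurrency it observed.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// runLimited runs n tasks but lets at most limit of them run at the same time,
// using a buffered channel as a counting semaphore. It returns the peak concurrency.
func runLimited(n, limit int) int64 {
    sem := make(chan struct{}, limit)
    var running, peak atomic.Int64
    var wg sync.WaitGroup

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            sem <- struct{}{}        // acquire: blocks while all slots are taken
            defer func() { <-sem }() // release

            cur := running.Add(1)
            for { // record the highest concurrency seen so far
                old := peak.Load()
                if cur <= old || peak.CompareAndSwap(old, cur) {
                    break
                }
            }
            time.Sleep(5 * time.Millisecond) // pretend to work
            running.Add(-1)
        }()
    }
    wg.Wait()
    return peak.Load()
}

func main() {
    fmt.Println(runLimited(20, 3) <= 3) // true
}
```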

Conclusion

This covers the fundamental concepts. I hope this article has helped you understand how to manage concurrency with goroutines and that you can apply it in practice. Thank you for reading.