Go Konsep Dasar Konkurensi
개요
짧은 소개
Bahasa Go memiliki banyak alat untuk manajemen konkurensi. Artikel ini akan memperkenalkan beberapa di antaranya beserta trik-triknya.
Goroutine?
Goroutine adalah model konkurensi baru yang didukung oleh bahasa Go. Umumnya, program menggunakan OS thread dari sistem operasi untuk menjalankan beberapa tugas secara bersamaan, dan menjalankan tugas secara paralel sebanyak jumlah inti prosesor. Untuk konkurensi dalam unit yang lebih kecil, program membuat green thread di userland, sehingga beberapa green thread dapat bergantian menjalankan tugas dalam satu OS thread. Namun, goroutine membuat green thread tersebut lebih kecil dan efisien. Goroutine menggunakan memori lebih sedikit dan dapat dibuat serta dialihkan lebih cepat daripada thread.
Untuk menggunakan goroutine, cukup gunakan kata kunci go
. Hal ini memungkinkan penulisan program secara intuitif untuk menjalankan kode sinkron sebagai kode asinkron.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch := make(chan struct{})
10 go func() {
11 defer close(ch)
12 time.Sleep(1 * time.Second)
13 fmt.Println("Hello, World!")
14 }()
15
16 fmt.Println("Waiting for goroutine...")
17 for range ch {}
18}
Kode ini mengubah kode sinkron yang sederhana yang menunggu 1 detik lalu mencetak Hello, World!
menjadi aliran asinkron. Contoh ini sederhana, tetapi mengubah kode yang sedikit lebih kompleks dari kode sinkron ke kode asinkron akan meningkatkan keterbacaan, visibilitas, dan pemahaman kode dibandingkan dengan metode seperti async await
atau promise
.
Namun, dalam banyak kasus, aliran pemanggilan kode sinkron secara sederhana ke asinkron dan aliran seperti fork & join
(mirip dengan aliran pembagian kekuasaan) yang tidak dipahami dengan baik dapat menghasilkan kode goroutine yang buruk. Artikel ini akan memperkenalkan beberapa metode dan teknik untuk mengantisipasi hal tersebut.
Manajemen Konkurensi
context
Penggunaan context
sebagai teknik manajemen pertama mungkin mengejutkan. Namun, dalam bahasa Go, context
tidak hanya sebatas fungsi pembatalan, tetapi juga berperan penting dalam manajemen keseluruhan pohon tugas. Untuk yang belum mengetahuinya, berikut penjelasan singkat tentang paket tersebut.
1package main
2
3func main() {
4 ctx, cancel := context.WithCancel(context.Background())
5 defer cancel()
6
7 go func() {
8 <-ctx.Done()
9 fmt.Println("Context is done!")
10 }()
11
12 time.Sleep(1 * time.Second)
13
14 cancel()
15
16 time.Sleep(1 * time.Second)
17}
Kode di atas menggunakan context
untuk mencetak Context is done!
setelah 1 detik. context
dapat memeriksa pembatalan melalui metode Done()
, dan menyediakan berbagai metode pembatalan seperti WithCancel
, WithTimeout
, WithDeadline
, dan WithValue
.
Mari kita buat contoh sederhana. Misalkan Anda menulis kode yang menggunakan pola aggregator
untuk mengambil data user
, post
, dan comment
. Dan semua permintaan harus dilakukan dalam waktu 2 detik, maka Anda dapat menulisnya sebagai berikut:
1package main
2
3func main() {
4 ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
5 defer cancel()
6
7 ch := make(chan struct{})
8 go func() {
9 defer close(ch)
10 user := getUser(ctx)
11 post := getPost(ctx)
12 comment := getComment(ctx)
13
14 fmt.Println(user, post, comment)
15 }()
16
17 select {
18 case <-ctx.Done():
19 fmt.Println("Timeout!")
20 case <-ch:
21 fmt.Println("All data is fetched!")
22 }
23}
Kode di atas akan mencetak Timeout!
jika tidak dapat mengambil semua data dalam waktu 2 detik, dan mencetak All data is fetched!
jika semua data berhasil diambil. Dengan cara ini, context
memudahkan manajemen pembatalan dan batas waktu bahkan dalam kode yang menjalankan beberapa goroutine.
Berbagai fungsi dan metode terkait context
dapat dilihat di godoc context. Semoga Anda dapat mempelajarinya dan menggunakannya dengan mudah.
channel
unbuffered channel
channel
adalah alat untuk komunikasi antar goroutine. channel
dapat dibuat dengan make(chan T)
. T
adalah tipe data yang akan dikirim oleh channel
tersebut. channel
dapat mengirim dan menerima data dengan <-
, dan menutup channel
dengan close
.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Kode di atas menggunakan channel
untuk mencetak 1 dan 2. Kode ini hanya menunjukkan pengiriman dan penerimaan nilai pada channel
. Namun, channel
menyediakan lebih banyak fungsi. Mari kita bahas buffered channel
dan unbuffered channel
. Sebagai permulaan, contoh di atas adalah unbuffered channel
, di mana tindakan mengirim data ke channel dan menerima data harus dilakukan secara bersamaan. Jika tindakan ini tidak dilakukan secara bersamaan, maka dapat terjadi deadlock.
buffered channel
Bagaimana jika kode di atas bukan hanya mencetak output, tetapi menjalankan dua proses yang berat? Jika proses kedua membutuhkan waktu lama untuk membaca dan memproses, maka proses pertama juga akan berhenti selama waktu tersebut. Kita dapat menggunakan buffered channel
untuk mencegah situasi ini.
1package main
2
3func main() {
4 ch := make(chan int, 2)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Kode di atas menggunakan buffered channel
untuk mencetak 1 dan 2. Kode ini menggunakan buffered channel
agar tindakan mengirim data ke channel dan menerima data tidak harus dilakukan secara bersamaan. Dengan menambahkan buffer ke channel, akan ada ruang yang cukup untuk mencegah penundaan pekerjaan yang disebabkan oleh pekerjaan dengan prioritas rendah.
select
Ketika menangani beberapa channel, sintaks select
memudahkan implementasi struktur fan-in
.
1package main
2
3import (
4 "fmt"
5 "time"
6)
7
8func main() {
9 ch1 := make(chan int, 10)
10 ch2 := make(chan int, 10)
11 ch3 := make(chan int, 10)
12
13 go func() {
14 for {
15 ch1 <- 1
16 time.Sleep(1 * time.Second)
17 }
18 }()
19 go func() {
20 for {
21 ch2 <- 2
22 time.Sleep(2 * time.Second)
23 }
24 }()
25 go func() {
26 for {
27 ch3 <- 3
28 time.Sleep(3 * time.Second)
29 }
30 }()
31
32 for i := 0; i < 3; i++ {
33 select {
34 case v := <-ch1:
35 fmt.Println(v)
36 case v := <-ch2:
37 fmt.Println(v)
38 case v := <-ch3:
39 fmt.Println(v)
40 }
41 }
42}
Kode di atas membuat 3 channel yang secara berkala mengirimkan 1, 2, dan 3, dan menggunakan select
untuk menerima dan mencetak nilai dari channel. Dengan cara ini, select
memungkinkan penerimaan data dari beberapa channel secara bersamaan dan memprosesnya sesuai dengan penerimaan nilai dari channel.
for range
channel
dapat dengan mudah menerima data menggunakan for range
. Menggunakan for range
pada channel akan menjalankan kode setiap kali data ditambahkan ke channel, dan akan menghentikan loop ketika channel ditutup.
1package main
2
3func main() {
4 ch := make(chan int)
5 go func() {
6 ch <- 1
7 ch <- 2
8 close(ch)
9 }()
10
11 for i := range ch {
12 fmt.Println(i)
13 }
14}
Kode di atas menggunakan channel
untuk mencetak 1 dan 2. Kode ini menggunakan for range
untuk menerima dan mencetak data setiap kali data ditambahkan ke channel. Dan loop akan berhenti ketika channel ditutup.
Seperti yang telah ditulis beberapa kali di atas, sintaks ini juga dapat digunakan sebagai alat sinkronisasi sederhana.
1package main
2
3func main() {
4 ch := make(chan struct{})
5 go func() {
6 defer close(ch)
7 time.Sleep(1 * time.Second)
8 fmt.Println("Hello, World!")
9 }()
10
11 fmt.Println("Waiting for goroutine...")
12 for range ch {}
13}
Kode di atas menunggu 1 detik lalu mencetak Hello, World!
. Kode ini menggunakan channel
untuk mengubah kode sinkron menjadi kode asinkron. Dengan cara ini, channel
memudahkan perubahan kode sinkron menjadi kode asinkron dan pengaturan titik join
.
etc
- Mengirim atau menerima data ke/dari
nil channel
dapat menyebabkan deadlock karena loop tak terbatas. - Mengirim data setelah channel ditutup akan menyebabkan panic.
- Meskipun channel tidak ditutup secara eksplisit, GC akan menutupnya.
mutex
spinlock
spinlock
adalah metode sinkronisasi yang terus mencoba lock dengan loop berulang. Dalam bahasa Go, spinlock
dapat diimplementasikan dengan mudah menggunakan pointer.
1package spinlock
2
3import (
4 "runtime"
5 "sync/atomic"
6)
7
8type SpinLock struct {
9 lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13 for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14 runtime.Gosched()
15 }
16}
17
18func (s *SpinLock) Unlock() {
19 atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23 return &SpinLock{}
24}
Kode di atas mengimplementasikan paket spinlock
. Kode ini menggunakan paket sync/atomic
untuk mengimplementasikan SpinLock
. Metode Lock
menggunakan atomic.CompareAndSwapUintptr
untuk mencoba lock, dan metode Unlock
menggunakan atomic.StoreUintptr
untuk melepaskan lock. Metode ini terus mencoba lock sehingga terus menggunakan CPU sampai lock diperoleh, yang dapat menyebabkan loop tak terbatas. Oleh karena itu, spinlock
sebaiknya digunakan untuk sinkronisasi sederhana atau hanya untuk waktu yang singkat.
sync.Mutex
mutex
adalah alat untuk sinkronisasi antar goroutine. mutex
yang disediakan oleh paket sync
menyediakan metode seperti Lock
, Unlock
, RLock
, dan RUnlock
. mutex
dapat dibuat dengan sync.Mutex
, dan sync.RWMutex
juga dapat digunakan untuk lock baca/tulis.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 var mu sync.Mutex
9 var count int
10
11 go func() {
12 mu.Lock()
13 count++
14 mu.Unlock()
15 }()
16
17 mu.Lock()
18 count++
19 mu.Unlock()
20
21 println(count)
22}
Kode di atas memiliki dua goroutine yang hampir bersamaan mengakses variabel count
yang sama. Dengan menggunakan mutex
untuk membuat kode yang mengakses variabel count
sebagai critical section, maka akses bersamaan ke variabel count
dapat dicegah. Maka kode ini akan selalu mencetak 2
berapapun jumlah eksekusi.
sync.RWMutex
sync.RWMutex
adalah mutex
yang dapat digunakan untuk membedakan antara lock baca dan lock tulis. Metode RLock
dan RUnlock
dapat digunakan untuk mengunci dan melepaskan lock baca.
1package cmap
2
3import (
4 "sync"
5)
6
7type ConcurrentMap[K comparable, V any] struct {
8 sync.RWMutex
9 data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13 m.RLock()
14 defer m.RUnlock()
15
16 value, ok := m.data[key]
17 return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21 m.Lock()
22 defer m.Unlock()
23
24 m.data[key] = value
25}
Kode di atas mengimplementasikan ConcurrentMap
menggunakan sync.RWMutex
. Kode ini menggunakan lock baca pada metode Get
dan lock tulis pada metode Set
untuk mengakses dan memodifikasi map data
dengan aman. Lock baca diperlukan karena dalam kasus banyak operasi baca, lock tulis tidak perlu digunakan dan hanya lock baca yang digunakan sehingga beberapa goroutine dapat melakukan operasi baca secara bersamaan. Dengan demikian, kinerja dapat ditingkatkan dengan hanya menggunakan lock baca jika tidak ada perubahan status yang diperlukan dan lock tulis tidak diperlukan.
fakelock
fakelock
adalah trik sederhana yang mengimplementasikan sync.Locker
. Struktur ini menyediakan metode yang sama dengan sync.Mutex
, tetapi tidak melakukan operasi sebenarnya.
1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}
Kode di atas mengimplementasikan paket fakelock
. Paket ini mengimplementasikan sync.Locker
dan menyediakan metode Lock
dan Unlock
, tetapi sebenarnya tidak melakukan apa pun. Alasan mengapa kode ini diperlukan akan dijelaskan lain waktu.
waitgroup
sync.WaitGroup
sync.WaitGroup
adalah alat untuk menunggu hingga semua pekerjaan goroutine selesai. Metode Add
, Done
, dan Wait
disediakan. Metode Add
menambahkan jumlah goroutine, metode Done
menunjukkan bahwa pekerjaan goroutine telah selesai, dan metode Wait
menunggu hingga semua pekerjaan goroutine selesai.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 untuk i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Kode di atas menggunakan sync.WaitGroup
untuk menjalankan 100 goroutine secara bersamaan yang menambahkan nilai ke variabel c
. Kode ini menggunakan sync.WaitGroup
untuk menunggu semua goroutine selesai sebelum mencetak nilai yang telah ditambahkan ke variabel c
. Untuk beberapa tugas fork & join
yang sederhana, penggunaan channel sudah cukup, tetapi untuk sejumlah besar tugas fork & join
, penggunaan sync.WaitGroup
juga merupakan pilihan yang baik.
dengan slice
Ketika digunakan bersama slice, waitgroup
dapat menjadi alat yang sangat baik untuk mengelola eksekusi tugas konkuren tanpa lock.
1package main
2
3import (
4 "fmt"
5 "sync"
6 "rand"
7)
8
9func main() {
10 var wg sync.WaitGroup
11 arr := [10]int{}
12
13 for i := 0; i < 10; i++ {
14 wg.Add(1)
15 go func(id int) {
16 defer wg.Done()
17
18 arr[id] = rand.Intn(100)
19 }(i)
20 }
21
22 wg.Wait()
23 fmt.Println("Done")
24
25 for i, v := range arr {
26 fmt.Printf("arr[%d] = %d\n", i, v)
27 }
28}
Kode di atas menggunakan hanya waitgroup
untuk membuat 10 bilangan bulat acak secara konkuren oleh masing-masing goroutine dan menyimpannya di indeks yang telah dialokasikan. Kode ini menggunakan waitgroup
untuk menunggu semua goroutine selesai sebelum mencetak "Done". Dengan cara ini, waitgroup
memungkinkan beberapa goroutine untuk melakukan tugas secara bersamaan, menyimpan data tanpa lock, dan melakukan pemrosesan pasca-eksekusi secara bersamaan setelah semua goroutine selesai.
golang.org/x/sync/errgroup.ErrGroup
errgroup
merupakan paket yang memperluas sync.WaitGroup
. Tidak seperti sync.WaitGroup
, errgroup
akan membatalkan semua goroutine dan mengembalikan error jika salah satu tugas goroutine mengalami error.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/errgroup"
7)
8
9func main() {
10 g, ctx := errgroup.WithContext(context.Background())
11 _ = ctx
12
13 for i := 0; i < 10; i++ {
14 i := i
15 g.Go(func() error {
16 if i == 5 {
17 return fmt.Errorf("error")
18 }
19 return nil
20 })
21 }
22
23 if err := g.Wait(); err != nil {
24 fmt.Println(err)
25 }
26}
Kode di atas menggunakan errgroup
untuk membuat 10 goroutine dan menghasilkan error pada goroutine ke-5. Error sengaja dibuat pada goroutine kelima untuk menunjukkan bagaimana menangani situasi error. Namun, dalam penggunaan sebenarnya, Anda dapat menggunakan errgroup
untuk membuat goroutine dan melakukan berbagai pemrosesan pasca-eksekusi jika terjadi error pada masing-masing goroutine.
once
Alat untuk menjalankan kode yang hanya boleh dieksekusi sekali. Kode terkait dapat dijalankan melalui konstruktor di bawah ini.
1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)
OnceFunc
OnceFunc
hanya memungkinkan fungsi yang bersangkutan untuk dieksekusi sekali saja di seluruh program.
1package main
2
3import "sync"
4
5func main() {
6 once := sync.OnceFunc(func() {
7 println("Hello, World!")
8 })
9
10 once()
11 once()
12 once()
13 once()
14 once()
15}
Kode di atas menggunakan sync.OnceFunc
untuk mencetak "Hello, World!". Meskipun fungsi once
dipanggil beberapa kali, "Hello, World!" hanya akan dicetak sekali karena penggunaan sync.OnceFunc
.
OnceValue
OnceValue
tidak hanya memastikan fungsi hanya dieksekusi sekali, tetapi juga menyimpan nilai balik fungsi tersebut dan mengembalikan nilai yang tersimpan pada pemanggilan berikutnya.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValue(func() int {
8 c += 1
9 return c
10 })
11
12 println(once())
13 println(once())
14 println(once())
15 println(once())
16 println(once())
17}
Kode di atas menggunakan sync.OnceValue
untuk menambah nilai variabel c
sebanyak 1. Meskipun once
dipanggil beberapa kali, c
hanya akan bertambah sekali dan selalu mengembalikan nilai 1.
OnceValues
OnceValues
berfungsi sama seperti OnceValue
, tetapi dapat mengembalikan beberapa nilai.
1package main
2
3import "sync"
4
5func main() {
6 c := 0
7 once := sync.OnceValues(func() (int, int) {
8 c += 1
9 return c, c
10 })
11
12 a, b := once()
13 println(a, b)
14 a, b = once()
15 println(a, b)
16 a, b = once()
17 println(a, b)
18 a, b = once()
19 println(a, b)
20 a, b = once()
21 println(a, b)
22}
Kode di atas menggunakan sync.OnceValues
untuk menambah nilai variabel c
sebanyak 1. Meskipun once
dipanggil beberapa kali, c
hanya akan bertambah sekali dan selalu mengembalikan nilai 1 untuk a
dan b
.
atomic
Paket atomic
menyediakan operasi atomik. Paket atomic
menyediakan metode seperti Add
, CompareAndSwap
, Load
, Store
, Swap
, tetapi saat ini penggunaan tipe seperti Int64
, Uint64
, Pointer
direkomendasikan.
1package main
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8func main() {
9 wg := sync.WaitGroup{}
10 c := atomic.Int64{}
11
12 for i := 0; i < 100 ; i++ {
13 wg.Add(1)
14 go func() {
15 defer wg.Done()
16 c.Add(1)
17 }()
18 }
19
20 wg.Wait()
21 println(c.Load())
22}
Contoh yang digunakan sebelumnya. Kode ini menggunakan tipe atomic.Int64
untuk menambah nilai variabel c
secara atomik. Metode Add
dan Load
digunakan untuk menambah dan membaca nilai variabel secara atomik. Metode Store
untuk menyimpan nilai, Swap
untuk menukar nilai, dan CompareAndSwap
untuk membandingkan dan menukar nilai jika cocok.
cond
sync.Cond
Paket cond
menyediakan variabel kondisi. Paket cond
dapat dibuat dengan sync.Cond
dan menyediakan metode Wait
, Signal
, Broadcast
.
1package main
2
3import (
4 "sync"
5)
6
7func main() {
8 c := sync.NewCond(&sync.Mutex{})
9 ready := false
10
11 go func() {
12 c.L.Lock()
13 ready = true
14 c.Signal()
15 c.L.Unlock()
16 }()
17
18 c.L.Lock()
19 for !ready {
20 c.Wait()
21 }
22 c.L.Unlock()
23
24 println("Ready!")
25}
Kode di atas menggunakan sync.Cond
untuk menunggu hingga variabel ready
bernilai true
. Kode ini menggunakan sync.Cond
untuk menunggu hingga ready
bernilai true
sebelum mencetak "Ready!". Dengan cara ini, sync.Cond
memungkinkan beberapa goroutine untuk menunggu hingga kondisi tertentu terpenuhi.
Ini dapat digunakan untuk mengimplementasikan queue
sederhana.
1package queue
2
3import (
4 "sync"
5 "sync/atomic"
6)
7
8type Node[T any] struct {
9 Value T
10 Next *Node[T]
11}
12
13type Queue[T any] struct {
14 sync.Mutex
15 Cond *sync.Cond
16 Head *Node[T]
17 Tail *Node[T]
18 Len int
19}
20
21func New[T any]() *Queue[T] {
22 q := &Queue[T]{}
23 q.Cond = sync.NewCond(&q.Mutex)
24 return q
25}
26
27func (q *Queue[T]) Push(value T) {
28 q.Lock()
29 defer q.Unlock()
30
31 node := &Node[T]{Value: value}
32 if q.Len == 0 {
33 q.Head = node
34 q.Tail = node
35 } else {
36 q.Tail.Next = node
37 q.Tail = node
38 }
39 q.Len++
40 q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44 q.Lock()
45 defer q.Unlock()
46
47 for q.Len == 0 {
48 q.Cond.Wait()
49 }
50
51 node := q.Head
52 q.Head = q.Head.Next
53 q.Len--
54 return node.Value
55}
Dengan menggunakan sync.Cond
, kita dapat menunggu secara efisien dan melanjutkan eksekusi ketika kondisi terpenuhi, daripada menggunakan spin-lock
yang menghabiskan banyak CPU.
semaphore
golang.org/x/sync/semaphore.Semaphore
Paket semaphore
menyediakan semaphore. Paket semaphore
dapat dibuat dengan golang.org/x/sync/semaphore.Semaphore
dan menyediakan metode Acquire
, Release
, TryAcquire
.
1package main
2
3import (
4 "context"
5 "fmt"
6 "golang.org/x/sync/semaphore"
7)
8
9func main() {
10 s := semaphore.NewWeighted(1)
11
12 if s.TryAcquire(1) {
13 fmt.Println("Acquired!")
14 } else {
15 fmt.Println("Not Acquired!")
16 }
17
18 s.Release(1)
19}
Kode di atas menggunakan semaphore
untuk membuat semaphore, memperoleh semaphore menggunakan metode Acquire
, dan melepaskan semaphore menggunakan metode Release
. Kode ini menunjukkan bagaimana memperoleh dan melepaskan semaphore.
Kesimpulan
Sekian penjelasan dasar yang dibutuhkan. Semoga artikel ini membantu Anda memahami dan menerapkan cara mengelola konkurensi menggunakan goroutine. Semoga artikel ini bermanfaat bagi Anda. Terima kasih.