GoSuda

Go Konsep Dasar Konkurensi

By snowmerak
views ...

개요

짧은 소개

Bahasa Go memiliki banyak alat untuk manajemen konkurensi. Artikel ini akan memperkenalkan beberapa di antaranya beserta trik-triknya.

Goroutine?

Goroutine adalah model konkurensi baru yang didukung oleh bahasa Go. Umumnya, program menggunakan OS thread dari sistem operasi untuk menjalankan beberapa tugas secara bersamaan, dan menjalankan tugas secara paralel sebanyak jumlah inti prosesor. Untuk konkurensi dalam unit yang lebih kecil, program membuat green thread di userland, sehingga beberapa green thread dapat bergantian menjalankan tugas dalam satu OS thread. Namun, goroutine membuat green thread tersebut lebih kecil dan efisien. Goroutine menggunakan memori lebih sedikit dan dapat dibuat serta dialihkan lebih cepat daripada thread.

Untuk menggunakan goroutine, cukup gunakan kata kunci go. Hal ini memungkinkan penulisan program secara intuitif untuk menjalankan kode sinkron sebagai kode asinkron.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Kode ini mengubah kode sinkron yang sederhana yang menunggu 1 detik lalu mencetak Hello, World! menjadi aliran asinkron. Contoh ini sederhana, tetapi mengubah kode yang sedikit lebih kompleks dari kode sinkron ke kode asinkron akan meningkatkan keterbacaan, visibilitas, dan pemahaman kode dibandingkan dengan metode seperti async await atau promise.

Namun, dalam banyak kasus, aliran pemanggilan kode sinkron secara sederhana ke asinkron dan aliran seperti fork & join (mirip dengan aliran pembagian kekuasaan) yang tidak dipahami dengan baik dapat menghasilkan kode goroutine yang buruk. Artikel ini akan memperkenalkan beberapa metode dan teknik untuk mengantisipasi hal tersebut.

Manajemen Konkurensi

context

Penggunaan context sebagai teknik manajemen pertama mungkin mengejutkan. Namun, dalam bahasa Go, context tidak hanya sebatas fungsi pembatalan, tetapi juga berperan penting dalam manajemen keseluruhan pohon tugas. Untuk yang belum mengetahuinya, berikut penjelasan singkat tentang paket tersebut.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

Kode di atas menggunakan context untuk mencetak Context is done! setelah 1 detik. context dapat memeriksa pembatalan melalui metode Done(), dan menyediakan berbagai metode pembatalan seperti WithCancel, WithTimeout, WithDeadline, dan WithValue.

Mari kita buat contoh sederhana. Misalkan Anda menulis kode yang menggunakan pola aggregator untuk mengambil data user, post, dan comment. Dan semua permintaan harus dilakukan dalam waktu 2 detik, maka Anda dapat menulisnya sebagai berikut:

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

Kode di atas akan mencetak Timeout! jika tidak dapat mengambil semua data dalam waktu 2 detik, dan mencetak All data is fetched! jika semua data berhasil diambil. Dengan cara ini, context memudahkan manajemen pembatalan dan batas waktu bahkan dalam kode yang menjalankan beberapa goroutine.

Berbagai fungsi dan metode terkait context dapat dilihat di godoc context. Semoga Anda dapat mempelajarinya dan menggunakannya dengan mudah.

channel

unbuffered channel

channel adalah alat untuk komunikasi antar goroutine. channel dapat dibuat dengan make(chan T). T adalah tipe data yang akan dikirim oleh channel tersebut. channel dapat mengirim dan menerima data dengan <-, dan menutup channel dengan close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Kode di atas menggunakan channel untuk mencetak 1 dan 2. Kode ini hanya menunjukkan pengiriman dan penerimaan nilai pada channel. Namun, channel menyediakan lebih banyak fungsi. Mari kita bahas buffered channel dan unbuffered channel. Sebagai permulaan, contoh di atas adalah unbuffered channel, di mana tindakan mengirim data ke channel dan menerima data harus dilakukan secara bersamaan. Jika tindakan ini tidak dilakukan secara bersamaan, maka dapat terjadi deadlock.

buffered channel

Bagaimana jika kode di atas bukan hanya mencetak output, tetapi menjalankan dua proses yang berat? Jika proses kedua membutuhkan waktu lama untuk membaca dan memproses, maka proses pertama juga akan berhenti selama waktu tersebut. Kita dapat menggunakan buffered channel untuk mencegah situasi ini.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Kode di atas menggunakan buffered channel untuk mencetak 1 dan 2. Kode ini menggunakan buffered channel agar tindakan mengirim data ke channel dan menerima data tidak harus dilakukan secara bersamaan. Dengan menambahkan buffer ke channel, akan ada ruang yang cukup untuk mencegah penundaan pekerjaan yang disebabkan oleh pekerjaan dengan prioritas rendah.

select

Ketika menangani beberapa channel, sintaks select memudahkan implementasi struktur fan-in.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

Kode di atas membuat 3 channel yang secara berkala mengirimkan 1, 2, dan 3, dan menggunakan select untuk menerima dan mencetak nilai dari channel. Dengan cara ini, select memungkinkan penerimaan data dari beberapa channel secara bersamaan dan memprosesnya sesuai dengan penerimaan nilai dari channel.

for range

channel dapat dengan mudah menerima data menggunakan for range. Menggunakan for range pada channel akan menjalankan kode setiap kali data ditambahkan ke channel, dan akan menghentikan loop ketika channel ditutup.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

Kode di atas menggunakan channel untuk mencetak 1 dan 2. Kode ini menggunakan for range untuk menerima dan mencetak data setiap kali data ditambahkan ke channel. Dan loop akan berhenti ketika channel ditutup.

Seperti yang telah ditulis beberapa kali di atas, sintaks ini juga dapat digunakan sebagai alat sinkronisasi sederhana.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

Kode di atas menunggu 1 detik lalu mencetak Hello, World!. Kode ini menggunakan channel untuk mengubah kode sinkron menjadi kode asinkron. Dengan cara ini, channel memudahkan perubahan kode sinkron menjadi kode asinkron dan pengaturan titik join.

etc

  1. Mengirim atau menerima data ke/dari nil channel dapat menyebabkan deadlock karena loop tak terbatas.
  2. Mengirim data setelah channel ditutup akan menyebabkan panic.
  3. Meskipun channel tidak ditutup secara eksplisit, GC akan menutupnya.

mutex

spinlock

spinlock adalah metode sinkronisasi yang terus mencoba lock dengan loop berulang. Dalam bahasa Go, spinlock dapat diimplementasikan dengan mudah menggunakan pointer.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

Kode di atas mengimplementasikan paket spinlock. Kode ini menggunakan paket sync/atomic untuk mengimplementasikan SpinLock. Metode Lock menggunakan atomic.CompareAndSwapUintptr untuk mencoba lock, dan metode Unlock menggunakan atomic.StoreUintptr untuk melepaskan lock. Metode ini terus mencoba lock sehingga terus menggunakan CPU sampai lock diperoleh, yang dapat menyebabkan loop tak terbatas. Oleh karena itu, spinlock sebaiknya digunakan untuk sinkronisasi sederhana atau hanya untuk waktu yang singkat.

sync.Mutex

mutex adalah alat untuk sinkronisasi antar goroutine. mutex yang disediakan oleh paket sync menyediakan metode seperti Lock, Unlock, RLock, dan RUnlock. mutex dapat dibuat dengan sync.Mutex, dan sync.RWMutex juga dapat digunakan untuk lock baca/tulis.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

Kode di atas memiliki dua goroutine yang hampir bersamaan mengakses variabel count yang sama. Dengan menggunakan mutex untuk membuat kode yang mengakses variabel count sebagai critical section, maka akses bersamaan ke variabel count dapat dicegah. Maka kode ini akan selalu mencetak 2 berapapun jumlah eksekusi.

sync.RWMutex

sync.RWMutex adalah mutex yang dapat digunakan untuk membedakan antara lock baca dan lock tulis. Metode RLock dan RUnlock dapat digunakan untuk mengunci dan melepaskan lock baca.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

Kode di atas mengimplementasikan ConcurrentMap menggunakan sync.RWMutex. Kode ini menggunakan lock baca pada metode Get dan lock tulis pada metode Set untuk mengakses dan memodifikasi map data dengan aman. Lock baca diperlukan karena dalam kasus banyak operasi baca, lock tulis tidak perlu digunakan dan hanya lock baca yang digunakan sehingga beberapa goroutine dapat melakukan operasi baca secara bersamaan. Dengan demikian, kinerja dapat ditingkatkan dengan hanya menggunakan lock baca jika tidak ada perubahan status yang diperlukan dan lock tulis tidak diperlukan.

fakelock

fakelock adalah trik sederhana yang mengimplementasikan sync.Locker. Struktur ini menyediakan metode yang sama dengan sync.Mutex, tetapi tidak melakukan operasi sebenarnya.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

Kode di atas mengimplementasikan paket fakelock. Paket ini mengimplementasikan sync.Locker dan menyediakan metode Lock dan Unlock, tetapi sebenarnya tidak melakukan apa pun. Alasan mengapa kode ini diperlukan akan dijelaskan lain waktu.

waitgroup

sync.WaitGroup

sync.WaitGroup adalah alat untuk menunggu hingga semua pekerjaan goroutine selesai. Metode Add, Done, dan Wait disediakan. Metode Add menambahkan jumlah goroutine, metode Done menunjukkan bahwa pekerjaan goroutine telah selesai, dan metode Wait menunggu hingga semua pekerjaan goroutine selesai.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    untuk i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Kode di atas menggunakan sync.WaitGroup untuk menjalankan 100 goroutine secara bersamaan yang menambahkan nilai ke variabel c. Kode ini menggunakan sync.WaitGroup untuk menunggu semua goroutine selesai sebelum mencetak nilai yang telah ditambahkan ke variabel c. Untuk beberapa tugas fork & join yang sederhana, penggunaan channel sudah cukup, tetapi untuk sejumlah besar tugas fork & join, penggunaan sync.WaitGroup juga merupakan pilihan yang baik.

dengan slice

Ketika digunakan bersama slice, waitgroup dapat menjadi alat yang sangat baik untuk mengelola eksekusi tugas konkuren tanpa lock.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

Kode di atas menggunakan hanya waitgroup untuk membuat 10 bilangan bulat acak secara konkuren oleh masing-masing goroutine dan menyimpannya di indeks yang telah dialokasikan. Kode ini menggunakan waitgroup untuk menunggu semua goroutine selesai sebelum mencetak "Done". Dengan cara ini, waitgroup memungkinkan beberapa goroutine untuk melakukan tugas secara bersamaan, menyimpan data tanpa lock, dan melakukan pemrosesan pasca-eksekusi secara bersamaan setelah semua goroutine selesai.

golang.org/x/sync/errgroup.ErrGroup

errgroup merupakan paket yang memperluas sync.WaitGroup. Tidak seperti sync.WaitGroup, errgroup akan membatalkan semua goroutine dan mengembalikan error jika salah satu tugas goroutine mengalami error.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

Kode di atas menggunakan errgroup untuk membuat 10 goroutine dan menghasilkan error pada goroutine ke-5. Error sengaja dibuat pada goroutine kelima untuk menunjukkan bagaimana menangani situasi error. Namun, dalam penggunaan sebenarnya, Anda dapat menggunakan errgroup untuk membuat goroutine dan melakukan berbagai pemrosesan pasca-eksekusi jika terjadi error pada masing-masing goroutine.

once

Alat untuk menjalankan kode yang hanya boleh dieksekusi sekali. Kode terkait dapat dijalankan melalui konstruktor di bawah ini.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc hanya memungkinkan fungsi yang bersangkutan untuk dieksekusi sekali saja di seluruh program.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

Kode di atas menggunakan sync.OnceFunc untuk mencetak "Hello, World!". Meskipun fungsi once dipanggil beberapa kali, "Hello, World!" hanya akan dicetak sekali karena penggunaan sync.OnceFunc.

OnceValue

OnceValue tidak hanya memastikan fungsi hanya dieksekusi sekali, tetapi juga menyimpan nilai balik fungsi tersebut dan mengembalikan nilai yang tersimpan pada pemanggilan berikutnya.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

Kode di atas menggunakan sync.OnceValue untuk menambah nilai variabel c sebanyak 1. Meskipun once dipanggil beberapa kali, c hanya akan bertambah sekali dan selalu mengembalikan nilai 1.

OnceValues

OnceValues berfungsi sama seperti OnceValue, tetapi dapat mengembalikan beberapa nilai.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

Kode di atas menggunakan sync.OnceValues untuk menambah nilai variabel c sebanyak 1. Meskipun once dipanggil beberapa kali, c hanya akan bertambah sekali dan selalu mengembalikan nilai 1 untuk a dan b.

atomic

Paket atomic menyediakan operasi atomik. Paket atomic menyediakan metode seperti Add, CompareAndSwap, Load, Store, Swap, tetapi saat ini penggunaan tipe seperti Int64, Uint64, Pointer direkomendasikan.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Contoh yang digunakan sebelumnya. Kode ini menggunakan tipe atomic.Int64 untuk menambah nilai variabel c secara atomik. Metode Add dan Load digunakan untuk menambah dan membaca nilai variabel secara atomik. Metode Store untuk menyimpan nilai, Swap untuk menukar nilai, dan CompareAndSwap untuk membandingkan dan menukar nilai jika cocok.

cond

sync.Cond

Paket cond menyediakan variabel kondisi. Paket cond dapat dibuat dengan sync.Cond dan menyediakan metode Wait, Signal, Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

Kode di atas menggunakan sync.Cond untuk menunggu hingga variabel ready bernilai true. Kode ini menggunakan sync.Cond untuk menunggu hingga ready bernilai true sebelum mencetak "Ready!". Dengan cara ini, sync.Cond memungkinkan beberapa goroutine untuk menunggu hingga kondisi tertentu terpenuhi.

Ini dapat digunakan untuk mengimplementasikan queue sederhana.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Dengan menggunakan sync.Cond, kita dapat menunggu secara efisien dan melanjutkan eksekusi ketika kondisi terpenuhi, daripada menggunakan spin-lock yang menghabiskan banyak CPU.

semaphore

golang.org/x/sync/semaphore.Semaphore

Paket semaphore menyediakan semaphore. Paket semaphore dapat dibuat dengan golang.org/x/sync/semaphore.Semaphore dan menyediakan metode Acquire, Release, TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

Kode di atas menggunakan semaphore untuk membuat semaphore, memperoleh semaphore menggunakan metode Acquire, dan melepaskan semaphore menggunakan metode Release. Kode ini menunjukkan bagaimana memperoleh dan melepaskan semaphore.

Kesimpulan

Sekian penjelasan dasar yang dibutuhkan. Semoga artikel ini membantu Anda memahami dan menerapkan cara mengelola konkurensi menggunakan goroutine. Semoga artikel ini bermanfaat bagi Anda. Terima kasih.