GoSuda

Go Concurrency Starter Pack

By snowmerak
views ...

Resumen

Breve introducción

El lenguaje Go posee numerosas herramientas para la gestión de la concurrencia. En este artículo, presentaremos algunas de ellas y ciertos trucos asociados.

¿Goroutines?

Una goroutine es un nuevo modelo de concurrencia compatible con el lenguaje Go. Generalmente, para que un programa ejecute múltiples tareas concurrentemente, solicita hilos del sistema operativo (OS threads) a este, y realiza tareas en paralelo según el número de núcleos disponibles. Para una concurrencia de unidad más pequeña, se crean "green threads" en el espacio de usuario (userland), permitiendo que múltiples "green threads" operen dentro de un único OS thread. Sin embargo, en el caso de las goroutines, estos "green threads" se han hecho aún más pequeños y eficientes. Las goroutines utilizan menos memoria que los threads y pueden crearse e intercambiarse más rápidamente que estos.

Para utilizar una goroutine, basta con emplear la palabra clave go. Esto permite que el código síncrono se ejecute de forma asíncrona de manera intuitiva durante la escritura del programa.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("Hello, World!")
14    }()
15
16    fmt.Println("Waiting for goroutine...")
17    for range ch {}
18}

Este código convierte un código síncrono que simplemente espera 1 segundo y luego imprime Hello, World! en un flujo asíncrono. Aunque el ejemplo actual es sencillo, al transformar un código síncrono más complejo en asíncrono, la legibilidad, visibilidad y comprensión del código mejoran significativamente en comparación con enfoques como async await o promise.

Sin embargo, en muchos casos, si no se comprende el flujo de simplemente invocar código síncrono de forma asíncrona y el flujo de "fork & join" (similar a la división y conquista), se pueden generar códigos de goroutines deficientes. En este artículo, presentaremos algunos métodos y técnicas que pueden ayudar a prevenir tales situaciones.

Gestión de la concurrencia

context

Podría resultar sorprendente que el primer método de gestión sea context. No obstante, en el lenguaje Go, context trasciende la mera funcionalidad de cancelación, desempeñando un papel fundamental en la gestión de árboles de tareas completos. Para aquellos que no estén familiarizados, explicaré brevemente este paquete.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("Context is done!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

El código anterior utiliza context para imprimir Context is done! después de 1 segundo. context permite verificar el estado de cancelación a través del método Done(), y ofrece varios métodos de cancelación como WithCancel, WithTimeout, WithDeadline y WithValue.

Creemos un ejemplo sencillo. Supongamos que están escribiendo un código que recupera user, post y comment utilizando el patrón aggregator para obtener ciertos datos. Si todas las solicitudes deben completarse en 2 segundos, podrían escribirlo de la siguiente manera:

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("Timeout!")
20    case <-ch:
21        fmt.Println("All data is fetched!")
22    }
23}

El código anterior imprime Timeout! si no se obtienen todos los datos en 2 segundos, y All data is fetched! si se obtienen. De esta manera, al usar context, se puede gestionar fácilmente la cancelación y los tiempos de espera incluso en código donde operan múltiples goroutines.

Diversas funciones y métodos relacionados con context están disponibles en godoc context. Esperamos que puedan aprender lo básico y utilizarlos cómodamente.

channel

unbuffered channel

Un channel es una herramienta para la comunicación entre goroutines. Se puede crear un channel con make(chan T), donde T es el tipo de datos que el channel transmitirá. Los datos se pueden enviar y recibir a través del channel utilizando <-, y el channel se puede cerrar con close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

El código anterior utiliza un channel para imprimir 1 y 2. En este código, solo se muestra el envío y la recepción de valores en el channel. Sin embargo, un channel ofrece muchas más funcionalidades. Primero, examinemos los buffered channel y los unbuffered channel. Para empezar, el ejemplo anterior es un unbuffered channel, donde el envío y la recepción de datos en el canal deben ocurrir simultáneamente. Si estas acciones no ocurren a la vez, podría producirse un "deadlock".

buffered channel

¿Qué sucede si el código anterior no es una simple salida, sino dos procesos que realizan tareas pesadas? Si el segundo proceso se bloquea durante un período prolongado al leer y procesar, el primer proceso también se detendrá durante ese tiempo. Para evitar esta situación, podemos utilizar un buffered channel.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

El código anterior utiliza un buffered channel para imprimir 1 y 2. En este código, se ha utilizado un buffered channel para que el envío y la recepción de datos en el channel no tengan que ocurrir simultáneamente. Al dotar al canal de un búfer, se crea un margen de maniobra de esa longitud, lo que puede prevenir retrasos en el trabajo causados por la influencia de tareas de menor prioridad.

select

Al manejar múltiples canales, la sintaxis select puede utilizarse para implementar fácilmente una estructura fan-in.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

El código anterior crea tres canales que transmiten periódicamente 1, 2 y 3, y utiliza select para recibir e imprimir valores de los canales. De esta manera, al usar select, se pueden recibir datos simultáneamente de múltiples canales y procesarlos a medida que llegan.

for range

Un channel puede recibir datos fácilmente utilizando for range. Cuando for range se utiliza con un canal, se ejecutará cada vez que se añadan datos a dicho canal, y el bucle finalizará cuando el canal se cierre.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

El código anterior utiliza un channel para imprimir 1 y 2. En este código, se utiliza for range para recibir e imprimir datos cada vez que se añaden al canal. El bucle finaliza cuando el canal se cierra.

Como se ha escrito varias veces, esta sintaxis también puede utilizarse como un simple mecanismo de sincronización.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("Hello, World!")
 9    }()
10
11    fmt.Println("Waiting for goroutine...")
12    for range ch {}
13}

El código anterior espera 1 segundo y luego imprime Hello, World!. En este código, se ha utilizado un channel para convertir el código síncrono en código asíncrono. De esta manera, al usar un channel, se puede convertir fácilmente el código síncrono en código asíncrono y establecer puntos de join.

etc

  1. Si se envían o reciben datos a un canal nil, puede producirse un "deadlock" al caer en un bucle infinito.
  2. Si se envían datos a un canal después de cerrarlo, se producirá un panic.
  3. Aunque no se cierre explícitamente, el GC lo cerrará al recolectarlo.

mutex

spinlock

Un spinlock es un método de sincronización que intenta adquirir un bloqueo repetidamente mediante un bucle. En el lenguaje Go, se puede implementar fácilmente un spinlock utilizando punteros.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

El código anterior implementa el paquete spinlock. En este código, SpinLock se implementa utilizando el paquete sync/atomic. El método Lock intenta adquirir el bloqueo utilizando atomic.CompareAndSwapUintptr, y el método Unlock libera el bloqueo utilizando atomic.StoreUintptr. Dado que este método intenta adquirir el bloqueo sin interrupción, consume continuamente la CPU hasta que se obtiene el bloqueo, lo que puede llevar a un bucle infinito. Por lo tanto, se recomienda utilizar spinlock para sincronizaciones simples o para períodos de tiempo cortos.

sync.Mutex

Un mutex es una herramienta para la sincronización entre goroutines. El mutex proporcionado por el paquete sync ofrece métodos como Lock, Unlock, RLock y RUnlock. Un mutex puede crearse con sync.Mutex, y también se puede utilizar un bloqueo de lectura/escritura con sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

En el código anterior, dos goroutines acceden casi simultáneamente a la misma variable count. Al utilizar un mutex para convertir el código que accede a la variable count en una sección crítica, se puede prevenir el acceso concurrente a la variable count. De este modo, este código siempre imprimirá 2, independientemente de cuántas veces se ejecute.

sync.RWMutex

sync.RWMutex es un mutex que permite diferenciar entre bloqueos de lectura y escritura. Los métodos RLock y RUnlock se utilizan para adquirir y liberar bloqueos de lectura.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

El código anterior implementa ConcurrentMap utilizando sync.RWMutex. En este código, el método Get adquiere un bloqueo de lectura y el método Set adquiere un bloqueo de escritura, permitiendo un acceso y modificación seguros del mapa data. La necesidad de un bloqueo de lectura surge porque, en casos donde hay muchas operaciones de lectura, se pueden usar solo bloqueos de lectura para permitir que múltiples goroutines realicen operaciones de lectura concurrentemente, sin necesidad de un bloqueo de escritura. Esto mejora el rendimiento en situaciones donde no hay cambios de estado y, por lo tanto, no es necesario un bloqueo de escritura.

fakelock

fakelock es un truco sencillo que implementa sync.Locker. Esta estructura proporciona los mismos métodos que sync.Mutex, pero no realiza ninguna operación real.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

El código anterior implementa el paquete fakelock. Este paquete implementa sync.Locker para proporcionar los métodos Lock y Unlock, pero en realidad no realiza ninguna operación. Explicaré por qué es necesario este tipo de código si surge la oportunidad.

waitgroup

sync.WaitGroup

sync.WaitGroup es una herramienta que espera a que todas las goroutines completen su trabajo. Proporciona los métodos Add, Done y Wait. El método Add incrementa el contador de goroutines, Done notifica que una goroutine ha finalizado su trabajo, y Wait espera hasta que todas las goroutines hayan terminado.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

El código anterior utiliza sync.WaitGroup para que 100 goroutines sumen valores a la variable c concurrentemente. En este código, sync.WaitGroup se utiliza para esperar a que todas las goroutines finalicen, y luego se imprime el valor sumado en la variable c. Si bien los canales son suficientes para realizar operaciones simples de "fork & join" en un número reducido de tareas, sync.WaitGroup es una buena opción cuando se trata de un gran volumen de tareas de "fork & join".

with slice

Cuando se utiliza con un "slice", waitgroup puede ser una excelente herramienta para gestionar tareas concurrentes sin necesidad de bloqueos.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

El código anterior utiliza únicamente waitgroup para que cada goroutine genere simultáneamente 10 enteros aleatorios y los almacene en el índice asignado. En este código, waitgroup se utiliza para esperar a que todas las goroutines terminen, y luego se imprime "Done". De esta manera, al usar waitgroup, múltiples goroutines pueden realizar tareas concurrentemente, almacenar datos sin bloqueos hasta que todas las goroutines finalicen, y luego realizar un procesamiento posterior de forma conjunta.

golang.org/x/sync/errgroup.ErrGroup

errgroup es un paquete que extiende sync.WaitGroup. A diferencia de sync.WaitGroup, errgroup cancela todas las goroutines y devuelve un error si se produce un error en alguna de las tareas de las goroutines.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

El código anterior utiliza errgroup para crear 10 goroutines y provoca un error en la quinta goroutine. Se ha inducido un error en la quinta goroutine intencionadamente para ilustrar los casos en que se produce un error. Sin embargo, en la práctica, errgroup se utiliza para crear goroutines y luego se gestionan diversos tratamientos posteriores en caso de que se produzcan errores en cada goroutine.

once

Es una herramienta que ejecuta un código que solo debe ejecutarse una vez. Se pueden ejecutar códigos relacionados a través de los siguientes constructores.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc simplemente permite que la función se ejecute una única vez en todo el ciclo de vida.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

El código anterior utiliza sync.OnceFunc para imprimir Hello, World!. En este código, sync.OnceFunc se utiliza para crear la función once, y aunque se llame a once varias veces, Hello, World! se imprime solo una vez.

OnceValue

OnceValue no solo asegura que la función se ejecute una única vez en su totalidad, sino que también almacena el valor de retorno de esa función y lo devuelve en llamadas posteriores.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

El código anterior utiliza sync.OnceValue para incrementar la variable c en 1. En este código, sync.OnceValue se utiliza para crear la función once, y aunque se llame a once varias veces, la variable c solo se incrementa una vez y devuelve 1.

OnceValues

OnceValues funciona de manera idéntica a OnceValue, pero puede devolver múltiples valores.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

El código anterior utiliza sync.OnceValues para incrementar la variable c en 1. En este código, sync.OnceValues se utiliza para crear la función once, y aunque se llame a once varias veces, la variable c solo se incrementa una vez y devuelve 1.

atomic

El paquete atomic proporciona operaciones atómicas. El paquete atomic ofrece métodos como Add, CompareAndSwap, Load, Store y Swap, pero recientemente se recomienda el uso de tipos como Int64, Uint64 y Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Este es el ejemplo utilizado anteriormente. Es un código que incrementa atómicamente la variable c utilizando el tipo atomic.Int64. Con los métodos Add y Load, se puede incrementar y leer la variable atómicamente. Además, con el método Store se puede almacenar un valor, con Swap se puede intercambiar un valor, y con CompareAndSwap se puede comparar un valor y reemplazarlo si es adecuado.

cond

sync.Cond

El paquete cond proporciona variables de condición. El paquete cond puede crearse con sync.Cond y ofrece los métodos Wait, Signal y Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

El código anterior utiliza sync.Cond para esperar hasta que la variable ready sea true. En este código, sync.Cond se utiliza para esperar hasta que la variable ready sea true, y luego imprime Ready!. De esta manera, al usar sync.Cond, se puede hacer que múltiples goroutines esperen simultáneamente hasta que se cumpla una condición específica.

Esto se puede utilizar para implementar una queue sencilla.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Al utilizar sync.Cond de esta manera, en lugar de consumir mucha CPU con un spin-lock, se puede esperar eficientemente y reanudar la operación cuando se cumpla la condición.

semaphore

golang.org/x/sync/semaphore.Semaphore

El paquete semaphore proporciona semáforos. El paquete semaphore se puede crear con golang.org/x/sync/semaphore.Semaphore y ofrece los métodos Acquire, Release y TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

El código anterior utiliza semaphore para crear un semáforo, y luego utiliza el semáforo para adquirirlo con el método Acquire y liberarlo con el método Release. En este código, se ha demostrado cómo adquirir y liberar un semáforo utilizando semaphore.

Conclusión

Con esto debería bastar para los conceptos básicos. Espero que, basándose en el contenido de este artículo, puedan comprender y utilizar en la práctica cómo gestionar la concurrencia con goroutines. Espero que este artículo les haya sido de ayuda. Gracias.