GoSuda

Paquete de inicio de concurrencia en Go

By snowmerak
views ...

Esquema

Breve Introducción

El lenguaje Go tiene muchas herramientas para el manejo de la concurrencia. En este artículo, presentaremos algunas de ellas y trucos.

¿Goroutines?

Una goroutine es un nuevo modelo de concurrencia soportado por el lenguaje Go. Generalmente, un programa recibe hilos del sistema operativo para realizar múltiples tareas simultáneamente y realiza trabajos en paralelo según el número de núcleos. Y para llevar a cabo concurrencia en unidades más pequeñas, se generan hilos verdes en el espacio de usuario, permitiendo que varios hilos verdes giren y realicen tareas dentro de un solo hilo del sistema operativo. Sin embargo, en el caso de las goroutines, estos tipos de hilos verdes se han hecho más pequeños y eficientes. Estas goroutines usan menos memoria que los hilos y se pueden crear e intercambiar más rápidamente que los hilos.

Para usar goroutines, simplemente se debe usar la palabra clave go. Esto permite que, en el proceso de escritura de un programa, el código síncrono se ejecute intuitivamente como código asíncrono.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch := make(chan struct{})
10    go func() {
11        defer close(ch)
12        time.Sleep(1 * time.Second)
13        fmt.Println("¡Hola, Mundo!")
14    }()
15
16    fmt.Println("Esperando la goroutine...")
17    for range ch {}
18}

Este código cambia el código síncrono que simplemente espera 1 segundo e imprime ¡Hola, Mundo! a un flujo asíncrono. El ejemplo actual es simple, pero si se cambia un código un poco más complejo de código síncrono a código asíncrono, la legibilidad, visibilidad y comprensión del código mejoran aún más que los métodos existentes como async await o promise.

Sin embargo, en muchos casos, se puede generar un mal código de goroutines si no se comprende simplemente el flujo de llamar a este código síncrono de forma asíncrona y flujos como fork & join (un flujo similar a dividir y conquistar). En este artículo, presentaremos algunos métodos y técnicas que pueden prepararse para estos casos.

Gestión de la concurrencia

Contexto

Puede ser sorprendente que el context aparezca como la primera técnica de gestión. Sin embargo, en el lenguaje Go, el context desempeña un papel excelente en la gestión de todo el árbol de tareas, más allá de la simple función de cancelación. Si hay alguien que no lo conoce, explicaré brevemente este paquete.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithCancel(context.Background())
 5    defer cancel()
 6
 7    go func() {
 8        <-ctx.Done()
 9        fmt.Println("¡El Contexto ha terminado!")
10    }()
11
12    time.Sleep(1 * time.Second)
13
14    cancel()
15
16    time.Sleep(1 * time.Second)
17}

El código anterior es un código que imprime ¡El Contexto ha terminado! después de 1 segundo utilizando context. El context puede verificar si se cancela a través del método Done(), y proporciona varios métodos de cancelación a través de métodos como WithCancel, WithTimeout, WithDeadline, WithValue.

Creemos un ejemplo simple. Supongamos que están escribiendo código para obtener user, post y comment utilizando el patrón aggregator para obtener algunos datos. Y si todas las solicitudes deben hacerse en 2 segundos, pueden escribirlo de la siguiente manera.

 1package main
 2
 3func main() {
 4    ctx, cancel := context.WithTimeout(context.Background(), 2 * time.Second)
 5    defer cancel()
 6
 7    ch := make(chan struct{})
 8    go func() {
 9        defer close(ch)
10        user := getUser(ctx)
11        post := getPost(ctx)
12        comment := getComment(ctx)
13
14        fmt.Println(user, post, comment)
15    }()
16
17    select {
18    case <-ctx.Done():
19        fmt.Println("¡Tiempo de espera!")
20    case <-ch:
21        fmt.Println("¡Todos los datos han sido obtenidos!")
22    }
23}

El código anterior imprime ¡Tiempo de espera! si no se pueden obtener todos los datos en 2 segundos, e imprime ¡Todos los datos han sido obtenidos! si se obtienen todos los datos. Al usar context de esta manera, se puede gestionar fácilmente la cancelación y el tiempo de espera incluso en código donde varias goroutines están funcionando.

Se pueden encontrar varias funciones y métodos relacionados con el contexto en godoc context. Espero que puedan aprender lo simple y usarlo cómodamente.

Canal

Canal sin búfer

Un channel es una herramienta para la comunicación entre goroutines. Un channel se puede crear con make(chan T). Aquí, T es el tipo de datos que transmitirá ese channel. Un channel puede enviar y recibir datos con <-, y puede cerrar un channel con close.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

El código anterior es un código que imprime 1 y 2 utilizando un channel. En este código, solo se muestra el envío y la recepción de valores a un channel. Pero un channel ofrece más funciones que esto. Primero, veamos el buffered channel y el unbuffered channel. Antes de empezar, el ejemplo escrito anteriormente es un unbuffered channel, y el acto de enviar datos a un canal y el acto de recibir datos deben realizarse simultáneamente. Si estas acciones no se realizan simultáneamente, puede ocurrir un interbloqueo.

Canal con búfer

¿Qué pasa si el código anterior no es una simple salida, sino dos procesos que realizan operaciones pesadas? Si el segundo proceso se cuelga durante un largo período de tiempo mientras lee y procesa, el primer proceso también se detendrá durante ese tiempo. Podemos utilizar buffered channel para evitar esta situación.

 1package main
 2
 3func main() {
 4    ch := make(chan int, 2)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

El código anterior es un código que imprime 1 y 2 utilizando un buffered channel. En este código, utilizando un buffered channel, hemos hecho que la acción de enviar datos a un channel y la acción de recibir datos no tengan que realizarse simultáneamente. Al agregar un búfer a un canal de esta manera, se puede generar un margen, evitando así el retraso en las operaciones causado por el impacto de las operaciones posteriores.

select

Al tratar con múltiples canales, se puede implementar fácilmente una estructura de fan-in utilizando la sintaxis select.

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6)
 7
 8func main() {
 9    ch1 := make(chan int, 10)
10    ch2 := make(chan int, 10)
11    ch3 := make(chan int, 10)
12
13    go func() {
14        for {
15            ch1 <- 1
16            time.Sleep(1 * time.Second)
17        }
18    }()
19    go func() {
20        for {
21            ch2 <- 2
22            time.Sleep(2 * time.Second)
23        }
24    }()
25    go func() {
26        for {
27            ch3 <- 3
28            time.Sleep(3 * time.Second)
29        }
30    }()
31
32    for i := 0; i < 3; i++ {
33        select {
34        case v := <-ch1:
35            fmt.Println(v)
36        case v := <-ch2:
37            fmt.Println(v)
38        case v := <-ch3:
39            fmt.Println(v)
40        }
41    }
42}

El código anterior es un código que crea 3 canales que transmiten periódicamente 1, 2 y 3, y utiliza select para recibir valores de los canales e imprimirlos. Utilizando select de esta manera, se pueden recibir datos de varios canales simultáneamente y procesarlos a medida que se reciben los valores de los canales.

for range

Un channel puede recibir datos fácilmente usando for range. Si se usa for range en un canal, funcionará cada vez que se agreguen datos a ese canal y finalizará el bucle cuando el canal se cierre.

 1package main
 2
 3func main() {
 4    ch := make(chan int)
 5    go func() {
 6        ch <- 1
 7        ch <- 2
 8        close(ch)
 9    }()
10
11    for i := range ch {
12        fmt.Println(i)
13    }
14}

El código anterior es un código que imprime 1 y 2 utilizando un channel. En este código, usamos for range para recibir datos e imprimirlos cada vez que se agregan datos al canal. Y cuando el canal se cierra, finaliza el bucle.

Como se ha escrito varias veces anteriormente, esta sintaxis también se puede utilizar para medios de sincronización simples.

 1package main
 2
 3func main() {
 4    ch := make(chan struct{})
 5    go func() {
 6        defer close(ch)
 7        time.Sleep(1 * time.Second)
 8        fmt.Println("¡Hola, Mundo!")
 9    }()
10
11    fmt.Println("Esperando la goroutine...")
12    for range ch {}
13}

El código anterior es un código que espera 1 segundo e imprime ¡Hola, Mundo!. En este código, hemos cambiado el código síncrono a código asíncrono utilizando un channel. Al usar channel de esta manera, se puede cambiar fácilmente el código síncrono a código asíncrono y establecer puntos de join.

etc

  1. Si envía o recibe datos en un canal nulo, puede caer en un bucle infinito y provocar un interbloqueo.
  2. Si envía datos después de cerrar un canal, se produce un pánico.
  3. Incluso si no cierra un canal, el GC lo recolecta y cierra el canal.

Mutex

spinlock

Un spinlock es un método de sincronización que intenta continuamente obtener un bloqueo girando un bucle. En el lenguaje Go, se puede implementar fácilmente un spinlock usando punteros.

 1package spinlock
 2
 3import (
 4    "runtime"
 5    "sync/atomic"
 6)
 7
 8type SpinLock struct {
 9    lock uintptr
10}
11
12func (s *SpinLock) Lock() {
13    for !atomic.CompareAndSwapUintptr(&s.lock, 0, 1) {
14        runtime.Gosched()
15    }
16}
17
18func (s *SpinLock) Unlock() {
19    atomic.StoreUintptr(&s.lock, 0)
20}
21
22func NewSpinLock() *SpinLock {
23    return &SpinLock{}
24}

El código anterior es un código que implementa el paquete spinlock. En este código, se usa el paquete sync/atomic para implementar SpinLock. En el método Lock, se usa atomic.CompareAndSwapUintptr para intentar obtener un bloqueo, y en el método Unlock, se usa atomic.StoreUintptr para liberar el bloqueo. Dado que este método intenta obtener un bloqueo sin descansar, continúa usando la CPU hasta que obtiene un bloqueo, lo que puede provocar un bucle infinito. Por lo tanto, es mejor usar spinlock para una sincronización simple o cuando se usa solo por un corto tiempo.

sync.Mutex

Un mutex es una herramienta para la sincronización entre goroutines. El mutex proporcionado por el paquete sync proporciona métodos como Lock, Unlock, RLock y RUnlock. Un mutex se puede crear con sync.Mutex, y también se puede usar un bloqueo de lectura/escritura con sync.RWMutex.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    var mu sync.Mutex
 9    var count int
10
11    go func() {
12        mu.Lock()
13        count++
14        mu.Unlock()
15    }()
16
17    mu.Lock()
18    count++
19    mu.Unlock()
20
21    println(count)
22}

En el código anterior, dos goroutines acceden casi simultáneamente a la misma variable count. En este momento, si se hace que el código que accede a la variable count sea una sección crítica utilizando un mutex, se puede evitar el acceso concurrente a la variable count. Entonces este código siempre imprimirá 2 de la misma manera, sin importar cuántas veces se ejecute.

sync.RWMutex

sync.RWMutex es un mutex que puede usar bloqueos de lectura y escritura por separado. Los métodos RLock y RUnlock se pueden usar para bloquear y liberar un bloqueo de lectura.

 1package cmap
 2
 3import (
 4    "sync"
 5)
 6
 7type ConcurrentMap[K comparable, V any] struct {
 8    sync.RWMutex
 9    data map[K]V
10}
11
12func (m *ConcurrentMap[K, V]) Get(key K) (V, bool) {
13    m.RLock()
14    defer m.RUnlock()
15
16    value, ok := m.data[key]
17    return value, ok
18}
19
20func (m *ConcurrentMap[K, V]) Set(key K, value V) {
21    m.Lock()
22    defer m.Unlock()
23
24    m.data[key] = value
25}

El código anterior es un código que implementa ConcurrentMap utilizando sync.RWMutex. En este código, el método Get bloquea la lectura y el método Set bloquea la escritura para acceder y modificar de forma segura el mapa data. La razón por la que se necesita un bloqueo de lectura es que, en el caso de muchas operaciones de lectura simples, se puede mejorar el rendimiento al permitir que varias goroutines realicen operaciones de lectura simultáneamente bloqueando solo la lectura sin bloquear la escritura. A través de esto, si no hay un cambio de estado que no requiera un bloqueo de escritura, el rendimiento se puede mejorar bloqueando solo la lectura.

fakelock

fakelock es un simple truco que implementa sync.Locker. Esta estructura proporciona los mismos métodos que sync.Mutex, pero no realiza ninguna operación real.

1package fakelock
2
3type FakeLock struct{}
4
5func (f *FakeLock) Lock() {}
6
7func (f *FakeLock) Unlock() {}

El código anterior es un código que implementa el paquete fakelock. Este paquete proporciona los métodos Lock y Unlock al implementar sync.Locker, pero en realidad no hace nada. La razón por la que se necesita este código se describirá cuando surja la oportunidad.

waitgroup

sync.WaitGroup

sync.WaitGroup es una herramienta para esperar hasta que todas las goroutines hayan terminado su trabajo. Proporciona los métodos Add, Done y Wait. El método Add agrega el número de goroutines, el método Done indica que el trabajo de la goroutine ha terminado, y el método Wait espera hasta que todas las goroutines hayan terminado su trabajo.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

El código anterior utiliza sync.WaitGroup para permitir que 100 gorutinas agreguen valores a la variable c simultáneamente. En este código, sync.WaitGroup se emplea para esperar hasta que todas las gorutinas finalicen, y luego se imprime el valor sumado a la variable c. En el caso de simplemente ejecutar algunas operaciones con fork & join, el uso exclusivo de canales puede ser suficiente, pero al ejecutar una gran cantidad de operaciones con fork & join, también es una buena opción utilizar sync.WaitGroup.

con slice

Si se utiliza con un slice, waitgroup puede ser una excelente herramienta para gestionar operaciones de ejecución simultánea sin bloqueos.

 1package main
 2
 3import (
 4	"fmt"
 5	"sync"
 6    "rand"
 7)
 8
 9func main() {
10	var wg sync.WaitGroup
11	arr := [10]int{}
12
13	for i := 0; i < 10; i++ {
14		wg.Add(1)
15		go func(id int) {
16			defer wg.Done()
17
18			arr[id] = rand.Intn(100)
19		}(i)
20	}
21
22	wg.Wait()
23	fmt.Println("Done")
24
25    for i, v := range arr {
26        fmt.Printf("arr[%d] = %d\n", i, v)
27    }
28}

El código anterior utiliza únicamente waitgroup para que cada gorutina genere simultáneamente 10 números enteros aleatorios y los almacene en el índice asignado. En este código, waitgroup se utiliza para esperar hasta que todas las gorutinas terminen y luego se imprime Done. Al utilizar waitgroup de esta manera, varias gorutinas pueden realizar operaciones simultáneamente, almacenar datos sin bloqueos hasta que todas las gorutinas terminen y realizar el post-procesamiento de forma colectiva después de la finalización del trabajo.

golang.org/x/sync/errgroup.ErrGroup

errgroup es un paquete que extiende sync.WaitGroup. A diferencia de sync.WaitGroup, si se produce un error en cualquiera de las operaciones de las gorutinas, errgroup cancela todas las gorutinas y devuelve el error.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/errgroup"
 7)
 8
 9func main() {
10    g, ctx := errgroup.WithContext(context.Background())
11    _ = ctx
12
13    for i := 0; i < 10; i++ {
14        i := i
15        g.Go(func() error {
16            if i == 5 {
17                return fmt.Errorf("error")
18            }
19            return nil
20        })
21    }
22
23    if err := g.Wait(); err != nil {
24        fmt.Println(err)
25    }
26}

El código anterior utiliza errgroup para crear 10 gorutinas y provocar un error en la quinta gorutina. Intencionalmente generamos un error en la quinta gorutina para mostrar el caso en el que se produce un error. Sin embargo, en la práctica, errgroup se utiliza para crear gorutinas y realizar diversos post-procesamientos en caso de que se produzcan errores en cada gorutina.

once

Es una herramienta para ejecutar código que solo debe ejecutarse una vez. El código relacionado se puede ejecutar a través de los siguientes constructores.

1func OnceFunc(f func()) func()
2func OnceValue[T any](f func() T) func() T
3func OnceValues[T1, T2 any](f func() (T1, T2)) func() (T1, T2)

OnceFunc

OnceFunc simplemente asegura que la función correspondiente se pueda ejecutar solo una vez en todo el proceso.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    once := sync.OnceFunc(func() {
 7        println("Hello, World!")
 8    })
 9
10    once()
11    once()
12    once()
13    once()
14    once()
15}

El código anterior utiliza sync.OnceFunc para imprimir Hello, World!. En este código, se crea la función once usando sync.OnceFunc, e incluso si la función once se llama varias veces, Hello, World! se imprime solo una vez.

OnceValue

OnceValue no solo asegura que la función correspondiente se ejecute solo una vez en todo el proceso, sino que también almacena el valor de retorno de dicha función y devuelve el valor almacenado al ser llamada de nuevo.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValue(func() int {
 8        c += 1
 9        return c
10    })
11
12    println(once())
13    println(once())
14    println(once())
15    println(once())
16    println(once())
17}

El código anterior usa sync.OnceValue para aumentar la variable c en 1. En este código, la función once se crea usando sync.OnceValue, e incluso si la función once se llama varias veces, devuelve 1, ya que la variable c solo aumenta una vez.

OnceValues

OnceValues funciona de la misma manera que OnceValue, pero puede devolver varios valores.

 1package main
 2
 3import "sync"
 4
 5func main() {
 6    c := 0
 7    once := sync.OnceValues(func() (int, int) {
 8        c += 1
 9        return c, c
10    })
11
12    a, b := once()
13    println(a, b)
14    a, b = once()
15    println(a, b)
16    a, b = once()
17    println(a, b)
18    a, b = once()
19    println(a, b)
20    a, b = once()
21    println(a, b)
22}

El código anterior usa sync.OnceValues para aumentar la variable c en 1. En este código, la función once se crea usando sync.OnceValues, e incluso si la función once se llama varias veces, devuelve 1, ya que la variable c solo aumenta una vez.

atomic

El paquete atomic proporciona operaciones atómicas. El paquete atomic proporciona métodos como Add, CompareAndSwap, Load, Store y Swap, pero recientemente se recomienda el uso de tipos como Int64, Uint64 y Pointer.

 1package main
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8func main() {
 9    wg := sync.WaitGroup{}
10    c := atomic.Int64{}
11
12    for i := 0; i < 100 ; i++ {
13        wg.Add(1)
14        go func() {
15            defer wg.Done()
16            c.Add(1)
17        }()
18    }
19
20    wg.Wait()
21    println(c.Load())
22}

Este es el ejemplo que se usó anteriormente. Es un código que incrementa atómicamente la variable c utilizando el tipo atomic.Int64. Con el método Add y el método Load, se puede incrementar atómicamente una variable y leer la variable. Además, se puede almacenar un valor con el método Store, intercambiar un valor con el método Swap, y comparar un valor y reemplazarlo si es adecuado con el método CompareAndSwap.

cond

sync.Cond

El paquete cond proporciona variables de condición. El paquete cond se puede crear con sync.Cond y proporciona los métodos Wait, Signal y Broadcast.

 1package main
 2
 3import (
 4    "sync"
 5)
 6
 7func main() {
 8    c := sync.NewCond(&sync.Mutex{})
 9    ready := false
10
11    go func() {
12        c.L.Lock()
13        ready = true
14        c.Signal()
15        c.L.Unlock()
16    }()
17
18    c.L.Lock()
19    for !ready {
20        c.Wait()
21    }
22    c.L.Unlock()
23
24    println("Ready!")
25}

El código anterior utiliza sync.Cond para esperar hasta que la variable ready se vuelva true. En este código, se espera hasta que la variable ready se vuelva true usando sync.Cond, y luego se imprime Ready!. Al usar sync.Cond de esta manera, se puede hacer que varias gorutinas esperen simultáneamente hasta que se cumpla una condición específica.

Utilizando esto, se puede implementar una queue sencilla.

 1package queue
 2
 3import (
 4    "sync"
 5    "sync/atomic"
 6)
 7
 8type Node[T any] struct {
 9    Value T
10    Next  *Node[T]
11}
12
13type Queue[T any] struct {
14    sync.Mutex
15    Cond *sync.Cond
16    Head *Node[T]
17    Tail *Node[T]
18    Len  int
19}
20
21func New[T any]() *Queue[T] {
22    q := &Queue[T]{}
23    q.Cond = sync.NewCond(&q.Mutex)
24    return q
25}
26
27func (q *Queue[T]) Push(value T) {
28    q.Lock()
29    defer q.Unlock()
30
31    node := &Node[T]{Value: value}
32    if q.Len == 0 {
33        q.Head = node
34        q.Tail = node
35    } else {
36        q.Tail.Next = node
37        q.Tail = node
38    }
39    q.Len++
40    q.Cond.Signal()
41}
42
43func (q *Queue[T]) Pop() T {
44    q.Lock()
45    defer q.Unlock()
46
47    for q.Len == 0 {
48        q.Cond.Wait()
49    }
50
51    node := q.Head
52    q.Head = q.Head.Next
53    q.Len--
54    return node.Value
55}

Al utilizar sync.Cond de esta manera, en lugar de usar un spin-lock que consume mucha CPU, se puede esperar de manera eficiente y volver a operar cuando se cumple una condición.

semaphore

golang.org/x/sync/semaphore.Semaphore

El paquete semaphore proporciona semáforos. El paquete semaphore se puede crear con golang.org/x/sync/semaphore.Semaphore y proporciona los métodos Acquire, Release y TryAcquire.

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "golang.org/x/sync/semaphore"
 7)
 8
 9func main() {
10    s := semaphore.NewWeighted(1)
11
12    if s.TryAcquire(1) {
13        fmt.Println("Acquired!")
14    } else {
15        fmt.Println("Not Acquired!")
16    }
17
18    s.Release(1)
19}

El código anterior crea un semáforo usando semaphore, y usa el semáforo para adquirir el semáforo con el método Acquire y liberar el semáforo con el método Release. En este código, se demostró cómo adquirir y liberar un semáforo usando semaphore.

En conclusión

Parece que el contenido básico es suficiente hasta aquí. Basándose en el contenido de este artículo, espero que entiendan cómo gestionar la concurrencia utilizando gorutinas y que puedan usarlo en la práctica. Espero que este artículo les haya sido útil. Gracias.