Mejora de la reactividad con el almacenamiento en caché del lado del cliente de Redis
¿Qué es Redis?
Supongo que la mayoría de ustedes ya conocen Redis. Sin embargo, para aquellos que no, se puede resumir brevemente con las siguientes características:
- Las operaciones se ejecutan en un hilo único, lo que confiere atomicidad a todas ellas.
- Los datos se almacenan y procesan en memoria, lo que hace que todas las operaciones sean rápidas.
- Redis puede almacenar un WAL (Write-Ahead Log) según la opción, lo que permite un respaldo y recuperación rápidos del estado más reciente.
- Soporta varios tipos como Set, Hash, Bit, List, lo que resulta en una alta productividad.
- Posee una gran comunidad, lo que permite compartir diversas experiencias, problemas y soluciones.
- Ha sido desarrollado y operado durante mucho tiempo, lo que le confiere una estabilidad fiable.
Al grano
¿Imagínese?
¿Qué sucedería si el caché de su servicio cumpliera las dos condiciones siguientes?
- Cuando los datos consultados con frecuencia deben proporcionarse al usuario en su estado más reciente, pero la actualización es irregular, lo que requiere una frecuente renovación del caché.
- Cuando la actualización no es necesaria, pero se debe acceder y consultar con frecuencia los mismos datos en caché.
El primer caso podría considerar el ranking de popularidad en tiempo real de un centro comercial. Al almacenar el ranking de popularidad en tiempo real de un centro comercial como un sorted set, sería ineficiente si Redis lo leyera cada vez que un usuario accede a la página principal. En el segundo caso, para los datos de tipo de cambio, aunque los datos de tipo de cambio se publican aproximadamente cada 10 minutos, las consultas reales ocurren con mucha frecuencia. Además, para el won-dólar, el won-yen y el won-yuan, el caché se consulta con mucha frecuencia. En estos casos, sería más eficiente que el servidor API tuviera un caché local separado y, cuando los datos cambiaran, consultara Redis nuevamente para actualizarlos.
Entonces, ¿cómo se puede implementar esta operación en una estructura de base de datos - Redis - servidor API?
¿No se puede hacer con Redis PubSub?
¡Cuando uses caché, suscríbete a un canal que reciba notificaciones de actualización!
- Entonces, se debe crear una lógica para enviar mensajes al momento de la actualización.
- La operación adicional debido a PubSub afecta el rendimiento.
Entonces, ¿qué pasa si Redis detecta los cambios?
¿Qué sucede si se utiliza Keyspace Notification para recibir notificaciones de comandos para esa clave?
- Existe la molestia de tener que almacenar y compartir de antemano las claves y comandos utilizados para la actualización.
- Por ejemplo, se vuelve complejo que para algunas claves, un simple Set sea el comando de actualización, mientras que para otras, LPush, RPush, SAdd o SRem sean los comandos de actualización.
- Esto aumenta significativamente la posibilidad de errores de comunicación y errores humanos en la codificación durante el proceso de desarrollo.
¿Qué sucede si se utiliza Keyevent Notification para recibir notificaciones por comando?
- Se requiere la suscripción a todos los comandos utilizados para la actualización. Se necesita un filtrado adecuado para las claves entrantes.
- Por ejemplo, es muy probable que, de todas las claves que llegan por Del, algunos clientes no tengan un caché local para ellas.
- Esto puede llevar a un desperdicio innecesario de recursos.
¡Por eso necesitamos un Invalidation Message!
¿Qué es un Invalidation Message?
Los Mensajes de Invalidación son un concepto introducido a partir de Redis 6.0 como parte del Client-Side Cache asistido por el servidor. Un Mensaje de Invalidación se transmite según el siguiente flujo:
- Se asume que ClientB ya ha leído la clave una vez.
- ClientA establece nuevamente esa clave.
- Redis detecta el cambio y emite un Invalidation Message a ClientB para indicarle que borre el caché en ClientB.
- ClientB recibe el mensaje y toma las medidas adecuadas.
Cómo usarlo
Estructura de operación básica
El cliente conectado a Redis ejecuta CLIENT TRACKING ON REDIRECT <client-id>
para recibir mensajes de invalidación. Y el cliente que debe recibir los mensajes se suscribe a SUBSCRIBE __redis__:invalidate
para recibirlos.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
¡Implementación! ¡Implementación! ¡Implementación!
Redigo + Ristretto
Explicarlo así puede resultar ambiguo sobre cómo usarlo en el código. Así que, primero, lo configuraremos brevemente con redigo
y ristretto
.
Primero, instale las dos dependencias.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // número de claves para rastrear la frecuencia (10M).
22 MaxCost: 1 << 30, // costo máximo del caché (1GB).
23 BufferItems: 64, // número de claves por búfer Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Primero, creamos un RedisClient
que incluye ristretto y redigo de forma sencilla.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
El código es un poco complejo.
- Se establece una conexión adicional para el Tracking. Esto se hace considerando que PubSub podría interferir con otras operaciones.
- Se consulta el ID de la conexión adicional para que la conexión que consulta los datos redirija el Tracking a esa conexión.
- Y se suscribe a los mensajes de invalidación.
- El código que maneja la suscripción es un poco complejo. Dado que redigo no analiza los mensajes de invalidación, se debe recibir y procesar la respuesta antes del análisis.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
El mensaje Get
primero consulta Ristretto, y si no se encuentra, lo obtiene de Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
El código para probar es el anterior. Si lo pruebas, podrás ver que los valores se actualizan cada vez que los datos se actualizan en Redis.
Sin embargo, esto es demasiado complejo. Además, para escalar a un clúster, es inevitable que se deba habilitar el Tracking para todos los maestros o réplicas.
Rueidis
Para quienes utilizan Go, contamos con rueidis
, la herramienta más moderna y avanzada. A continuación, se presenta un código que utiliza la caché del lado del cliente asistida por el servidor en un entorno de clúster Redis, implementado con rueidis.
Primero, instale la dependencia.
github.com/redis/rueidis
Luego, escriba el código para consultar los datos en Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Con rueidis, para usar el Client-Side Cache, solo es necesario llamar a DoCache
. Esto agregará los datos a la caché local, especificando cuánto tiempo se mantendrán. Luego, si se llama a DoCache
nuevamente, los datos se consultarán desde la caché local. Naturalmente, los mensajes de invalidación también se manejan correctamente.
¿Por qué no redis-go?
Lamentablemente, redis-go
no soporta oficialmente el Client-Side Cache asistido por el servidor. Además, al crear un PubSub, no hay una API que acceda directamente a la nueva conexión creada, por lo que no se puede conocer el ID del cliente. Por lo tanto, se determinó que la configuración de redis-go
era imposible y se omitió.
Qué atractivo
A través de la estructura de caché del lado del cliente
- Si los datos pueden prepararse de antemano, esta estructura permitirá minimizar las consultas y el tráfico a Redis, a la vez que proporcionará siempre los datos más recientes.
- Esto permitirá crear una especie de estructura CQRS para mejorar drásticamente el rendimiento de lectura.
¿Cuánto más atractivo se volvió?
De hecho, como esta estructura se está utilizando actualmente en el campo, hemos buscado brevemente la latencia de las dos API. Por favor, comprenda que solo puedo expresarlo de forma muy abstracta.
- Primera API
- Consulta inicial: promedio de 14.63 ms
- Consulta posterior: promedio de 2.82 ms
- Diferencia promedio: 10.98 ms
- Segunda API
- Consulta inicial: promedio de 14.05 ms
- Consulta posterior: promedio de 1.60 ms
- Diferencia promedio: 11.57 ms
¡Hubo una mejora adicional en la latencia de hasta el 82%!
Probablemente se esperaban las siguientes mejoras:
- Eliminación del proceso de comunicación de red entre el cliente y Redis y ahorro de tráfico.
- Disminución del número de comandos de lectura que Redis debe ejecutar.
- Esto también mejora el rendimiento de escritura.
- Minimización del análisis del protocolo de Redis.
- El análisis del protocolo de Redis no tiene un costo nulo. Reducirlo es una gran oportunidad.
Sin embargo, todo es una compensación. Para ello, sacrificamos al menos dos cosas:
- Necesidad de implementar, operar y mantener los elementos de gestión de la caché del lado del cliente.
- Aumento del uso de CPU y memoria del cliente debido a esto.
Conclusión
Personalmente, fue un componente arquitectónico satisfactorio, y el estrés sobre la latencia y el servidor API fue muy bajo. En el futuro, si es posible, considero que sería bueno construir la arquitectura con esta estructura.