Mejora de la reactividad con la caché del lado del cliente de Redis
¿Qué es Redis?
Pocas personas ignorarán qué es Redis. Sin embargo, para aquellos que no lo sepan, se puede resumir brevemente con las siguientes características:
- Las operaciones se ejecutan en un único hilo, lo que confiere atomicidad a todas ellas.
- Los datos se almacenan y operan en memoria, lo que hace que todas las operaciones sean rápidas.
- Redis puede almacenar WAL según la configuración, permitiendo copiar y recuperar rápidamente el estado más reciente.
- Soporta múltiples tipos como Set, Hash, Bit, List, lo que le otorga una alta productividad.
- Posee una gran comunidad, lo que facilita el intercambio de experiencias, problemas y soluciones.
- Ha sido desarrollado y operado durante mucho tiempo, lo que le confiere una estabilidad confiable.
Por lo tanto, al grano
¿Imagínese?
¿Qué pasaría si el caché de sus servicios cumpliera las siguientes dos condiciones?
- Cuando se debe proporcionar a los usuarios datos consultados frecuentemente en su estado más reciente, pero la actualización es irregular y requiere una frecuente renovación del caché.
- Cuando no se actualiza, pero se necesita acceder y consultar los mismos datos del caché con frecuencia.
El primer caso podría considerar la clasificación de popularidad en tiempo real de un centro comercial. Si se almacena la clasificación de popularidad en tiempo real de un centro comercial como un sorted set, sería ineficiente que Redis la leyera cada vez que un usuario accede a la página principal.
En el segundo caso, para los datos de tipo de cambio, aunque los datos de tipo de cambio se publiquen aproximadamente cada 10 minutos, las consultas reales ocurren con mucha frecuencia. Además, para el won-dólar, el won-yen y el won-yuan, el caché se consulta con gran asiduidad.
En estos casos, sería más eficiente que el servidor de la API tuviera un caché local separado y, cuando los datos cambiaran, consultara Redis nuevamente para actualizarlos.
Entonces, ¿cómo se podría implementar tal operación en una arquitectura de base de datos - Redis - servidor API?
¿No se puede hacer con Redis PubSub?
¡Al usar un caché, suscribámonos a un canal que pueda recibir notificaciones de actualización!
- Entonces, se debe crear una lógica para enviar mensajes al momento de la actualización.
- Las operaciones adicionales debido a PubSub afectan el rendimiento.


Entonces, ¿qué pasa si Redis detecta cambios?
¿Qué tal si usamos Keyspace Notification para recibir notificaciones de comandos para esa clave?
- Existe la molestia de tener que almacenar y compartir previamente las claves y los comandos utilizados para la actualización.
- Por ejemplo, para algunas claves, un simple Set es el comando de actualización, mientras que para otras, LPush, RPush, SAdd o SRem son los comandos de actualización, lo que lo hace más complejo.
- Esto aumenta significativamente la posibilidad de errores de comunicación y errores humanos en la codificación durante el proceso de desarrollo.
¿Qué tal si usamos Keyevent Notification para recibir notificaciones por unidad de comando?
- Se requiere la suscripción a todos los comandos utilizados para la actualización. Se necesita un filtrado adecuado para las claves entrantes.
- Por ejemplo, es probable que algunos clientes no tengan un caché local para algunas de las claves que llegan con
Del. - Esto puede llevar a un desperdicio innecesario de recursos.
¡Por eso se necesita el Invalidation Message!
¿Qué es un Invalidation Message?
Los Invalidation Messages son un concepto introducido a partir de Redis 6.0 como parte del Server Assisted Client-Side Cache. Un Invalidation Message se transmite siguiendo este flujo:
- Se asume que ClientB ya leyó una clave.
- ClientA establece un nuevo valor para esa clave.
- Redis detecta el cambio y publica un Invalidation Message a ClientB para indicarle que borre el caché.
- ClientB recibe ese mensaje y toma las medidas adecuadas.

Cómo usarlo
Estructura de operación básica
Un cliente conectado a Redis ejecuta CLIENT TRACKING ON REDIRECT <client-id> para recibir los mensajes de invalidación. Y el cliente que debe recibir los mensajes se suscribe a SUBSCRIBE __redis__:invalidate para recibirlos.
Seguimiento predeterminado
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
Seguimiento por difusión (broadcasting)
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
¡Implementación! ¡Implementación! ¡Implementación!
Redigo + Ristretto
Con solo esa explicación, sería ambiguo cómo usarlo realmente en el código. Así que, primero lo configuraremos brevemente con redigo y ristretto.
Primero, instale las dos dependencias.
github.com/gomodule/redigogithub.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // número de claves para rastrear la frecuencia (10M).
22 MaxCost: 1 << 30, // costo máximo del caché (1GB).
23 BufferItems: 64, // número de claves por búfer Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Primero, creamos un RedisClient que incluye brevemente ristretto y redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
El código es un poco complejo.
- Para realizar el Tracking, se establece una conexión adicional. Esta medida se toma considerando que PubSub podría interferir con otras operaciones.
- Se consulta el ID de la conexión adicional para que la conexión que consulta los datos redirija el Tracking a esa conexión.
- Y se suscribe al mensaje de invalidación.
- El código que maneja la suscripción es un poco complejo. Dado que
redigono puede analizar los mensajes de invalidación, es necesario recibir la respuesta antes del análisis y procesarla.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
El mensaje Get primero consulta Ristretto y, si no lo encuentra, lo obtiene de Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
El código para realizar pruebas es el anterior. Si lo prueba, podrá observar que los datos se actualizan cada vez que se renuevan en Redis.
Sin embargo, esto es demasiado complejo. Sobre todo, para escalar a un clúster, es inevitable tener que habilitar el Tracking para todos los maestros o réplicas.
Rueidis
Para quienes utilizan Go, contamos con rueidis, la biblioteca más moderna y avanzada. A continuación, escribiremos un código que utiliza el caché del lado del cliente asistido por el servidor en un entorno de clúster de Redis, usando rueidis.
Primero, instale la dependencia.
github.com/redis/rueidis
Luego, escriba el código para consultar datos en Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Para usar el caché del lado del cliente en rueidis, solo necesita llamar a DoCache. Esto agregará el elemento al caché local, especificando cuánto tiempo debe persistir. Al llamar a DoCache nuevamente, los datos se recuperarán del caché local. Naturalmente, los mensajes de invalidación también se manejan correctamente.
¿Por qué no redis-go?
redis-go lamentablemente no soporta la caché del lado del cliente asistida por el servidor a través de su API oficial. Incluso al crear PubSub, no hay una API para acceder directamente a la nueva conexión creada, por lo que no se puede conocer el client id. Por lo tanto, se ha descartado redis-go al considerar que su configuración es inviable.
Qué atractivo
A través de la estructura de caché del lado del cliente
- Si los datos pueden prepararse con antelación, esta estructura permitirá minimizar las consultas y el tráfico a Redis, mientras se proporciona siempre la información más reciente.
- Esto puede crear una especie de estructura CQRS, mejorando drásticamente el rendimiento de lectura.

¿Cuánto más atractivo se ha vuelto?
De hecho, como esta estructura se utiliza actualmente en producción, he buscado brevemente la latencia de dos API. Les pido disculpas por no poder ser más específico.
- Primera API
- En la consulta inicial: promedio de 14.63 ms
- En consultas posteriores: promedio de 2.82 ms
- Diferencia promedio: 10.98 ms
- Segunda API
- En la consulta inicial: promedio de 14.05 ms
- En consultas posteriores: promedio de 1.60 ms
- Diferencia promedio: 11.57 ms
¡Hubo una mejora adicional en la latencia de hasta un 82%!
Probablemente se esperaban las siguientes mejoras:
- Eliminación del proceso de comunicación de red entre el cliente y Redis y ahorro de tráfico.
- Reducción del número de comandos de lectura que Redis debe ejecutar.
- Esto también tiene el efecto de mejorar el rendimiento de escritura.
- Minimización del análisis del protocolo de Redis.
- El análisis del protocolo de Redis no tiene un costo nulo. Poder reducirlo es una gran oportunidad.
Sin embargo, todo es un compromiso. Para lograr esto, sacrificamos al menos dos cosas:
- Necesidad de implementar, operar y mantener un componente de gestión de caché del lado del cliente.
- Aumento del uso de CPU y memoria del cliente debido a esto.
Conclusión
Personalmente, fue un componente arquitectónico satisfactorio, y el estrés sobre la latencia y el servidor API fue muy bajo. En el futuro, si es posible, me gustaría configurar la arquitectura de esta manera.