Migliorare la reattività con Redis Client-Side Cache
Che cos'è Redis?
Ritengo che non siano molte le persone che non conoscono Redis. Tuttavia, per menzionarlo brevemente con alcune caratteristiche, possiamo riassumerlo come segue:
- Le operazioni vengono eseguite in un singolo thread, conferendo atomicità a tutte le operazioni.
- I dati vengono memorizzati ed elaborati In-Memory, rendendo tutte le operazioni veloci.
- Redis può memorizzare il WAL a seconda dell'opzione, consentendo un rapido backup e recupero dello stato più recente.
- Supporta vari tipi come Set, Hash, Bit, List, offrendo un'elevata produttività.
- Possiede una grande community, consentendo la condivisione di diverse esperienze, problematiche e soluzioni.
- È stato sviluppato e gestito a lungo, garantendo un'affidabile stabilità.
Quindi, al punto centrale
Immaginate?
Cosa succederebbe se la cache del vostro servizio soddisfacesse le seguenti due condizioni?
- Dover fornire agli utenti i dati consultati frequentemente nello stato più recente, ma dover aggiornare frequentemente la cache a causa di aggiornamenti irregolari.
- Non dover aggiornare i dati, ma dover accedere frequentemente agli stessi dati in cache per la consultazione.
Nel primo caso, si potrebbe considerare la classifica di popolarità in tempo reale di un negozio online. Quando la classifica di popolarità in tempo reale di un negozio online viene memorizzata come sorted set, leggerla da Redis ogni volta che un utente accede alla pagina principale sarebbe inefficiente. Nel secondo caso, per quanto riguarda i dati sui tassi di cambio, anche se i dati vengono pubblicati approssimativamente ogni 10 minuti, le consultazioni effettive avvengono molto frequentemente. Inoltre, per i tassi Won-Dollaro, Won-Yen e Won-Yuan, la cache viene consultata con altissima frequenza. In questi casi, sarebbe un'operazione più efficiente se l'API Server mantenesse una cache separata localmente e, quando i dati cambiano, consultasse nuovamente Redis per l'aggiornamento.
Allora, come si potrebbe implementare tale comportamento in una struttura Dati - Redis - API Server?
Non è possibile con Redis PubSub?
Quando si utilizza la cache, sottoscrivere un canale da cui ricevere notifiche di aggiornamento!
- Ciò richiederebbe la creazione di una logica per inviare messaggi al momento dell'aggiornamento.
- Le operazioni aggiuntive dovute a PubSub influenzerebbero le prestazioni.
E se Redis rilevasse il cambiamento?
Utilizzando Keyspace Notification per ricevere notifiche sui command relativi a quella key?
- Esiste l'inconveniente di dover memorizzare e condividere in anticipo le key e i command utilizzati per l'aggiornamento.
- Ad esempio, per alcune key, un semplice Set è il command di aggiornamento, mentre per altre key, LPush, RPush, SAdd o SRem diventano il command di aggiornamento, rendendo il tutto complesso.
- Ciò aumenta significativamente la possibilità di errori di comunicazione durante lo sviluppo e di errori umani nella codifica.
Utilizzando Keyevent Notification per ricevere notifiche a livello di command?
- È necessaria la sottoscrizione a tutti i command utilizzati per l'aggiornamento. Da ciò, è necessario un filtraggio appropriato delle key in arrivo.
- Ad esempio, è probabile che un determinato client non disponga della cache locale per alcune key tra tutte quelle ricevute tramite Del.
- Ciò può portare a uno spreco inutile di risorse.
Quindi, ciò che serve è Invalidation Message!
Che cos'è Invalidation Message?
Invalidation Messages è un concetto introdotto a partire da Redis 6.0 come parte di Server Assisted Client-Side Cache. Un Invalidation Message viene trasmesso secondo il seguente flusso:
- Si supponga che ClientB abbia già letto una key una volta.
- ClientA imposta quella key con un nuovo valore.
- Redis rileva il cambiamento e pubblica un Invalidation Message a ClientB per notificare a ClientB di cancellare la cache.
- ClientB riceve il messaggio e intraprende le azioni appropriate.
Come si usa?
Struttura operativa di base
Un client connesso a Redis riceve invalidation message eseguendo CLIENT TRACKING ON REDIRECT <client-id>
. Inoltre, il client che deve ricevere i messaggi si sottoscrive per ricevere invalidation message tramite SUBSCRIBE __redis__:invalidate
.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementazione! Implementazione! Implementazione!
Redigo + Ristretto
Spiegandolo solo così, non è chiaro come utilizzarlo effettivamente a livello di codice. Quindi, proviamo innanzitutto a configurarlo semplicemente con redigo
e ristretto
.
Per prima cosa, installate le due dependency.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 // Configurazione della cache Ristretto
21 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
22 NumCounters: 1e7, // numero di chiavi di cui tracciare la frequenza (10M).
23 MaxCost: 1 << 30, // costo massimo della cache (1GB).
24 BufferItems: 64, // numero di chiavi per buffer Get.
25 })
26 if err != nil {
27 return nil, fmt.Errorf("failed to generate cache: %w", err)
28 }
29
30 // Connessione a Redis
31 conn, err := redis.Dial("tcp", addr)
32 if err != nil {
33 return nil, fmt.Errorf("failed to connect to redis: %w", err)
34 }
35
36 return &RedisClient{
37 conn: conn,
38 cache: cache,
39 addr: addr,
40 }, nil
41}
42
43func (r *RedisClient) Close() error {
44 // Chiusura della connessione Redis
45 err := r.conn.Close()
46 if err != nil {
47 return fmt.Errorf("failed to close redis connection: %w", err)
48 }
49
50 return nil
51}
Iniziamo creando un semplice RedisClient
che include ristretto e redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 // Stabilisce una connessione separata per PubSub
3 psc, err := redis.Dial("tcp", r.addr)
4 if err != nil {
5 return fmt.Errorf("failed to connect to redis: %w", err)
6 }
7
8 // Ottiene l'ID del client per la connessione PubSub
9 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
10 if err != nil {
11 return fmt.Errorf("failed to get client id: %w", err)
12 }
13 slog.Info("client id", "id", clientId)
14
15 // Abilita il tracking sulla connessione dati, reindirizzando gli invalidation message alla connessione PubSub
16 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
17 if err != nil {
18 return fmt.Errorf("failed to enable tracking: %w", err)
19 }
20 slog.Info("subscription result", "result", subscriptionResult)
21
22 // Si sottoscrive al canale di invalidation message
23 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
24 return fmt.Errorf("failed to subscribe: %w", err)
25 }
26 psc.Flush()
27
28 // Ciclo di ricezione messaggi
29 for {
30 // Riceve messaggi
31 msg, err := psc.Receive()
32 if err != nil {
33 return fmt.Errorf("failed to receive message: %w", err)
34 }
35
36 // Gestisce diversi tipi di messaggi
37 switch msg := msg.(type) {
38 case redis.Message:
39 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
40 // In caso di messaggio singolo di invalidazione (per Redis < 6.2)
41 key := string(msg.Data)
42 r.cache.Del(key)
43 case redis.Subscription:
44 // Messaggio di sottoscrizione
45 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
46 case error:
47 // Gestisce errori
48 return fmt.Errorf("error: %w", msg)
49 case []interface{}:
50 // Gestisce invalidation message in formato array (per Redis >= 6.2)
51 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
52 slog.Warn("unexpected message", "message", msg)
53 continue
54 }
55
56 contents := msg[2].([]interface{})
57 keys := make([]string, len(contents))
58 for i, key := range contents {
59 keys[i] = string(key.([]byte))
60 // Elimina la chiave dalla cache locale
61 r.cache.Del(keys[i])
62 }
63 slog.Info("received invalidation message", "keys", keys)
64 default:
65 // Gestisce messaggi inattesi
66 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
67 }
68 }
69}
Il codice è un po' complesso.
- Per eseguire il Tracking, si stabilisce un'ulteriore connessione. Questa è una misura presa considerando che PubSub potrebbe interferire con altre operazioni.
- Si ottiene l'ID della connessione aggiunta, e si reindirizza il Tracking dalla connessione che consulta i dati a questa connessione.
- E poi ci si sottoscrive agli invalidation message.
- Il codice che gestisce la sottoscrizione è un po' complesso. Poiché redigo non gestisce il parsing dei messaggi di invalidazione, è necessario ricevere e elaborare la risposta prima del parsing.
1func (r *RedisClient) Get(key string) (any, error) {
2 // Cerca nella cache locale
3 val, found := r.cache.Get(key)
4 if found {
5 // Gestisce il tipo di valore
6 switch v := val.(type) {
7 case int64:
8 slog.Info("cache hit", "key", key)
9 return v, nil
10 default:
11 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
12 }
13 }
14 slog.Info("cache miss", "key", key)
15
16 // Se non trovato nella cache locale, lo ottiene da Redis
17 val, err := redis.Int64(r.conn.Do("GET", key))
18 if err != nil {
19 return nil, fmt.Errorf("failed to get key: %w", err)
20 }
21
22 // Aggiunge il valore alla cache locale con TTL
23 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
24 return val, nil
25}
Il metodo Get
come segue consulta prima ristretto e, se non lo trova, lo recupera da Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 // Configura il contesto per la gestione dei segnali di interruzione
13 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
14 defer cancel()
15
16 // Crea un nuovo RedisClient
17 client, err := NewRedisClient("localhost:6379")
18 if err != nil {
19 panic(err)
20 }
21 defer client.Close()
22
23 // Avvia il tracking degli invalidation message in una goroutine separata
24 go func() {
25 if err := client.Tracking(ctx); err != nil {
26 slog.Error("failed to track invalidation message", "error", err)
27 }
28 }()
29
30 // Configura un ticker per le consultazioni periodiche
31 ticker := time.NewTicker(1 * time.Second)
32 defer ticker.Stop()
33 done := ctx.Done()
34
35 // Ciclo principale di consultazione
36 for {
37 select {
38 case <-done:
39 // Gestisce il segnale di interruzione
40 slog.Info("shutting down")
41 return
42 case <-ticker.C:
43 // Consulta la chiave tramite il client
44 v, err := client.Get("key")
45 if err != nil {
46 slog.Error("failed to get key", "error", err)
47 return
48 }
49 slog.Info("got key", "value", v)
50 }
51 }
52}
Il codice per il test è come sopra. Se lo provate, potrete verificare che il valore viene aggiornato ogni volta che i dati in Redis vengono aggiornati.
Tuttavia, questo è troppo complicato. Soprattutto, per l'espansione in un ambiente cluster, è inevitabile dover abilitare il Tracking su tutti i master o replica.
Rueidis
Per chi usa il linguaggio Go, abbiamo rueidis
, il più moderno e avanzato. Scriveremo il codice che utilizza server assisted client side cache in un ambiente cluster Redis usando rueidis.
Innanzitutto, installate la dipendenza.
github.com/redis/rueidis
Quindi, scrivete il codice che consulta i dati in Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 // Configura il contesto per la gestione dei segnali di interruzione
15 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
16 defer cancel()
17
18 // Crea un nuovo client Rueidis con opzioni per il tracking della cache lato client
19 client, err := rueidis.NewClient(rueidis.ClientOption{
20 InitAddress: []string{"localhost:6379"}, // Indirizzo iniziale di Redis
21 // Abilita il tracking della cache lato client per invalidation message
22 ClientTracking: rueidis.ClientTrackingOption{
23 Prefixes: []string{"key"}, // Traccia solo le chiavi con questo prefisso (opzionale, qui per esempio)
24 },
25 })
26 if err != nil {
27 panic(err)
28 }
29 // Non è necessario chiudere esplicitamente il client rueidis, gestisce la pulizia internamente
30
31 // Configura un ticker per le consultazioni periodiche
32 ticker := time.NewTicker(1 * time.Second)
33 defer ticker.Stop()
34 done := ctx.Done()
35
36 // Ciclo principale di consultazione
37 for {
38 select {
39 case <-done:
40 // Gestisce il segnale di interruzione
41 slog.Info("shutting down")
42 return
43 case <-ticker.C:
44 const key = "key"
45 // Esegue un comando GET con caching lato client abilitato
46 // Cache() abilita il caching lato client per questo comando
47 // 10*time.Second imposta il TTL locale (scadenza forzata se non invalidata prima)
48 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
49 if resp.Error() != nil {
50 slog.Error("failed to get key", "error", resp.Error())
51 continue
52 }
53 // Converte la risposta in int64
54 i, err := resp.AsInt64()
55 if err != nil {
56 slog.Error("failed to convert response to int64", "error", err)
57 continue
58 }
59 // Controlla se è stato un cache hit
60 switch resp.IsCacheHit() {
61 case true:
62 slog.Info("cache hit", "key", key)
63 case false:
64 slog.Info("missed key", "key", key)
65 }
66 slog.Info("got key", "value", i)
67 }
68 }
69}
Con rueidis, per utilizzare client side cache è sufficiente chiamare DoCache
. Questo aggiunge i dati alla cache locale, inclusa la durata di conservazione locale, e chiamando DoCache
di nuovo si consulta il dato dalla cache locale. Ovviamente, gestisce correttamente anche gli invalidation message.
Perché non redis-go?
Purtroppo, redis-go
non supporta ufficialmente server assisted client side cache tramite API. Inoltre, quando si crea un PubSub, crea una nuova connessione e non esiste un'API per accedere direttamente a tale connessione, quindi non è possibile conoscere il client id. Pertanto, abbiamo deciso di saltare redis-go
ritenendo che la sua configurazione fosse impossibile.
È sexy
Attraverso la struttura client side cache
- Se i dati possono essere preparati in anticipo, questa struttura consentirà di fornire sempre i dati più recenti minimizzando le query e il traffico verso Redis.
- Ciò consente di creare una sorta di struttura CQRS, migliorando drasticamente le prestazioni di lettura.
Quanto è diventato più sexy?
Poiché effettivamente questa struttura è in uso sul campo, abbiamo esaminato la latenza di due semplici API. Vi prego di comprendere che posso descriverla solo in modo molto astratto.
- Prima API
- Alla prima consultazione: media 14.63ms
- Alle consultazioni successive: media 2.82ms
- Differenza media: 10.98ms
- Seconda API
- Alla prima consultazione: media 14.05ms
- Alle consultazioni successive: media 1.60ms
- Differenza media: 11.57ms
Si è verificato un miglioramento aggiuntivo della latenza che raggiunge l'82%!
Ci si aspetta che i miglioramenti siano dovuti ai seguenti fattori:
- Eliminazione della comunicazione di rete tra client e Redis e risparmio di traffico.
- Riduzione del numero di read command che Redis stesso deve eseguire.
- Ciò ha anche l'effetto di aumentare le prestazioni di scrittura.
- Minimizzazione del parsing del protocollo Redis.
- Anche il parsing del protocollo Redis ha un costo non nullo. Ridurlo è una grande opportunità.
Tuttavia, tutto è un trade-off. Per questo, abbiamo sacrificato almeno i seguenti due aspetti:
- Necessità di implementare, gestire e mantenere il componente di gestione della cache lato client.
- Aumento dell'utilizzo di CPU e memoria da parte del client dovuto a ciò.
Conclusione
Personalmente, è stato un componente di architettura soddisfacente, e la latenza e lo stress sull'API Server sono stati molto ridotti. Spero di poter continuare a configurare l'architettura con questa struttura in futuro, se possibile.