GoSuda

Migliorare la reattività con Redis Client-Side Cache

By snowmerak
views ...

Che cos'è Redis?

Ritengo che non siano molte le persone che non conoscono Redis. Tuttavia, per menzionarlo brevemente con alcune caratteristiche, possiamo riassumerlo come segue:

  • Le operazioni vengono eseguite in un singolo thread, conferendo atomicità a tutte le operazioni.
  • I dati vengono memorizzati ed elaborati In-Memory, rendendo tutte le operazioni veloci.
  • Redis può memorizzare il WAL a seconda dell'opzione, consentendo un rapido backup e recupero dello stato più recente.
  • Supporta vari tipi come Set, Hash, Bit, List, offrendo un'elevata produttività.
  • Possiede una grande community, consentendo la condivisione di diverse esperienze, problematiche e soluzioni.
  • È stato sviluppato e gestito a lungo, garantendo un'affidabile stabilità.

Quindi, al punto centrale

Immaginate?

Cosa succederebbe se la cache del vostro servizio soddisfacesse le seguenti due condizioni?

  1. Dover fornire agli utenti i dati consultati frequentemente nello stato più recente, ma dover aggiornare frequentemente la cache a causa di aggiornamenti irregolari.
  2. Non dover aggiornare i dati, ma dover accedere frequentemente agli stessi dati in cache per la consultazione.

Nel primo caso, si potrebbe considerare la classifica di popolarità in tempo reale di un negozio online. Quando la classifica di popolarità in tempo reale di un negozio online viene memorizzata come sorted set, leggerla da Redis ogni volta che un utente accede alla pagina principale sarebbe inefficiente. Nel secondo caso, per quanto riguarda i dati sui tassi di cambio, anche se i dati vengono pubblicati approssimativamente ogni 10 minuti, le consultazioni effettive avvengono molto frequentemente. Inoltre, per i tassi Won-Dollaro, Won-Yen e Won-Yuan, la cache viene consultata con altissima frequenza. In questi casi, sarebbe un'operazione più efficiente se l'API Server mantenesse una cache separata localmente e, quando i dati cambiano, consultasse nuovamente Redis per l'aggiornamento.

Allora, come si potrebbe implementare tale comportamento in una struttura Dati - Redis - API Server?

Non è possibile con Redis PubSub?

Quando si utilizza la cache, sottoscrivere un canale da cui ricevere notifiche di aggiornamento!

  • Ciò richiederebbe la creazione di una logica per inviare messaggi al momento dell'aggiornamento.
  • Le operazioni aggiuntive dovute a PubSub influenzerebbero le prestazioni.

pubsub-write

pubsub-read

E se Redis rilevasse il cambiamento?

Utilizzando Keyspace Notification per ricevere notifiche sui command relativi a quella key?

  • Esiste l'inconveniente di dover memorizzare e condividere in anticipo le key e i command utilizzati per l'aggiornamento.
  • Ad esempio, per alcune key, un semplice Set è il command di aggiornamento, mentre per altre key, LPush, RPush, SAdd o SRem diventano il command di aggiornamento, rendendo il tutto complesso.
  • Ciò aumenta significativamente la possibilità di errori di comunicazione durante lo sviluppo e di errori umani nella codifica.

Utilizzando Keyevent Notification per ricevere notifiche a livello di command?

  • È necessaria la sottoscrizione a tutti i command utilizzati per l'aggiornamento. Da ciò, è necessario un filtraggio appropriato delle key in arrivo.
  • Ad esempio, è probabile che un determinato client non disponga della cache locale per alcune key tra tutte quelle ricevute tramite Del.
  • Ciò può portare a uno spreco inutile di risorse.

Quindi, ciò che serve è Invalidation Message!

Che cos'è Invalidation Message?

Invalidation Messages è un concetto introdotto a partire da Redis 6.0 come parte di Server Assisted Client-Side Cache. Un Invalidation Message viene trasmesso secondo il seguente flusso:

  1. Si supponga che ClientB abbia già letto una key una volta.
  2. ClientA imposta quella key con un nuovo valore.
  3. Redis rileva il cambiamento e pubblica un Invalidation Message a ClientB per notificare a ClientB di cancellare la cache.
  4. ClientB riceve il messaggio e intraprende le azioni appropriate.

invalidation-message

Come si usa?

Struttura operativa di base

Un client connesso a Redis riceve invalidation message eseguendo CLIENT TRACKING ON REDIRECT <client-id>. Inoltre, il client che deve ricevere i messaggi si sottoscrive per ricevere invalidation message tramite SUBSCRIBE __redis__:invalidate.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementazione! Implementazione! Implementazione!

Redigo + Ristretto

Spiegandolo solo così, non è chiaro come utilizzarlo effettivamente a livello di codice. Quindi, proviamo innanzitutto a configurarlo semplicemente con redigo e ristretto.

Per prima cosa, installate le due dependency.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	// Configurazione della cache Ristretto
21	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
22		NumCounters: 1e7,     // numero di chiavi di cui tracciare la frequenza (10M).
23		MaxCost:     1 << 30, // costo massimo della cache (1GB).
24		BufferItems: 64,      // numero di chiavi per buffer Get.
25	})
26	if err != nil {
27		return nil, fmt.Errorf("failed to generate cache: %w", err)
28	}
29
30	// Connessione a Redis
31	conn, err := redis.Dial("tcp", addr)
32	if err != nil {
33		return nil, fmt.Errorf("failed to connect to redis: %w", err)
34	}
35
36	return &RedisClient{
37		conn:  conn,
38		cache: cache,
39		addr:  addr,
40	}, nil
41}
42
43func (r *RedisClient) Close() error {
44	// Chiusura della connessione Redis
45	err := r.conn.Close()
46	if err != nil {
47		return fmt.Errorf("failed to close redis connection: %w", err)
48	}
49
50	return nil
51}

Iniziamo creando un semplice RedisClient che include ristretto e redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	// Stabilisce una connessione separata per PubSub
 3	psc, err := redis.Dial("tcp", r.addr)
 4	if err != nil {
 5		return fmt.Errorf("failed to connect to redis: %w", err)
 6	}
 7
 8	// Ottiene l'ID del client per la connessione PubSub
 9	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
10	if err != nil {
11		return fmt.Errorf("failed to get client id: %w", err)
12	}
13	slog.Info("client id", "id", clientId)
14
15	// Abilita il tracking sulla connessione dati, reindirizzando gli invalidation message alla connessione PubSub
16	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
17	if err != nil {
18		return fmt.Errorf("failed to enable tracking: %w", err)
19	}
20	slog.Info("subscription result", "result", subscriptionResult)
21
22	// Si sottoscrive al canale di invalidation message
23	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
24		return fmt.Errorf("failed to subscribe: %w", err)
25	}
26	psc.Flush()
27
28	// Ciclo di ricezione messaggi
29	for {
30		// Riceve messaggi
31		msg, err := psc.Receive()
32		if err != nil {
33			return fmt.Errorf("failed to receive message: %w", err)
34		}
35
36		// Gestisce diversi tipi di messaggi
37		switch msg := msg.(type) {
38		case redis.Message:
39			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
40			// In caso di messaggio singolo di invalidazione (per Redis < 6.2)
41			key := string(msg.Data)
42			r.cache.Del(key)
43		case redis.Subscription:
44			// Messaggio di sottoscrizione
45			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
46		case error:
47			// Gestisce errori
48			return fmt.Errorf("error: %w", msg)
49		case []interface{}:
50			// Gestisce invalidation message in formato array (per Redis >= 6.2)
51			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
52				slog.Warn("unexpected message", "message", msg)
53				continue
54			}
55
56			contents := msg[2].([]interface{})
57			keys := make([]string, len(contents))
58			for i, key := range contents {
59				keys[i] = string(key.([]byte))
60				// Elimina la chiave dalla cache locale
61				r.cache.Del(keys[i])
62			}
63			slog.Info("received invalidation message", "keys", keys)
64		default:
65			// Gestisce messaggi inattesi
66			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
67		}
68	}
69}

Il codice è un po' complesso.

  • Per eseguire il Tracking, si stabilisce un'ulteriore connessione. Questa è una misura presa considerando che PubSub potrebbe interferire con altre operazioni.
  • Si ottiene l'ID della connessione aggiunta, e si reindirizza il Tracking dalla connessione che consulta i dati a questa connessione.
  • E poi ci si sottoscrive agli invalidation message.
  • Il codice che gestisce la sottoscrizione è un po' complesso. Poiché redigo non gestisce il parsing dei messaggi di invalidazione, è necessario ricevere e elaborare la risposta prima del parsing.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	// Cerca nella cache locale
 3	val, found := r.cache.Get(key)
 4	if found {
 5		// Gestisce il tipo di valore
 6		switch v := val.(type) {
 7		case int64:
 8			slog.Info("cache hit", "key", key)
 9			return v, nil
10		default:
11			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
12		}
13	}
14	slog.Info("cache miss", "key", key)
15
16	// Se non trovato nella cache locale, lo ottiene da Redis
17	val, err := redis.Int64(r.conn.Do("GET", key))
18	if err != nil {
19		return nil, fmt.Errorf("failed to get key: %w", err)
20	}
21
22	// Aggiunge il valore alla cache locale con TTL
23	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
24	return val, nil
25}

Il metodo Get come segue consulta prima ristretto e, se non lo trova, lo recupera da Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	// Configura il contesto per la gestione dei segnali di interruzione
13	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
14	defer cancel()
15
16	// Crea un nuovo RedisClient
17	client, err := NewRedisClient("localhost:6379")
18	if err != nil {
19		panic(err)
20	}
21	defer client.Close()
22
23	// Avvia il tracking degli invalidation message in una goroutine separata
24	go func() {
25		if err := client.Tracking(ctx); err != nil {
26			slog.Error("failed to track invalidation message", "error", err)
27		}
28	}()
29
30	// Configura un ticker per le consultazioni periodiche
31	ticker := time.NewTicker(1 * time.Second)
32	defer ticker.Stop()
33	done := ctx.Done()
34
35	// Ciclo principale di consultazione
36	for {
37		select {
38		case <-done:
39			// Gestisce il segnale di interruzione
40			slog.Info("shutting down")
41			return
42		case <-ticker.C:
43			// Consulta la chiave tramite il client
44			v, err := client.Get("key")
45			if err != nil {
46				slog.Error("failed to get key", "error", err)
47				return
48			}
49			slog.Info("got key", "value", v)
50		}
51	}
52}

Il codice per il test è come sopra. Se lo provate, potrete verificare che il valore viene aggiornato ogni volta che i dati in Redis vengono aggiornati.

Tuttavia, questo è troppo complicato. Soprattutto, per l'espansione in un ambiente cluster, è inevitabile dover abilitare il Tracking su tutti i master o replica.

Rueidis

Per chi usa il linguaggio Go, abbiamo rueidis, il più moderno e avanzato. Scriveremo il codice che utilizza server assisted client side cache in un ambiente cluster Redis usando rueidis.

Innanzitutto, installate la dipendenza.

  • github.com/redis/rueidis

Quindi, scrivete il codice che consulta i dati in Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	// Configura il contesto per la gestione dei segnali di interruzione
15	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
16	defer cancel()
17
18	// Crea un nuovo client Rueidis con opzioni per il tracking della cache lato client
19	client, err := rueidis.NewClient(rueidis.ClientOption{
20		InitAddress: []string{"localhost:6379"}, // Indirizzo iniziale di Redis
21		// Abilita il tracking della cache lato client per invalidation message
22		ClientTracking: rueidis.ClientTrackingOption{
23			Prefixes: []string{"key"}, // Traccia solo le chiavi con questo prefisso (opzionale, qui per esempio)
24		},
25	})
26	if err != nil {
27		panic(err)
28	}
29	// Non è necessario chiudere esplicitamente il client rueidis, gestisce la pulizia internamente
30
31	// Configura un ticker per le consultazioni periodiche
32	ticker := time.NewTicker(1 * time.Second)
33	defer ticker.Stop()
34	done := ctx.Done()
35
36	// Ciclo principale di consultazione
37	for {
38		select {
39		case <-done:
40			// Gestisce il segnale di interruzione
41			slog.Info("shutting down")
42			return
43		case <-ticker.C:
44			const key = "key"
45			// Esegue un comando GET con caching lato client abilitato
46			// Cache() abilita il caching lato client per questo comando
47			// 10*time.Second imposta il TTL locale (scadenza forzata se non invalidata prima)
48			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
49			if resp.Error() != nil {
50				slog.Error("failed to get key", "error", resp.Error())
51				continue
52			}
53			// Converte la risposta in int64
54			i, err := resp.AsInt64()
55			if err != nil {
56				slog.Error("failed to convert response to int64", "error", err)
57				continue
58			}
59			// Controlla se è stato un cache hit
60			switch resp.IsCacheHit() {
61			case true:
62				slog.Info("cache hit", "key", key)
63			case false:
64				slog.Info("missed key", "key", key)
65			}
66			slog.Info("got key", "value", i)
67		}
68	}
69}

Con rueidis, per utilizzare client side cache è sufficiente chiamare DoCache. Questo aggiunge i dati alla cache locale, inclusa la durata di conservazione locale, e chiamando DoCache di nuovo si consulta il dato dalla cache locale. Ovviamente, gestisce correttamente anche gli invalidation message.

Perché non redis-go?

Purtroppo, redis-go non supporta ufficialmente server assisted client side cache tramite API. Inoltre, quando si crea un PubSub, crea una nuova connessione e non esiste un'API per accedere direttamente a tale connessione, quindi non è possibile conoscere il client id. Pertanto, abbiamo deciso di saltare redis-go ritenendo che la sua configurazione fosse impossibile.

È sexy

Attraverso la struttura client side cache

  • Se i dati possono essere preparati in anticipo, questa struttura consentirà di fornire sempre i dati più recenti minimizzando le query e il traffico verso Redis.
  • Ciò consente di creare una sorta di struttura CQRS, migliorando drasticamente le prestazioni di lettura.

cqrs

Quanto è diventato più sexy?

Poiché effettivamente questa struttura è in uso sul campo, abbiamo esaminato la latenza di due semplici API. Vi prego di comprendere che posso descriverla solo in modo molto astratto.

  1. Prima API
    1. Alla prima consultazione: media 14.63ms
    2. Alle consultazioni successive: media 2.82ms
    3. Differenza media: 10.98ms
  2. Seconda API
    1. Alla prima consultazione: media 14.05ms
    2. Alle consultazioni successive: media 1.60ms
    3. Differenza media: 11.57ms

Si è verificato un miglioramento aggiuntivo della latenza che raggiunge l'82%!

Ci si aspetta che i miglioramenti siano dovuti ai seguenti fattori:

  • Eliminazione della comunicazione di rete tra client e Redis e risparmio di traffico.
  • Riduzione del numero di read command che Redis stesso deve eseguire.
    • Ciò ha anche l'effetto di aumentare le prestazioni di scrittura.
  • Minimizzazione del parsing del protocollo Redis.
    • Anche il parsing del protocollo Redis ha un costo non nullo. Ridurlo è una grande opportunità.

Tuttavia, tutto è un trade-off. Per questo, abbiamo sacrificato almeno i seguenti due aspetti:

  • Necessità di implementare, gestire e mantenere il componente di gestione della cache lato client.
  • Aumento dell'utilizzo di CPU e memoria da parte del client dovuto a ciò.

Conclusione

Personalmente, è stato un componente di architettura soddisfacente, e la latenza e lo stress sull'API Server sono stati molto ridotti. Spero di poter continuare a configurare l'architettura con questa struttura in futuro, se possibile.