Migliorare la Reattività con la Cache Lato Client di Redis
What is Redis?
Credo che non ci siano molte persone che non conoscano Redis. Tuttavia, se dovessimo menzionare brevemente alcune delle sue caratteristiche, potremmo riassumerle come segue:
- Le operazioni vengono eseguite in un singolo thread, il che conferisce a tutte le operazioni l'atomicità.
- I dati vengono archiviati ed elaborati In-Memory, rendendo tutte le operazioni veloci.
- Redis può archiviare il WAL a seconda delle opzioni, consentendo un rapido backup e ripristino dello stato più recente.
- Supporta diversi tipi come Set, Hash, Bit, List, offrendo un'elevata produttività.
- Possiede una vasta comunità, permettendo di condividere diverse esperienze, problemi e soluzioni.
- È stato sviluppato e gestito per lungo tempo, garantendo un'affidabilità degna di fiducia.
Quindi, veniamo al dunque
Immaginate?
Cosa succederebbe se la cache del vostro servizio soddisfacesse le seguenti due condizioni?
- Quando è necessario fornire agli utenti i dati consultati frequentemente nello stato più recente, ma l'aggiornamento è irregolare e richiede frequenti aggiornamenti della cache.
- Quando l'aggiornamento non è necessario, ma è frequente l'accesso e la consultazione degli stessi dati di cache.
Il primo caso potrebbe riguardare la classifica in tempo reale della popolarità di un centro commerciale. Se la classifica in tempo reale della popolarità del centro commerciale fosse archiviata come un sorted set, sarebbe inefficiente per Redis leggerla ogni volta che un utente accede alla pagina principale. Nel secondo caso, i dati sui tassi di cambio, anche se vengono pubblicati all'incirca ogni 10 minuti, la consultazione effettiva avviene molto frequentemente. Inoltre, per il won-dollaro, il won-yen e il won-yuan, la cache viene consultata molto spesso. In questi casi, sarebbe un'operazione più efficiente se il server API mantenesse una cache separata a livello locale e, quando i dati cambiano, interrogasse nuovamente Redis per l'aggiornamento.
Quindi, come si può implementare questa operazione in una struttura Database - Redis - Server API?
Non è possibile con Redis PubSub?
Quando si utilizza la cache, sottoscriviamo un canale per ricevere la notifica di aggiornamento!
- Allora è necessario creare una logica per inviare un messaggio al momento dell'aggiornamento.
- L'azione aggiuntiva dovuta a PubSub influisce sulle prestazioni.
E se Redis rilevasse la modifica?
Utilizzare la Keyspace Notification per ricevere notifiche di comando per la chiave corrispondente?
- Esiste l'inconveniente di dover archiviare e condividere in anticipo la chiave e il comando utilizzati per l'aggiornamento.
- Ad esempio, per alcune chiavi, un semplice Set è il comando di aggiornamento, mentre per altre chiavi, LPush, RPush, SAdd o SRem diventano il comando di aggiornamento, il che rende il processo più complesso.
- Ciò aumenta notevolmente la possibilità di errori di comunicazione e di errori umani nella codifica durante il processo di sviluppo.
Utilizzare la Keyevent Notification per ricevere notifiche a livello di comando?
- È necessaria la sottoscrizione a tutti i comandi utilizzati per l'aggiornamento. È necessario un filtraggio appropriato sulle chiavi che arrivano.
- Ad esempio, per alcune delle chiavi che arrivano tramite Del, è probabile che il client corrispondente non abbia una cache locale.
- Ciò può portare a uno spreco inutile di risorse.
Quindi, ciò che è necessario è l'Invalidation Message!
Cos'è un Invalidation Message?
Gli Invalidation Messages sono un concetto fornito come parte della Server Assisted Client-Side Cache, aggiunta a partire da Redis 6.0. Un Invalidation Message viene trasmesso nel seguente flusso:
- Si assume che ClientB abbia già letto la chiave una volta.
- ClientA imposta nuovamente la chiave corrispondente.
- Redis rileva la modifica e pubblica un Invalidation Message a ClientB, notificando a ClientB di cancellare la cache.
- ClientB riceve il messaggio ed intraprende l'azione appropriata.
Come si usa
Struttura operativa di base
Il client connesso a Redis esegue CLIENT TRACKING ON REDIRECT <client-id>
per ricevere l'invalidation message. E il client che deve ricevere il messaggio si sottoscrive a SUBSCRIBE __redis__:invalidate
per ricevere l'invalidation message.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementazione! Implementazione! Implementazione!
Redigo + Ristretto
Spiegato solo in questo modo, potrebbe non essere chiaro come utilizzarlo effettivamente nel codice. Quindi, lo configureremo brevemente con redigo
e ristretto
.
Prima di tutto, installiamo le due dipendenze.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // numero di chiavi di cui tracciare la frequenza (10M).
22 MaxCost: 1 << 30, // costo massimo della cache (1GB).
23 BufferItems: 64, // numero di chiavi per buffer Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Per prima cosa, creiamo semplicemente un RedisClient
che include ristretto e redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Il codice è un po' complesso.
- Per il Tracking, viene stabilita un'ulteriore connessione. Questa è una misura adottata considerando che PubSub potrebbe interferire con altre operazioni.
- Viene interrogato l'ID della connessione aggiunta e la connessione che interroga i dati viene reindirizzata a quella connessione per il Tracking.
- Successivamente, si sottoscrive l'invalidation message.
- Il codice che gestisce la sottoscrizione è un po' complesso. Poiché redigo non supporta il parsing del messaggio di invalidazione, è necessario ricevere e gestire la risposta prima del parsing.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Il messaggio Get
interroga prima ristretto e, se non lo trova, lo recupera da Redis, come segue.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Il codice per il test è come sopra. Se provate a testarlo, potrete vedere che il valore viene aggiornato ogni volta che i dati in Redis vengono aggiornati.
Tuttavia, questo è troppo complesso. Soprattutto, per espandere il cluster, è inevitabile dover abilitare il Tracking su tutti i master o repliche.
Rueidis
Per chi usa il linguaggio Go, abbiamo rueidis
, il più moderno e avanzato. Scriveremo il codice che utilizza la server assisted client side cache in un ambiente cluster Redis usando rueidis.
Per prima cosa, installiamo la dipendenza.
github.com/redis/rueidis
Quindi scriviamo il codice per interrogare i dati in Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
In rueidis, per utilizzare la client side cache, è sufficiente DoCache
. Questo aggiunge i dati alla cache locale, specificando per quanto tempo mantenerli, e se si chiama DoCache
nuovamente, i dati vengono recuperati dalla cache locale. Naturalmente, gestisce correttamente anche i messaggi di invalidazione.
Perché non redis-go?
Sfortunatamente, redis-go
non supporta la server assisted client side cache tramite API ufficiali. Inoltre, non esiste un'API per accedere direttamente alla nuova connessione creata durante la generazione di PubSub, quindi l'ID del client non può essere conosciuto. Pertanto, si è ritenuto che la configurazione di redis-go
fosse impossibile e l'abbiamo omessa.
È affascinante
Attraverso la struttura della client side cache
- Se i dati possono essere preparati in anticipo, questa struttura può fornire i dati più recenti riducendo al minimo le query e il traffico verso Redis.
- Ciò consente di creare una sorta di struttura CQRS per migliorare drasticamente le prestazioni di lettura.
Quanto è diventato più affascinante?
Poiché questa struttura è effettivamente in uso sul campo, ho esaminato brevemente la latenza per le due API. Si prega di comprendere che posso descriverlo solo in termini molto astratti.
- Prima API
- Prima consultazione: media 14.63ms
- Consultazioni successive: media 2.82ms
- Differenza media: 10.98ms
- Seconda API
- Prima consultazione: media 14.05ms
- Consultazioni successive: media 1.60ms
- Differenza media: 11.57ms
C'è stato un miglioramento della latenza aggiuntiva fino all'82%!
Mi aspetto che i seguenti miglioramenti siano stati raggiunti:
- Eliminazione della comunicazione di rete tra il client e Redis e risparmio di traffico
- Riduzione del numero di comandi di lettura che Redis stesso deve eseguire
- Ciò ha anche l'effetto di aumentare le prestazioni di scrittura.
- Minimizzazione del parsing del protocollo Redis
- Anche il parsing del protocollo Redis ha un costo non nullo. Ridurlo è una grande opportunità.
Tuttavia, ogni cosa è un trade-off. Per questo, abbiamo sacrificato almeno le due seguenti cose:
- Necessità di implementare, gestire e mantenere il componente di gestione della client side cache.
- Aumento dell'utilizzo di CPU e memoria del client a causa di ciò.
Conclusione
Personalmente, è stata una componente architetturale soddisfacente, e lo stress sulla latenza e sul server API è stato molto ridotto. Spero di poter configurare l'architettura con una struttura simile anche in futuro, se possibile.