GoSuda

Îmbunătățirea reactivității cu cache-ul Redis pe partea de client

By snowmerak
views ...

Ce este Redis?

Consider că puțini sunt cei care nu cunosc Redis. Cu toate acestea, dacă ar fi să-l menționăm pe scurt, cu câteva caracteristici, am putea rezuma astfel:

  • Operațiile sunt executate pe un singur thread, ceea ce conferă tuturor operațiilor atomicitate.
  • Datele sunt stocate și procesate In-Memory, ceea ce face ca toate operațiile să fie rapide.
  • Redis poate stoca WAL (Write-Ahead Log) în funcție de opțiuni, permițând backup-ul și recuperarea rapidă a stării curente.
  • Suportă multiple tipuri precum Set, Hash, Bit, List, asigurând o productivitate ridicată.
  • Dispune de o comunitate extinsă, de la care se pot împărtăși diverse experiențe, probleme și soluții.
  • Este dezvoltat și operat de mult timp, oferind o stabilitate fiabilă.

Prin urmare, la subiect

Imaginați-vă!

Ce s-ar întâmpla dacă memoria cache a serviciului dumneavoastră ar îndeplini următoarele două condiții?

  1. Atunci când datele accesate frecvent trebuie să fie furnizate utilizatorului în cea mai recentă stare, dar actualizarea este neregulată, necesitând o reîmprospătare frecventă a memoriei cache.
  2. Atunci când actualizarea nu are loc, dar datele din memoria cache sunt accesate și interogate frecvent.

Primul caz poate fi exemplificat prin clasamentul în timp real al popularității într-un magazin online. Atunci când clasamentul în timp real al popularității este stocat ca un sorted set, ar fi ineficient ca utilizatorul să îl citească din Redis de fiecare dată când accesează pagina principală.
Al doilea caz se referă la datele privind ratele de schimb valutar: chiar dacă acestea sunt actualizate aproximativ la fiecare 10 minute, interogările reale sunt foarte frecvente. În special, pentru perechile won-dolar, won-yen și won-yuan, memoria cache este interogată extrem de des.
În astfel de cazuri, ar fi mai eficient ca serverul API să dețină o memorie cache locală separată și să reîmprospăteze datele prin interogarea Redis de fiecare dată când acestea se modifică.

Atunci, cum am putea implementa o astfel de operațiune într-o structură de tip baza de date - Redis - server API?

Nu se poate rezolva cu Redis PubSub?

Când utilizați memoria cache, abonați-vă la un canal care poate primi notificări de actualizare!

  • Atunci trebuie să creați o logică pentru a trimite mesaje la momentul actualizării.
  • Operațiunile suplimentare cauzate de PubSub afectează performanța.

pubsub-write

pubsub-read

Dar dacă Redis ar detecta modificările?

Ce-ar fi dacă am primi notificări de comandă pentru o anumită cheie folosind Keyspace Notification?

  • Există inconvenientul de a trebui să stocați și să partajați în prealabil cheia și comanda utilizate pentru actualizare.
  • De exemplu, devine complex, deoarece pentru o anumită cheie, un simplu Set este o comandă de actualizare, în timp ce pentru alta, LPush, RPush, SAdd sau SRem sunt comenzi de actualizare.
  • Aceasta crește semnificativ probabilitatea de erori de comunicare și erori umane în codificare în timpul procesului de dezvoltare.

Ce-ar fi dacă am primi notificări la nivel de comandă folosind Keyevent Notification?

  • Este necesară abonarea la toate comenzile utilizate pentru actualizare. Aici, este necesară o filtrare adecvată a cheilor primite.
  • De exemplu, este foarte probabil ca un client să nu aibă o memorie cache locală pentru unele dintre cheile primite prin Del.
  • Acest lucru poate duce la risipă inutilă de resurse.

Prin urmare, Invalidation Message este necesar!

Ce este Invalidation Message?

Invalidation Messages este un concept introdus ca parte a Server Assisted Client-Side Cache, disponibil începând cu Redis 6.0. Invalidation Message este transmis conform următorului flux:

  1. Presupunem că ClientB a citit deja o cheie.
  2. ClientA setează o nouă valoare pentru acea cheie.
  3. Redis detectează modificarea și emite un Invalidation Message către ClientB, notificându-l să șteargă memoria cache.
  4. ClientB primește mesajul și ia măsurile adecvate.

invalidation-message

Cum se utilizează

Structura de funcționare de bază

Un client conectat la Redis execută CLIENT TRACKING ON REDIRECT <client-id> pentru a primi mesaje de invalidare. Clientul care trebuie să primească mesajele se abonează la SUBSCRIBE __redis__:invalidate pentru a primi mesajele de invalidare.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementare! Implementare! Implementare!

Redigo + Ristretto

O astfel de explicație este ambiguă în ceea ce privește utilizarea efectivă în cod. Prin urmare, vom configura mai întâi cu redigo și ristretto.

Mai întâi, instalați cele două dependențe.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // numărul de chei pentru a urmări frecvența (10M).
22		MaxCost:     1 << 30, // costul maxim al cache-ului (1GB).
23		BufferItems: 64,      // numărul de chei per buffer Get.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

În primul rând, se creează un RedisClient simplu care include ristretto și redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Codul este puțin complex.

  • Pentru Tracking, se stabilește o conexiune suplimentară. Această măsură este luată având în vedere că PubSub ar putea interfera cu alte operațiuni.
  • Se interoghează ID-ul conexiunii adăugate, iar conexiunea care va interoga datele este redirecționată către această conexiune pentru Tracking.
  • Apoi, se abonează la mesajele de invalidare.
  • Codul de gestionare a abonamentului este puțin complex. Deoarece redigo nu poate parsa mesajele de invalidare, trebuie să primim și să procesăm răspunsul înainte de parsare.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Mesajul Get interoghează mai întâi ristretto și, dacă nu este găsit, îl preia din Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Codul pentru testare este cel de mai sus. Dacă îl testați, veți observa că valorile sunt actualizate de fiecare dată când datele din Redis sunt actualizate.

Cu toate acestea, acest lucru este prea complex. Mai presus de toate, pentru a scala la un cluster, este inevitabil să fie nevoie să activați Tracking pentru toate masterele sau replicile.

Rueidis

Pentru cei care utilizează Go, avem rueidis, care este cel mai modern și avansat. Vom scrie codul care utilizează server assisted client side cache într-un mediu de cluster Redis, folosind rueidis.

Mai întâi, instalați dependențele.

  • github.com/redis/rueidis

Apoi, scrieți codul pentru a interoga datele din Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Cu rueidis, pentru a utiliza client side cache, este suficient să apelați DoCache. Aceasta adaugă datele în cache-ul local și, de fiecare dată când DoCache este apelat, datele sunt interogate din cache-ul local. Desigur, mesajele de invalidare sunt gestionate corect.

De ce nu redis-go?

redis-go, din păcate, nu suportă oficial client side cache asistat de server prin API-ul său. Mai mult, atunci când se creează PubSub, nu există un API care să acceseze direct conexiunea nou creată, deci nu se poate cunoaște ID-ul clientului. Prin urmare, am considerat că redis-go nu poate fi configurat și l-am omis.

Sexy, nu-i așa?

Prin structura client side cache

  • Dacă datele pot fi pregătite în avans, această structură va permite furnizarea constantă a celor mai recente date, minimizând interogările și traficul către Redis.
  • Prin aceasta, se poate crea o structură de tip CQRS, îmbunătățind semnificativ performanța de citire.

cqrs

Cât de mult mai sexy a devenit?

De fapt, această structură este deja utilizată în practică, așa că am examinat pe scurt latența pentru două API-uri. Vă rog să mă scuzați că nu pot oferi decât o descriere foarte abstractă.

  1. Primul API
    1. La prima interogare: în medie 14.63ms
    2. La interogările ulterioare: în medie 2.82ms
    3. Diferența medie: 10.98ms
  2. Al doilea API
    1. La prima interogare: în medie 14.05ms
    2. La interogările ulterioare: în medie 1.60ms
    3. Diferența medie: 11.57ms

A existat o îmbunătățire suplimentară a latenței de până la aproximativ 82%!

Probabil că au existat următoarele îmbunătățiri:

  • Eliminarea procesului de comunicare în rețea între client și Redis și economisirea traficului.
  • Reducerea numărului de comenzi de citire pe care Redis trebuie să le execute.
    • Aceasta are ca efect și creșterea performanței de scriere.
  • Minimizarea parsării protocolului Redis.
    • Parsarea protocolului Redis nu este lipsită de costuri. Reducerea acestuia reprezintă o oportunitate semnificativă.

Cu toate acestea, totul este un compromis. Pentru aceasta, am sacrificat cel puțin următoarele două aspecte:

  • Necesitatea implementării, operării și mentenanței elementelor de gestionare a cache-ului pe partea clientului.
  • Creșterea consumului de CPU și memorie al clientului, ca urmare a acestui fapt.

Concluzie

Personal, a fost o componentă arhitecturală satisfăcătoare, iar latența și stresul asupra serverului API au fost minime. Sper ca pe viitor să pot configura arhitectura în acest mod, dacă este posibil.