GoSuda

Îmbunătățirea reactivității cu caching-ul pe partea de client Redis

By snowmerak
views ...

Ce este Redis?

Consider că puțini sunt cei care nu cunosc Redis. Cu toate acestea, dacă ar fi să îl menționez pe scurt prin câteva caracteristici, acestea ar putea fi rezumate astfel:

  • Operațiunile sunt executate într-un singur Thread, conferind tuturor operațiunilor atomicitate.
  • Datele sunt stocate și procesate In-Memory, ceea ce face ca toate operațiunile să fie rapide.
  • Redis poate stoca WAL în funcție de opțiuni, permițând backup și recuperare rapidă a celei mai recente stări.
  • Suportă mai multe tipuri de date, cum ar fi Set, Hash, Bit, List, oferind o productivitate ridicată.
  • Are o comunitate extinsă, permițând partajarea diverselor experiențe, probleme și soluții.
  • Este dezvoltat și operat de mult timp, având o stabilitate de încredere.

Așadar, la subiect

Imaginați-vă?

Ce-ar fi dacă memoria cache a serviciului dumneavoastră ar îndeplini următoarele două condiții?

  1. Atunci când datele accesate frecvent trebuie să fie furnizate utilizatorului în cea mai recentă stare, dar actualizarea este neregulată, necesitând o reîmprospătare frecventă a memoriei cache.
  2. Atunci când datele cache nu sunt actualizate, dar sunt accesate și interogate frecvent.

Primul caz ar putea fi clasamentul de popularitate în timp real al unui magazin online. Dacă clasamentul de popularitate în timp real al unui magazin online este stocat într-un sorted set, ar fi ineficient ca Redis să îl citească de fiecare dată când un utilizator accesează pagina principală. În al doilea caz, în cazul datelor privind cursul de schimb valutar, chiar dacă datele sunt publicate aproximativ la fiecare 10 minute, interogările reale sunt foarte frecvente. În plus, interogarea memoriei cache pentru won-dolar, won-yen și won-yuan are loc foarte frecvent. În aceste cazuri, ar fi o operațiune eficientă ca serverul API să aibă o memorie cache locală separată, iar atunci când datele se modifică, să interogheze din nou Redis pentru a le actualiza.

Așadar, cum am putea implementa o astfel de operațiune în arhitectura Bază de date - Redis - Server API?

Nu se poate face cu Redis PubSub?

Când utilizați memoria cache, abonați-vă la un canal care poate primi notificări de actualizare!

  • Atunci trebuie să creați o logică pentru a trimite mesaje la momentul actualizării.
  • Operațiunea suplimentară cauzată de PubSub afectează performanța.

pubsub-write

pubsub-read

Dar dacă Redis ar detecta modificările?

Ce-ar fi dacă am folosi Keyspace Notification pentru a primi notificări de comandă pentru acea cheie?

  • Există inconvenientul de a trebui să stocați și să partajați în prealabil cheile și comenzile utilizate pentru actualizare.
  • De exemplu, pentru o anumită cheie, un simplu Set este o comandă de actualizare, în timp ce pentru altă cheie, LPush, RPush, SAdd sau SRem devin comenzi de actualizare, ceea ce complică lucrurile.
  • Acest lucru crește semnificativ probabilitatea de erori de comunicare și erori umane în procesul de dezvoltare și codare.

Ce-ar fi dacă am folosi Keyevent Notification pentru a primi notificări la nivel de comandă?

  • Este necesară abonarea la toate comenzile utilizate pentru actualizare. Acolo, este necesară o filtrare adecvată a cheilor primite.
  • De exemplu, este foarte probabil ca unii dintre clienți să nu aibă o memorie cache locală pentru toate cheile primite prin Del.
  • Acest lucru poate duce la o risipă inutilă de resurse.

Așadar, ceea ce este necesar este Invalidation Message!

Ce este un Invalidation Message?

Invalidation Messages este un concept introdus începând cu Redis 6.0, ca parte a Server Assisted Client-Side Cache. Un Invalidation Message este transmis conform următorului flux:

  1. Se presupune că ClientB a citit deja o dată cheia.
  2. ClientA setează o nouă valoare pentru acea cheie.
  3. Redis detectează modificarea și emite un Invalidation Message către ClientB, notificându-l să șteargă memoria cache.
  4. ClientB primește mesajul și ia măsurile adecvate.

invalidation-message

Cum se utilizează

Structura operațională de bază

Clientul conectat la Redis execută CLIENT TRACKING ON REDIRECT <client-id> pentru a primi mesaje de invalidare. Apoi, clientul care trebuie să primească mesajele se abonează la SUBSCRIBE __redis__:invalidate pentru a primi mesajele de invalidare.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementare! Implementare! Implementare!

Redigo + Ristretto

Dacă ar fi descris doar așa, ar fi ambiguu cum să-l folosești în cod. Așadar, să-l configurăm simplu cu redigo și ristretto mai întâi.

Mai întâi, instalați cele două dependențe.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // numărul de chei pentru a urmări frecvența (10M).
22		MaxCost:     1 << 30, // costul maxim al cache-ului (1GB).
23		BufferItems: 64,      // numărul de chei per buffer Get.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Mai întâi, creăm un RedisClient care include simplu ristretto și redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Codul este puțin complex.

  • Pentru Tracking, se stabilește o conexiune suplimentară. Aceasta este o măsură luată având în vedere că PubSub ar putea interfera cu alte operațiuni.
  • Se interoghează ID-ul conexiunii adăugate și se redirecționează Tracking-ul de la conexiunea care va interoga datele către această conexiune.
  • Apoi, se abonează la mesajele de invalidare.
  • Codul care gestionează abonarea este puțin complex. Deoarece redigo nu poate analiza mesajele de invalidare, trebuie să primească și să proceseze răspunsul înainte de analiză.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Mesajul Get interoghează mai întâi ristretto, iar dacă nu găsește, preia valoarea din Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Codul pentru testare este cel de mai sus. Dacă îl testați, veți observa că valoarea este reîmprospătată de fiecare dată când datele din Redis sunt actualizate.

Dar acest lucru este prea complicat. Mai presus de toate, pentru a extinde la un cluster, este inevitabil să se activeze Tracking-ul pentru toate Master-ele sau Replica-urile.

Rueidis

Pentru cei care folosesc limbajul Go, avem la dispoziție rueidis, cel mai modern și avansat. Voi scrie codul care utilizează Server Assisted Client Side Cache într-un mediu Redis cluster, folosind rueidis.

Mai întâi, instalați dependența.

  • github.com/redis/rueidis

Apoi, scrieți codul pentru a interoga datele din Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Pentru a utiliza client side cache în rueidis, este suficient să se apeleze DoCache. Aceasta va adăuga datele în memoria cache locală, specificând cât timp să fie reținute, iar apelarea ulterioară a DoCache va prelua datele din memoria cache locală. Desigur, mesajele de invalidare sunt, de asemenea, gestionate corect.

De ce nu redis-go?

Din păcate, redis-go nu suportă Server Assisted Client Side Cache ca API oficial. În plus, atunci când se creează un PubSub, nu există un API care să permită accesul direct la noua conexiune, ceea ce face imposibilă obținerea ID-ului clientului. Prin urmare, am considerat că redis-go nu poate fi configurat și l-am omis.

Sexy!

Prin structura client side cache

  • Dacă datele pot fi pregătite în prealabil, această structură ar permite furnizarea constantă a celor mai recente date, minimizând interogările și traficul către Redis.
  • Prin aceasta, se poate crea o formă de arhitectură CQRS, îmbunătățind semnificativ performanța de citire.

cqrs

Cât de mult mai sexy a devenit?

De fapt, o astfel de structură este deja utilizată în practică, așa că am analizat scurt latența pentru două API-uri. Vă rog să mă scuzați că nu pot oferi decât o descriere foarte abstractă.

  1. Primul API
    1. La prima interogare: în medie 14.63ms
    2. La interogări ulterioare: în medie 2.82ms
    3. Diferența medie: 10.98ms
  2. Al doilea API
    1. La prima interogare: în medie 14.05ms
    2. La interogări ulterioare: în medie 1.60ms
    3. Diferența medie: 11.57ms

A existat o îmbunătățire suplimentară a latenței de până la 82%!

Probabil că au existat următoarele îmbunătățiri:

  • Eliminarea procesului de comunicare în rețea între client și Redis și economisirea traficului.
  • Reducerea numărului de comenzi de citire pe care Redis trebuie să le execute.
    • Acest lucru are, de asemenea, efectul de a crește performanța de scriere.
  • Minimizarea analizei protocolului Redis.
    • Analiza protocolului Redis nu este lipsită de costuri. Reducerea acestuia este o oportunitate semnificativă.

Însă, totul este un compromis. Pentru aceasta, am sacrificat cel puțin următoarele două aspecte:

  • Necesitatea implementării, operării și mentenanței componentelor de gestionare a cache-ului pe partea clientului.
  • Creșterea consumului de CPU și memorie al clientului din această cauză.

Concluzie

Personal, am fost mulțumit de componenta arhitecturală și latența, precum și stresul asupra serverului API, au fost foarte reduse. Pe viitor, dacă este posibil, aș dori să configurez arhitectura într-o astfel de structură.