GoSuda

Zlepšenie reaktivity pomocou klientskej cache Redis

By snowmerak
views ...

Čo je Redis?

Predpokladám, že len málokto nepozná Redis. Ak by som však mal stručne zhrnúť jeho vlastnosti, dalo by sa to zhrnúť takto:

  • Operácie sa vykonávajú v jednom vlákne, čo zaručuje atomicitu všetkých operácií.
  • Dáta sa ukladajú a spracovávajú v In-Memory, vďaka čomu sú všetky operácie rýchle.
  • Redis dokáže ukladať WAL (Write-Ahead Log) v závislosti od konfigurácie, čo umožňuje rýchle zálohovanie a obnovu najnovšieho stavu.
  • Podporuje rôzne typy dát, ako sú Set, Hash, Bit, List, čo vedie k vysokej produktivite.
  • veľkú komunitu, ktorá umožňuje zdieľanie rôznych skúseností, problémov a riešení.
  • Je dlhodobo vyvíjaný a prevádzkovaný, čo zaručuje spoľahlivú stabilitu.

Takže k veci

Predstavte si to

Čo ak by cache vášho servisu spĺňala nasledujúce dve podmienky?

  1. Je potrebné poskytovať používateľom často vyhľadávané dáta v aktuálnom stave, ale aktualizácia je nepravidelná, čo si vyžaduje časté obnovovanie cache.
  2. Aktualizácia nie je potrebná, ale je potrebné často pristupovať k rovnakým dátam v cache.

Prvý prípad môže zahŕňať real-time rebríčky popularity v e-shope. Ak sú real-time rebríčky popularity e-shopu uložené ako sorted set, bolo by neefektívne, keby ich Redis načítaval vždy, keď používateľ navštívi hlavnú stránku.
V druhom prípade, aj keď sú údaje o výmenných kurzoch zverejňované približne každých 10 minút, skutočné vyhľadávania sú veľmi časté. Konkrétne, cache sa veľmi často vyhľadáva pre kurzy won-dolár, won-jen a won-yuan.
V takýchto prípadoch by bolo efektívnejšie, keby API server uchovával samostatnú lokálnu cache a po zmene dát ju aktualizoval opätovným dotazovaním Redis.

Ako by sme potom mohli implementovať takéto správanie v architektúre databáza – Redis – API server?

Nepomôže Redis PubSub?

Pri používaní cache si odoberte kanál, ktorý vás bude informovať o aktualizáciách!

  • Potom je potrebné vytvoriť logiku na odosielanie správ pri aktualizácii.
  • Dodatočné operácie spôsobené PubSub ovplyvňujú výkon.

pubsub-write

pubsub-read

Čo ak Redis deteguje zmeny?

Použiť Keyspace Notification na prijímanie upozornení na príkazy pre daný kľúč?

  • Existuje nepríjemnosť spojená s predbežným ukladaním a zdieľaním kľúčov a príkazov použitých na aktualizáciu.
  • Napríklad, pre niektoré kľúče je aktualizačným príkazom jednoduchý Set, zatiaľ čo pre iné je to LPush, RPush, SAdd alebo SRem, čo situáciu komplikuje.
  • To výrazne zvyšuje pravdepodobnosť komunikačných chýb a ľudských chýb pri kódovaní počas vývoja.

Použiť Keyevent Notification na prijímanie upozornení na úrovni príkazov?

  • Je potrebné odoberať všetky príkazy používané na aktualizáciu. Je potrebná primeraná filtrácia kľúčov, ktoré prichádzajú.
  • Napríklad, pre niektoré kľúče prichádzajúce cez Del je pravdepodobné, že príslušný klient nemá lokálnu cache.
  • To môže viesť k zbytočnému plytvaniu zdrojmi.

Preto je potrebná Invalidation Message!

Čo je Invalidation Message?

Invalidation Messages sú koncept, ktorý bol predstavený ako súčasť Server Assisted Client-Side Cache, pridanej od verzie Redis 6.0. Invalidation Message sa doručuje nasledujúcim spôsobom:

  1. Predpokladajme, že ClientB už raz prečítal kľúč.
  2. ClientA nastaví nový kľúč.
  3. Redis deteguje zmenu a vydá Invalidation Message pre ClientB, aby mu oznámil, že má vymazať cache.
  4. ClientB prijme správu a vykoná príslušné opatrenia.

invalidation-message

Ako to použiť

Základná štruktúra fungovania

Klient pripojený k Redis spustí CLIENT TRACKING ON REDIRECT <client-id>, aby prijímal invalidation messages. Klient, ktorý má prijímať správy, sa prihlási na odber invalidation messages pomocou SUBSCRIBE __redis__:invalidate.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementácia! Implementácia! Implementácia!

Redigo + Ristretto

Takéto vysvetlenie je príliš vágne na to, aby bolo možné ho použiť v kóde. Preto najprv vytvoríme jednoduchú konfiguráciu s redigo a ristretto.

Najprv nainštalujte tieto dve závislosti.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // počet kľúčov na sledovanie frekvencie (10M).
22		MaxCost:     1 << 30, // maximálna cena cache (1GB).
23		BufferItems: 64,      // počet kľúčov na Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Najprv vytvoríme jednoduchý RedisClient, ktorý obsahuje ristretto a redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Kód je trochu zložitý.

  • Na sledovanie sa nadviaže ďalšie pripojenie. Toto je opatrenie, ktoré zohľadňuje možnosť, že PubSub by mohol narušiť iné operácie.
  • ID pridaného pripojenia sa získa a pripojenie, ktoré bude vyhľadávať dáta, presmeruje Tracking na toto pripojenie.
  • Potom sa prihlási na odber invalidation messages.
  • Kód na spracovanie odberu je trochu zložitý. Pretože redigo neparsuje invalidation messages, je potrebné prijať a spracovať odpoveď pred parsovaním.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Správa Get najprv vyhľadá v Ristretto a ak tam nie je, načíta ju z Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Kód na testovanie je uvedený vyššie. Ak ho vyskúšate, uvidíte, že hodnoty sa aktualizujú zakaždým, keď sa dáta v Redis aktualizujú.

To je však príliš zložité. Okrem iného je nevyhnutné aktivovať Tracking pre všetky master alebo repliky, aby bolo možné rozšíriť klaster.

Rueidis

Pre tých, ktorí používajú Go, máme k dispozícii najmodernejší a najrozvinutejší rueidis. Napíšeme kód, ktorý používa server assisted client side cache v prostredí Redis klastra pomocou rueidis.

Najprv nainštalujte závislosť.

  • github.com/redis/rueidis

Potom napíšeme kód na vyhľadávanie dát v Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

V rueidis stačí použiť DoCache na použitie client side cache. Potom sa pridá do lokálnej cache (napríklad, ako dlho sa má uchovávať v lokálnej cache) a rovnakým volaním DoCache sa dáta vyhľadajú a získajú z lokálnej cache. Samozrejme, invalidation messages sú tiež spracované správne.

Prečo nie redis-go?

redis-go bohužiaľ nepodporuje server assisted client side cache prostredníctvom oficiálneho API. Navyše, pri vytváraní PubSub sa vytvorí nové pripojenie a neexistuje API na priamy prístup k tomuto pripojeniu, takže nie je možné zistiť client id. Preto sme rozhodli, že redis-go nie je možné konfigurovať a vynechali sme ho.

Je to sexy

Prostredníctvom štruktúry client side cache

  • Ak sú dáta vopred pripravené, táto štruktúra by mohla minimalizovať dotazy a prevádzku na Redis a vždy poskytovať najnovšie dáta.
  • To by mohlo vytvoriť štruktúru typu CQRS, ktorá by drasticky zvýšila výkon čítania.

cqrs

O koľko viac sexy?

V praxi, keďže sa takáto štruktúra už používa, sme si prezreli latenciu pre dve API. Prosím, ospravedlňte, že to môžem opísať len veľmi abstraktne.

  1. Prvé API
    1. Pri prvom vyhľadávaní: priemerne 14.63 ms
    2. Pri následnom vyhľadávaní: priemerne 2.82 ms
    3. Priemerný rozdiel: 10.98 ms
  2. Druhé API
    1. Pri prvom vyhľadávaní: priemerne 14.05 ms
    2. Pri následnom vyhľadávaní: priemerne 1.60 ms
    3. Priemerný rozdiel: 11.57 ms

Dosiahli sme dodatočné zlepšenie latencie až o 82%!

Očakávame, že došlo k nasledujúcim zlepšeniam:

  • Vynechanie sieťovej komunikácie medzi klientom a Redis a úspora prevádzky.
  • Zníženie počtu príkazov na čítanie, ktoré musí Redis vykonávať.
    • To má tiež za následok zvýšenie výkonu zápisu.
  • Minimalizácia parsovania protokolu Redis.
    • Parsenie protokolu Redis nie je bez nákladov. Zníženie tohto nákladu je veľká príležitosť.

Avšak všetko je kompromis. Pre dosiahnutie tohto sme obetovali minimálne nasledujúce dve veci:

  • Potreba implementácie, prevádzky a údržby prvkov správy client side cache.
  • Zvýšenie využitia CPU a pamäte klienta v dôsledku toho.

Záver

Osobne som bol s touto architektonickou súčasťou spokojný a latencia a záťaž na API server boli veľmi nízke. Do budúcnosti by som rád, ak je to možné, architektúru konfiguroval týmto spôsobom.