Zlepšenie odozvy pomocou Redis Client-Side Cache
Čo je Redis?
Predpokladám, že len málo ľudí nepozná Redis. Avšak, ak by sme mali stručne spomenúť jeho charakteristiky, dalo by sa to zhrnúť nasledovne:
- Operácie sa vykonávajú v jednom vlákne, čo zaručuje atómovosť všetkých operácií.
- Dáta sú ukladané a spracúvané In-Memory, vďaka čomu sú všetky operácie rýchle.
- Redis dokáže voliteľne ukladať WAL, čo umožňuje rýchle zálohovanie a obnovu najnovšieho stavu.
- Podporuje rôzne typy dát, ako sú Set, Hash, Bit, List, čo zabezpečuje vysokú produktivitu.
- Má veľkú komunitu, ktorá umožňuje zdieľanie rôznych skúseností, problémov a riešení.
- Je dlhodobo vyvíjaný a prevádzkovaný, čo zaručuje spoľahlivú stabilitu.
Takže k podstate
Predstavte si to?
Čo ak by sa cache vašej služby riadila nasledujúcimi dvoma podmienkami?
- Často dopytované dáta musia byť používateľovi poskytnuté v najaktuálnejšom stave, ale aktualizácie sú nepravidelné, čo si vyžaduje časté obnovovanie cache.
- Dáta sa neaktualizujú, ale k rovnakým dátam v cache sa pristupuje a dopytuje sa ich veľmi často.
Prvý prípad by mohol byť zohľadnený pri rebríčku popularity v reálnom čase v nákupnom centre. Ak by bol rebríček popularity v reálnom čase v nákupnom centre uložený ako sorted set, bolo by neefektívne, aby Redis čítal dáta vždy, keď používateľ pristupuje na hlavnú stránku. V druhom prípade, aj keď sú údaje o výmenných kurzoch zverejňované približne každých 10 minút, skutočné dopyty sú veľmi časté. Konkrétne pre kurzy won-dolár, won-jen a won-yuan sa cache dopytuje veľmi často. V takýchto prípadoch by bolo efektívnejšie, aby API server mal vlastnú lokálnu cache a po zmene dát ich aktualizoval opätovným dopytovaním Redis.
Ako by sa teda dalo implementovať takéto správanie v štruktúre databáza – Redis – API server?
Nemôže to vyriešiť Redis PubSub?
Pri použití cache sa prihláste na odber kanála, ktorý prijíma informácie o aktualizácii!
- Potom je potrebné vytvoriť logiku na odosielanie správ pri aktualizácii.
- PubSub zavádza dodatočné operácie, čo ovplyvňuje výkon.
Čo ak Redis deteguje zmeny?
Ak použijete Keyspace Notification na prijímanie upozornení na príkazy pre daný kľúč?
- Existuje nepríjemnosť spojená s predbežným ukladaním a zdieľaním kľúčov a príkazov používaných na aktualizáciu.
- Napríklad, pre niektoré kľúče je aktualizačným príkazom jednoduchý Set, zatiaľ čo pre iné kľúče sa stáva LPush, RPush, SAdd alebo SRem, čo komplikuje situáciu.
- To výrazne zvyšuje pravdepodobnosť komunikačných nedorozumení a ľudských chýb v kódovaní počas vývojového procesu.
Ak použijete Keyevent Notification na prijímanie upozornení na úrovni príkazov?
- Je potrebné sa prihlásiť na odber všetkých príkazov používaných na aktualizáciu. Je potrebná vhodná filtrácia pre kľúče, ktoré prichádzajú.
- Napríklad, pre niektoré kľúče, ktoré prichádzajú cez Del, je vysoká pravdepodobnosť, že daný klient nemá lokálnu cache.
- To môže viesť k zbytočnému plytvaniu zdrojmi.
Preto je potrebná Invalidation Message!
Čo je Invalidation Message?
Invalidation Messages sú koncept, ktorý bol pridaný od Redis 6.0 ako súčasť Server Assisted Client-Side Cache. Invalidation Message sa prenáša nasledujúcim tokom:
- Predpokladajme, že ClientB už raz prečítal kľúč.
- ClientA nastaví tento kľúč na novú hodnotu.
- Redis deteguje zmenu a publikuje Invalidation Message pre ClientB, aby mu oznámil, že má vymazať cache.
- ClientB prijme túto správu a vykoná vhodné opatrenia.
Ako sa to používa?
Základná štruktúra operácie
Klient pripojený k Redis spustí CLIENT TRACKING ON REDIRECT <client-id>
, aby prijímal invalidation message. Klient, ktorý má prijímať správy, sa potom prihlási na odber SUBSCRIBE __redis__:invalidate
, aby prijímal invalidation message.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementácia! Implementácia! Implementácia!
Redigo + Ristretto
Ak by som to vysvetlil len takto, bolo by nejasné, ako to použiť v skutočnom kóde. Takže najprv to jednoducho nakonfigurujeme s redigo
a ristretto
.
Najprv nainštalujte dve závislosti.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // počet kľúčov na sledovanie frekvencie (10M).
22 MaxCost: 1 << 30, // maximálne náklady na cache (1GB).
23 BufferItems: 64, // počet kľúčov na vyrovnávaciu pamäť Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Najprv jednoducho vytvoríme RedisClient
obsahujúci ristretto a redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Kód je trochu zložitý.
- Pre Tracking sa nadviaže ďalšie pripojenie. Je to opatrenie s ohľadom na to, aby PubSub neprekážal iným operáciám.
- Získa sa ID pridaného pripojenia a Tracking sa presmeruje na toto pripojenie z pripojenia, ktoré má dopytovať dáta.
- A prihlási sa na odber invalidation message.
- Kód pre spracovanie odberu je trochu zložitý. Keďže redigo nevie parsovať invalidation message, je potrebné prijať neparsovanú odpoveď a spracovať ju.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Správa Get
najprv vyhľadá v Ristretto a ak tam nie je, získa ju z Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Kód na testovanie je uvedený vyššie. Ak ho vyskúšate, uvidíte, že dáta v Redis sa obnovujú vždy, keď sa aktualizujú.
Avšak, toto je príliš zložité. Okrem toho, aby sa rozšírilo na klaster, je nevyhnutné aktivovať Tracking pre všetky master alebo repliky.
Rueidis
Pokiaľ ide o programovací jazyk Go, máme k dispozícii najmodernejší a najrozvinutejší rueidis
. Napíšme kód, ktorý používa server assisted client side cache v prostredí Redis klastra s použitím rueidis.
Najprv nainštalujte závislosť.
github.com/redis/rueidis
Potom napíšte kód na dopytovanie dát z Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
V rueidis, pre použitie client side cache stačí zavolať DoCache
. Potom sa pridá do lokálnej cache, napríklad ako dlho sa má udržiavať v lokálnej cache, a pri opätovnom volaní DoCache
sa dáta získajú z lokálnej cache. Samozrejme, invalidation message sa tiež spracuje správne.
Prečo nie redis-go?
redis-go
, žiaľ, nepodporuje server assisted client side cache prostredníctvom oficiálneho API. Navyše, pri vytváraní PubSub sa vytvára nové pripojenie a neexistuje žiadne API na priamy prístup k tomuto pripojeniu, takže ID klienta nie je známe. Preto sme sa rozhodli redis-go
preskočiť, pretože sme usúdili, že jeho konfigurácia je nemožná.
Sexy
Prostredníctvom štruktúry client side cache
- Ak sú dáta, ktoré možno vopred pripraviť, prostredníctvom tejto štruktúry, bude možné minimalizovať dopyty a prevádzku na Redis a vždy poskytovať najaktuálnejšie dáta.
- Týmto spôsobom možno vytvoriť určitý typ štruktúry CQRS, ktorá exponenciálne zvýši výkon čítania.
O koľko sa to stalo atraktívnejším?
V skutočnosti sa takáto štruktúra používa v teréne, preto som sa pozrel na jednoduchú latenciu pre dve API. Prosím, ospravedlňte, že to môžem opísať len veľmi abstraktne.
- Prvé API
- Počiatočný dopyt: priemerne 14.63ms
- Následné dopyty: priemerne 2.82ms
- Priemerný rozdiel: 10.98ms
- Druhé API
- Počiatočný dopyt: priemerne 14.05ms
- Následné dopyty: priemerne 1.60ms
- Priemerný rozdiel: 11.57ms
Dosiahlo sa dodatočné zlepšenie latencie až o 82%!
Predpokladám, že došlo k nasledujúcim zlepšeniam:
- Vynechanie sieťovej komunikácie medzi klientom a Redis a úspora dátového prenosu.
- Zníženie počtu príkazov na čítanie, ktoré musí Redis vykonať.
- To má tiež vplyv na zvýšenie výkonu zápisu.
- Minimalizácia parsovania protokolu Redis.
- Parsovanie protokolu Redis tiež nie je nulové náklady. Zníženie tohto je veľká príležitosť.
Avšak všetko je kompromis. Na dosiahnutie tohto sme obetovali minimálne nasledujúce dve veci:
- Potreba implementovať, prevádzkovať a udržiavať prvky správy client side cache.
- Zvýšenie využitia CPU a pamäte klienta v dôsledku toho.
Záver
Osobne to bola uspokojivá architektonická súčasť a zaťaženie API servera a latencia boli veľmi nízke. V budúcnosti by som rád, ak je to možné, navrhoval architektúru s takouto štruktúrou.