Zlepšenie reaktivity pomocou klientskej cache Redis
Čo je Redis?
Predpokladám, že len málokto nepozná Redis. Ak by som však mal stručne zhrnúť jeho vlastnosti, dalo by sa to zhrnúť takto:
- Operácie sa vykonávajú v jednom vlákne, čo zaručuje atomicitu všetkých operácií.
- Dáta sa ukladajú a spracovávajú v In-Memory, vďaka čomu sú všetky operácie rýchle.
- Redis dokáže ukladať WAL (Write-Ahead Log) v závislosti od konfigurácie, čo umožňuje rýchle zálohovanie a obnovu najnovšieho stavu.
- Podporuje rôzne typy dát, ako sú Set, Hash, Bit, List, čo vedie k vysokej produktivite.
- Má veľkú komunitu, ktorá umožňuje zdieľanie rôznych skúseností, problémov a riešení.
- Je dlhodobo vyvíjaný a prevádzkovaný, čo zaručuje spoľahlivú stabilitu.
Takže k veci
Predstavte si to
Čo ak by cache vášho servisu spĺňala nasledujúce dve podmienky?
- Je potrebné poskytovať používateľom často vyhľadávané dáta v aktuálnom stave, ale aktualizácia je nepravidelná, čo si vyžaduje časté obnovovanie cache.
- Aktualizácia nie je potrebná, ale je potrebné často pristupovať k rovnakým dátam v cache.
Prvý prípad môže zahŕňať real-time rebríčky popularity v e-shope. Ak sú real-time rebríčky popularity e-shopu uložené ako sorted set, bolo by neefektívne, keby ich Redis načítaval vždy, keď používateľ navštívi hlavnú stránku.
V druhom prípade, aj keď sú údaje o výmenných kurzoch zverejňované približne každých 10 minút, skutočné vyhľadávania sú veľmi časté. Konkrétne, cache sa veľmi často vyhľadáva pre kurzy won-dolár, won-jen a won-yuan.
V takýchto prípadoch by bolo efektívnejšie, keby API server uchovával samostatnú lokálnu cache a po zmene dát ju aktualizoval opätovným dotazovaním Redis.
Ako by sme potom mohli implementovať takéto správanie v architektúre databáza – Redis – API server?
Nepomôže Redis PubSub?
Pri používaní cache si odoberte kanál, ktorý vás bude informovať o aktualizáciách!
- Potom je potrebné vytvoriť logiku na odosielanie správ pri aktualizácii.
- Dodatočné operácie spôsobené PubSub ovplyvňujú výkon.


Čo ak Redis deteguje zmeny?
Použiť Keyspace Notification na prijímanie upozornení na príkazy pre daný kľúč?
- Existuje nepríjemnosť spojená s predbežným ukladaním a zdieľaním kľúčov a príkazov použitých na aktualizáciu.
- Napríklad, pre niektoré kľúče je aktualizačným príkazom jednoduchý Set, zatiaľ čo pre iné je to LPush, RPush, SAdd alebo SRem, čo situáciu komplikuje.
- To výrazne zvyšuje pravdepodobnosť komunikačných chýb a ľudských chýb pri kódovaní počas vývoja.
Použiť Keyevent Notification na prijímanie upozornení na úrovni príkazov?
- Je potrebné odoberať všetky príkazy používané na aktualizáciu. Je potrebná primeraná filtrácia kľúčov, ktoré prichádzajú.
- Napríklad, pre niektoré kľúče prichádzajúce cez Del je pravdepodobné, že príslušný klient nemá lokálnu cache.
- To môže viesť k zbytočnému plytvaniu zdrojmi.
Preto je potrebná Invalidation Message!
Čo je Invalidation Message?
Invalidation Messages sú koncept, ktorý bol predstavený ako súčasť Server Assisted Client-Side Cache, pridanej od verzie Redis 6.0. Invalidation Message sa doručuje nasledujúcim spôsobom:
- Predpokladajme, že ClientB už raz prečítal kľúč.
- ClientA nastaví nový kľúč.
- Redis deteguje zmenu a vydá Invalidation Message pre ClientB, aby mu oznámil, že má vymazať cache.
- ClientB prijme správu a vykoná príslušné opatrenia.

Ako to použiť
Základná štruktúra fungovania
Klient pripojený k Redis spustí CLIENT TRACKING ON REDIRECT <client-id>, aby prijímal invalidation messages. Klient, ktorý má prijímať správy, sa prihlási na odber invalidation messages pomocou SUBSCRIBE __redis__:invalidate.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementácia! Implementácia! Implementácia!
Redigo + Ristretto
Takéto vysvetlenie je príliš vágne na to, aby bolo možné ho použiť v kóde. Preto najprv vytvoríme jednoduchú konfiguráciu s redigo a ristretto.
Najprv nainštalujte tieto dve závislosti.
github.com/gomodule/redigogithub.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // počet kľúčov na sledovanie frekvencie (10M).
22 MaxCost: 1 << 30, // maximálna cena cache (1GB).
23 BufferItems: 64, // počet kľúčov na Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Najprv vytvoríme jednoduchý RedisClient, ktorý obsahuje ristretto a redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Kód je trochu zložitý.
- Na sledovanie sa nadviaže ďalšie pripojenie. Toto je opatrenie, ktoré zohľadňuje možnosť, že PubSub by mohol narušiť iné operácie.
- ID pridaného pripojenia sa získa a pripojenie, ktoré bude vyhľadávať dáta, presmeruje Tracking na toto pripojenie.
- Potom sa prihlási na odber invalidation messages.
- Kód na spracovanie odberu je trochu zložitý. Pretože redigo neparsuje invalidation messages, je potrebné prijať a spracovať odpoveď pred parsovaním.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Správa Get najprv vyhľadá v Ristretto a ak tam nie je, načíta ju z Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Kód na testovanie je uvedený vyššie. Ak ho vyskúšate, uvidíte, že hodnoty sa aktualizujú zakaždým, keď sa dáta v Redis aktualizujú.
To je však príliš zložité. Okrem iného je nevyhnutné aktivovať Tracking pre všetky master alebo repliky, aby bolo možné rozšíriť klaster.
Rueidis
Pre tých, ktorí používajú Go, máme k dispozícii najmodernejší a najrozvinutejší rueidis. Napíšeme kód, ktorý používa server assisted client side cache v prostredí Redis klastra pomocou rueidis.
Najprv nainštalujte závislosť.
github.com/redis/rueidis
Potom napíšeme kód na vyhľadávanie dát v Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
V rueidis stačí použiť DoCache na použitie client side cache. Potom sa pridá do lokálnej cache (napríklad, ako dlho sa má uchovávať v lokálnej cache) a rovnakým volaním DoCache sa dáta vyhľadajú a získajú z lokálnej cache. Samozrejme, invalidation messages sú tiež spracované správne.
Prečo nie redis-go?
redis-go bohužiaľ nepodporuje server assisted client side cache prostredníctvom oficiálneho API. Navyše, pri vytváraní PubSub sa vytvorí nové pripojenie a neexistuje API na priamy prístup k tomuto pripojeniu, takže nie je možné zistiť client id. Preto sme rozhodli, že redis-go nie je možné konfigurovať a vynechali sme ho.
Je to sexy
Prostredníctvom štruktúry client side cache
- Ak sú dáta vopred pripravené, táto štruktúra by mohla minimalizovať dotazy a prevádzku na Redis a vždy poskytovať najnovšie dáta.
- To by mohlo vytvoriť štruktúru typu CQRS, ktorá by drasticky zvýšila výkon čítania.

O koľko viac sexy?
V praxi, keďže sa takáto štruktúra už používa, sme si prezreli latenciu pre dve API. Prosím, ospravedlňte, že to môžem opísať len veľmi abstraktne.
- Prvé API
- Pri prvom vyhľadávaní: priemerne 14.63 ms
- Pri následnom vyhľadávaní: priemerne 2.82 ms
- Priemerný rozdiel: 10.98 ms
- Druhé API
- Pri prvom vyhľadávaní: priemerne 14.05 ms
- Pri následnom vyhľadávaní: priemerne 1.60 ms
- Priemerný rozdiel: 11.57 ms
Dosiahli sme dodatočné zlepšenie latencie až o 82%!
Očakávame, že došlo k nasledujúcim zlepšeniam:
- Vynechanie sieťovej komunikácie medzi klientom a Redis a úspora prevádzky.
- Zníženie počtu príkazov na čítanie, ktoré musí Redis vykonávať.
- To má tiež za následok zvýšenie výkonu zápisu.
- Minimalizácia parsovania protokolu Redis.
- Parsenie protokolu Redis nie je bez nákladov. Zníženie tohto nákladu je veľká príležitosť.
Avšak všetko je kompromis. Pre dosiahnutie tohto sme obetovali minimálne nasledujúce dve veci:
- Potreba implementácie, prevádzky a údržby prvkov správy client side cache.
- Zvýšenie využitia CPU a pamäte klienta v dôsledku toho.
Záver
Osobne som bol s touto architektonickou súčasťou spokojný a latencia a záťaž na API server boli veľmi nízke. Do budúcnosti by som rád, ak je to možné, architektúru konfiguroval týmto spôsobom.