Forbedring av responsivitet med Redis Client-Side Caching
Hva er Redis?
Jeg tror det er få som ikke kjenner til Redis. Men hvis vi likevel skal nevne noen kjennetegn kort, kan det oppsummeres slik:
- Operasjoner utføres i én tråd, slik at alle operasjoner har atomisitet.
- Data lagres og behandles In-Memory, slik at alle operasjoner er raske.
- Redis kan, avhengig av opsjoner, lagre WAL, noe som muliggjør rask sikkerhetskopiering og gjenoppretting av den nyeste tilstanden.
- Den støtter flere typer som Set, Hash, Bit, List, noe som gir høy produktivitet.
- Den har et stort fellesskap, slik at man kan dele diverse erfaringer, problemer og løsninger.
- Den har vært utviklet og drevet over lang tid, noe som gir pålitelig stabilitet.
Så til hovedsaken
Se for deg dette?
Hva om cache-en til tjenesten deres oppfyller følgende to betingelser?
- Når data som ofte spørres om, må leveres til brukeren i sanntid, men oppdateringen er uregelmessig, slik at cache-en må oppdateres hyppig.
- Når oppdatering ikke skjer, men det er nødvendig å aksessere og spørre den samme cache-dataen svært hyppig.
Det første tilfellet kan være en sanntids popularitetsrangering i en nettbutikk. Hvis den sanntids popularitetsrangeringen er lagret som et sorted set, er det ineffektivt å lese fra Redis hver gang en bruker aksesserer hovedsiden. Det andre tilfellet gjelder valutakursdata: selv om valutakursdataen kun kunngjøres omtrent hvert 10. minutt, skjer selve spørringen svært hyppig. Spesielt for valutakursene mellom won-dollar, won-yen og won-yuan, spørres cache-en ekstremt hyppig. I slike tilfeller vil det være en mer effektiv operasjon hvis API-serveren har en separat lokal cache, og oppdaterer den ved å spørre Redis på nytt når dataene endres.
Så, hvordan kan man implementere denne operasjonen i en arkitektur som består av database - Redis - API-server?
Er ikke Redis PubSub tilstrekkelig?
Når man bruker cache, abonner på en kanal som kan motta informasjon om oppdateringer!
- Da må man lage en logikk som sender en melding ved oppdatering.
- Ytelsen påvirkes av den ekstra operasjonen forårsaket av PubSub.
Hva om Redis oppdager endringen?
Hva om man bruker Keyspace Notification for å motta kommando-varsler for den aktuelle nøkkelen?
- Det er tungvint å måtte lagre og dele nøkkelen og kommandoen som brukes for oppdateringen på forhånd.
- For eksempel blir det komplisert fordi for noen nøkler er en enkel Set oppdateringskommandoen, mens for andre nøkler er det LPush, eller RPush, eller SAdd og SRem som er oppdateringskommandoen.
- Dette øker betraktelig risikoen for kommunikasjonsfeil og menneskelige feil i kodingen under utviklingsprosessen.
Hva om man bruker Keyevent Notification for å motta varsler på kommandonivå?
- Det er nødvendig å abonnere på alle kommandoer som brukes for oppdatering. Det kreves også passende filtrering av nøklene som mottas derfra.
- For eksempel er det sannsynlig at den aktuelle klienten ikke har en lokal cache for noen av nøklene som kommer inn via Del.
- Dette kan føre til unødvendig ressursforbruk.
Derfor trengs Invalidation Message!
Hva er Invalidation Message?
Invalidation Messages er et konsept som tilbys som en del av Server Assisted Client-Side Cache, som ble lagt til fra og med Redis 6.0. Invalidation Message overføres i følgende flyt:
- Anta at ClientB allerede har lest nøkkelen én gang.
- ClientA setter den aktuelle nøkkelen på nytt.
- Redis oppdager endringen og publiserer en Invalidation Message til ClientB for å informere ClientB om å slette cache-en.
- ClientB mottar meldingen og iverksetter passende tiltak.
Hvordan brukes det?
Grunnleggende driftsstruktur
Klienten som er koblet til Redis, kjører CLIENT TRACKING ON REDIRECT <client-id>
for å motta invalidation message. Klienten som skal motta meldingen, abonnerer på SUBSCRIBE __redis__:invalidate
for å motta invalidation message.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementering! Implementering! Implementering!
Redigo + Ristretto
Hvis det bare forklares slik, er det uklart hvordan det faktisk skal brukes i kode. La oss derfor først sette det opp enkelt med redigo
og ristretto
.
Først installeres de to avhengighetene.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // antall nøkler å spore frekvensen av (10M).
22 MaxCost: 1 << 30, // maksimal kostnad for cache (1GB).
23 BufferItems: 64, // antall nøkler per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Først opprettes en enkel RedisClient
som inneholder ristretto og redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Koden er litt komplisert.
- En ekstra tilkobling opprettes for Tracking. Dette er et tiltak tatt for å unngå at PubSub forstyrrer andre operasjoner.
- Client ID-en til den ekstra tilkoblingen hentes, og Trackingen på tilkoblingen som skal hente data, omdirigeres til denne tilkoblingen.
- Deretter abonneres det på invalidation message.
- Koden som behandler abonnementet er litt komplisert. Siden redigo ikke kan parse ugyldiggjøringsmeldingen, må responsen mottas og behandles før parsing.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Get
-meldingen spør først ristretto, og hvis den ikke finner data der, henter den fra Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Koden for testing er som vist ovenfor. Hvis man tester den, vil man kunne bekrefte at verdien oppdateres på nytt hver gang dataen i Redis oppdateres.
Men dette er for komplisert. I tillegg er det uunngåelig at Tracking må aktiveres for alle mastere, eller replikaer, for å kunne skalere til en klynge.
Rueidis
For de som bruker Go-språket, har vi den mest moderne og avanserte rueidis
. Vi skal skrive kode som bruker server assisted client side cache i et Redis klyngemiljø ved hjelp av rueidis.
Først installeres avhengigheten.
github.com/redis/rueidis
Deretter skrives koden for å spørre data fra Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
I rueidis trenger man bare å bruke DoCache
for å bruke client side cache. Da legges det til i den lokale cache-en, inkludert hvor lenge den skal beholdes i den lokale cache-en, og ved å kalle DoCache
på nytt hentes dataen fra den lokale cache-en. Naturligvis behandles invalidation message også korrekt.
Hvorfor ikke redis-go?
redis-go
støtter dessverre ikke server assisted client side cache gjennom sitt offisielle API. I tillegg er det ikke mulig å vite client id-en fordi det ikke finnes et API for direkte tilgang til tilkoblingen når en ny tilkobling opprettes for PubSub. Derfor ble det vurdert at redis-go
ikke kunne brukes, og det ble hoppet over.
Sexy!
Gjennom client side cache-strukturen
- Hvis data kan forberedes på forhånd, kan denne strukturen minimere spørringer og trafikk til Redis, samtidig som den alltid leverer de nyeste dataene.
- Dette muliggjør opprettelsen av en slags CQRS-struktur, som dramatisk kan øke leseytelsen.
Hvor mye mer sexy ble det?
Siden denne strukturen faktisk er i bruk i feltet, har jeg kort sett på latensen for to API-er. Vær snill å unnskylde at jeg bare kan skrive om det på en svært abstrakt måte.
- Første API
- Første spørring: Gjennomsnittlig 14.63ms
- Senere spørringer: Gjennomsnittlig 2.82ms
- Gjennomsnittlig forskjell: 10.98ms
- Andre API
- Første spørring: Gjennomsnittlig 14.05ms
- Senere spørringer: Gjennomsnittlig 1.60ms
- Gjennomsnittlig forskjell: 11.57ms
Det var en ytterligere latensforbedring på så mye som 82%!
Jeg forventer at følgende forbedringer har funnet sted:
- Eliminering av nettverkskommunikasjon og trafikkbesparelser mellom klienten og Redis.
- Reduksjon av antall lesekommandoer som Redis selv må utføre.
- Dette har også effekten av å øke skriveytelsen.
- Minimalisering av parsing av Redis-protokollen.
- Det er ikke null kostnad å parse Redis-protokollen. Å kunne redusere dette er en stor mulighet.
Men alt er en Trade-Off. For dette har vi ofret minst de to følgende tingene:
- Behov for implementering, drift og vedlikehold av administrasjonselementer for klient-side cache.
- Økt CPU- og minnebruk på klienten som følge av dette.
Konklusjon
Personlig var jeg fornøyd med arkitekturkomponenten, og latensen og stresset på API-serveren var svært lavt. Jeg tenker at det vil være bra å sette opp arkitekturen med denne strukturen hvis det er mulig i fremtiden.