Forbedring af reaktionsevne med Redis Client-Side Caching
Hvad er Redis?
Jeg tror ikke, der er mange, der ikke kender Redis. Men hvis jeg kort skal nævne nogle karakteristika, kan det opsummeres som følger:
- Operationer udføres i en enkelt tråd, hvilket betyder, at alle operationer er atomare.
- Data gemmes og behandles i In-Memory, hvilket gør alle operationer hurtige.
- Redis kan, afhængigt af indstillingerne, gemme en WAL, hvilket muliggør hurtig sikkerhedskopiering og gendannelse af den seneste tilstand.
- Understøtter forskellige typer som Set, Hash, Bit og List, hvilket giver høj produktivitet.
- Har et stort fællesskab, hvor man kan dele forskellige erfaringer, problemer og løsninger.
- Har været under udvikling og i drift i lang tid, hvilket giver pålidelig stabilitet.
Til sagen
Forestil dig?
Hvad nu hvis din tjenestes cache opfylder følgende to betingelser?
- Du skal levere de hyppigt forespurgte data i den seneste tilstand til brugeren, men opdateringen er uregelmæssig, hvilket kræver hyppig cache-opdatering.
- Opdateringer sker ikke, men du skal ofte tilgå og forespørge de samme cache-data.
Det første tilfælde kan være en overvejelse af en online butiks realtids popularitetsrangering. Hvis realtids popularitetsrangeringen gemmes som et sorted set, er det ineffektivt at læse fra Redis, hver gang en bruger tilgår hovedsiden. I det andet tilfælde, for valutakursdata, selvom valutakursdata offentliggøres cirka hvert 10. minut, sker de faktiske forespørgsler meget hyppigt. Og for won-dollar, won-yen og won-yuan sker cache-forespørgslerne meget hyppigt. I disse tilfælde ville det være en effektiv handling, hvis API-serveren havde en separat lokal cache og opdaterede den ved at forespørge Redis igen, når dataene ændrede sig.
Hvordan kan man så implementere denne handling i en database - Redis - API-serverstruktur?
Kan det ikke gøres med Redis PubSub?
Når du bruger cache, abonner på en kanal, der kan modtage opdateringer!
- Så skal du oprette logik, der sender en meddelelse ved opdatering.
- Ydeevnen påvirkes, da yderligere handlinger på grund af PubSub er inkluderet.
Hvad nu hvis Redis registrerer ændringer?
Hvad nu hvis man bruger Keyspace Notification til at modtage kommandoalarmer for den pågældende nøgle?
- Der er besværet med at skulle gemme og dele de nøgler og kommandoer, der bruges til opdatering, på forhånd.
- For eksempel bliver det komplekst, da en simpel Set er en opdateringskommando for nogle nøgler, mens LPush, RPush, SAdd eller SRem er opdateringskommandoer for andre nøgler.
- Dette øger sandsynligheden for kommunikationsfejl og menneskelige fejl i koden under udviklingsprocessen betydeligt.
Hvad nu hvis man bruger Keyevent Notification til at modtage alarmer pr. kommando?
- Abonnement på alle kommandoer, der bruges til opdatering, er nødvendigt. Der kræves passende filtrering af de nøgler, der kommer ind derfra.
- For eksempel er det sandsynligt, at den pågældende klient ikke har en lokal cache for nogle af alle de nøgler, der kommer ind via Del.
- Dette kan føre til unødvendigt ressourcetab.
Derfor er Invalidation Message nødvendig!
Hvad er Invalidation Message?
Invalidation Messages er et koncept, der er tilføjet fra Redis 6.0 som en del af Server Assisted Client-Side Cache. Invalidation Message transmitteres i følgende flow:
- Antag, at ClientB allerede har læst nøglen én gang.
- ClientA indstiller den pågældende nøgle på ny.
- Redis registrerer ændringen og udsteder en Invalidation Message til ClientB for at meddele ClientB om at slette cachen.
- ClientB modtager meddelelsen og træffer passende foranstaltninger.
Hvordan bruges det?
Grundlæggende funktionsstruktur
En klient, der er forbundet til Redis, modtager invalidation messages ved at udføre CLIENT TRACKING ON REDIRECT <client-id>
. Og den klient, der skal modtage meddelelserne, abonnerer på SUBSCRIBE __redis__:invalidate
for at modtage invalidation messages.
standard tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementering! Implementering! Implementering!
Redigo + Ristretto
Hvis det kun forklares sådan, er det uklart, hvordan man skal bruge det i faktisk kode. Så lad os først konfigurere det simpelt med redigo
og ristretto
.
Først installeres de to dependencies.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // Antal nøgler at spore frekvens for (10M).
22 MaxCost: 1 << 30, // Maksimal omkostning for cache (1GB).
23 BufferItems: 64, // Antal nøgler per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Først oprettes en simpel RedisClient
, der indeholder ristretto og redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Koden er lidt kompleks.
- For at udføre Tracking oprettes en yderligere forbindelse. Dette er en foranstaltning, der tager højde for, at PubSub kan forstyrre andre operationer.
- Klient-ID'et for den tilføjede forbindelse forespørges, og Tracking omdirigeres til den forbindelse fra den forbindelse, der skal forespørge data.
- Og invalidation message abonneres på.
- Koden til håndtering af abonnementet er lidt kompleks. Da redigo ikke kan parse ugyldiggørelsesmeddelelser, skal den modtage og behandle svaret før parsing.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Get
-meddelelsen forespørger først ristretto, og hvis den ikke findes, hentes den fra Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Koden til test er som ovenfor. Hvis du tester den, vil du se, at Redis opdaterer værdien, hver gang dataene opdateres.
Men dette er for komplekst. Frem for alt er det uundgåeligt at aktivere Tracking for alle mastere eller replikaer for at skalere til klyngen.
Rueidis
For Go-sprogbrugere har vi den mest moderne og avancerede rueidis
. Lad os skrive kode, der bruger server assisted client side cache i et Redis-klyngemiljø med rueidis.
Først installeres afhængigheden.
github.com/redis/rueidis
Og koden til at forespørge data i Redis skrives.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Med rueidis skal du blot bruge DoCache
for at bruge client side cache. Derefter tilføjes den til den lokale cache, inklusive hvor længe den skal opbevares i den lokale cache, og ved at kalde DoCache
igen hentes dataene fra den lokale cache. Selvfølgelig behandles ugyldiggørelsesmeddelelser også korrekt.
Hvorfor ikke redis-go?
redis-go
understøtter desværre ikke server assisted client side cache via en officiel API. Desuden, når PubSub oprettes, oprettes en ny forbindelse, og der er ingen API, der direkte tilgår den forbindelse, så klient-ID'et kan ikke kendes. Derfor blev redis-go
vurderet som umuligt at konfigurere og blev udeladt.
Sexet
Gennem client side cache-strukturen
- Hvis data kan forberedes på forhånd, kan denne struktur minimere forespørgsler og trafik til Redis og altid levere de seneste data.
- Dette kan skabe en slags CQRS-struktur, der dramatisk forbedrer læseydelsen.
Hvor meget mere sexet er det blevet?
Faktisk bruges en sådan struktur i praksis, så jeg har undersøgt den simple latency for to API'er. Jeg beder om forståelse for, at jeg kun kan skrive meget abstrakt.
- Første API
- Ved første forespørgsel: gennemsnitligt 14.63ms
- Ved efterfølgende forespørgsler: gennemsnitligt 2.82ms
- Gennemsnitlig forskel: 10.98ms
- Anden API
- Ved første forespørgsel: gennemsnitligt 14.05ms
- Ved efterfølgende forespørgsler: gennemsnitligt 1.60ms
- Gennemsnitlig forskel: 11.57ms
Der var en yderligere latency-forbedring på op til 82%!
Jeg forventer, at følgende forbedringer har fundet sted:
- Spring over netværkskommunikation mellem klient og Redis og spar trafik
- Reducer antallet af læsekommandoer, som Redis selv skal udføre
- Dette forbedrer også skriveydelsen.
- Minimer parsing af Redis-protokollen
- Parsing af Redis-protokollen er ikke omkostningsfrit. At reducere dette er en stor mulighed.
Men alt er en afvejning. For at opnå dette har vi ofret mindst de to følgende ting:
- Implementering, drift og vedligeholdelse af klient-side cache management-elementer er nødvendig
- Øget CPU- og hukommelsesforbrug på klienten som følge heraf
Konklusion
Personligt var det en tilfredsstillende arkitekturkomponent, og latency samt stress på API-serveren var meget lav. Jeg mener, at det ville være godt at bygge arkitektur med en sådan struktur i fremtiden, hvis det er muligt.