Förbättra reaktiviteten med Redis klient-sidig cache
What is Redis?
Jag tror att de flesta känner till Redis. Men om jag ändå ska nämna några av dess egenskaper kortfattat, kan de sammanfattas på följande sätt:
- Operationer utförs i en enkel tråd, vilket innebär att alla operationer har atomicitet.
- Data lagras och bearbetas In-Memory, vilket gör alla operationer snabba.
- Redis kan, beroende på inställningar, spara WAL, vilket möjliggör snabb säkerhetskopiering och återställning av det senaste tillståndet.
- Den stöder flera olika typer som Set, Hash, Bit och List, vilket ger hög produktivitet.
- Den har en stor community, vilket gör det möjligt att dela olika erfarenheter, problem och lösningar.
- Den har utvecklats och drivits under lång tid, vilket ger pålitlig stabilitet.
Så till saken
Föreställ dig?
Vad skulle hända om din tjänsts cache uppfyllde följande två villkor?
- Du måste tillhandahålla de senaste data som ofta efterfrågas till användaren, men uppdateringarna är oregelbundna, vilket kräver frekventa cacheuppdateringar.
- Uppdateringar sker inte, men samma cachedata måste ofta nås och efterfrågas.
Det första fallet kan gälla realtidsrankningar för populära produkter i en webbutik. Om realtidsrankningen lagras som en sorted set, skulle det vara ineffektivt för Redis att läsa den varje gång en användare besöker huvudsidan. Det andra fallet gäller valutakursdata; även om valutakursdata publiceras ungefär var tionde minut, sker faktiska förfrågningar mycket ofta. Särskilt för valutor som Won-Dollar, Won-Yen och Won-Yuan efterfrågas cachen mycket frekvent. I dessa fall skulle det vara effektivare för API-servern att ha en separat lokal cache och uppdatera den genom att fråga Redis igen när data ändras.
Hur kan man då implementera denna funktionalitet i en databas - Redis - API-serverstruktur?
Kan det inte göras med Redis PubSub?
När du använder cache, prenumerera på en kanal som kan ta emot uppdateringar!
- Då måste du skapa logik för att skicka meddelanden vid uppdateringar.
- Ytterligare operationer på grund av PubSub påverkar prestandan.


Vad händer om Redis upptäcker ändringar?
Vad händer om man använder Keyspace Notification för att få kommandonotiseringar för en specifik nyckel?
- Det finns ett besvär med att i förväg lagra och dela nycklar och kommandon som används för uppdateringar.
- Till exempel blir det komplicerat när en enkel Set är ett uppdateringskommando för en viss nyckel, medan LPush, RPush, SAdd eller SRem är uppdateringskommandon för andra nycklar.
- Detta ökar drastiskt risken för kommunikationsfel och mänskliga fel i kodningen under utvecklingsprocessen.
Vad händer om man använder Keyevent Notification för att få aviseringar per kommando?
- Det krävs en prenumeration på alla kommandon som används för uppdateringar. Därifrån krävs lämplig filtrering för de nycklar som kommer in.
- Till exempel, för vissa av alla nycklar som kommer in med Del, är det troligt att den berörda klienten inte har en lokal cache.
- Detta kan leda till onödigt resursspill.
Därför behövs Invalidation Message!
Vad är Invalidation Message?
Invalidation Messages är ett koncept som tillhandahålls som en del av Server Assisted Client-Side Cache, som lades till i Redis 6.0. Invalidation Messages levereras i följande flöde:
- Antag att ClientB redan har läst en nyckel en gång.
- ClientA ställer in den nyckeln på nytt.
- Redis detekterar ändringen och publicerar ett Invalidation Message till ClientB för att meddela ClientB att radera cachen.
- ClientB tar emot meddelandet och vidtar lämpliga åtgärder.

Hur man använder det
Grundläggande funktionsstruktur
Klienter som är anslutna till Redis får invalidation messages genom att köra CLIENT TRACKING ON REDIRECT <client-id>. Klienten som ska ta emot meddelandena prenumererar sedan på SUBSCRIBE __redis__:invalidate för att ta emot invalidation messages.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementering! Implementering! Implementering!
Redigo + Ristretto
Enbart en sådan beskrivning gör det svårt att veta hur man faktiskt använder det i kod. Låt oss därför kortfattat konfigurera det med redigo och ristretto först.
Installera först de två beroendena.
github.com/gomodule/redigogithub.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // antal nycklar att spåra frekvensen av (10M).
22 MaxCost: 1 << 30, // maximal kostnad för cache (1GB).
23 BufferItems: 64, // antal nycklar per Get-buffert.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Först skapar vi en enkel RedisClient som inkluderar ristretto och redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Koden är lite komplex.
- För att utföra spårning upprättas ytterligare en anslutning. Detta är en åtgärd som beaktar att PubSub kan störa andra operationer.
- Klient-ID för den nya anslutningen hämtas, och spårningen omdirigeras till den anslutningen från den anslutning som ska hämta data.
- Därefter prenumereras på invalidation messages.
- Koden för att hantera prenumerationen är något komplex. Eftersom redigo inte kan parsas för ogiltigförklarande meddelanden måste svaret tas emot och bearbetas före parsning.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Get-meddelandet frågar ristretto först, och om det inte finns, hämtas det från Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Koden för att testa är som ovan. Om du testar den kommer du att kunna se att data uppdateras varje gång data uppdateras i Redis.
Men detta är alltför komplext. Framför allt, för att skala till ett kluster, är det oundvikligt att aktivera spårning för alla masters eller repliker.
Rueidis
För oss som skriver Go-språk har vi den mest moderna och avancerade rueidis. Låt oss skriva kod som använder server assisted client side cache i en Redis-klustermiljö med rueidis.
Installera först beroendet.
github.com/redis/rueidis
Skriv sedan koden för att hämta data från Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Med rueidis behöver du bara DoCache för att använda client side cache. Då läggs det till i den lokala cachen, inklusive hur länge det ska behållas i den lokala cachen, och när DoCache anropas igen hämtas data från den lokala cachen. Självklart hanteras även invalidation messages korrekt.
Varför inte redis-go?
redis-go stöder tyvärr inte server assisted client side cache via det officiella API:et. Dessutom, när PubSub skapas, skapas en ny anslutning, och det finns inget API för att direkt komma åt den anslutningen, så klient-ID:t kan inte heller hämtas. Därför bedömde jag att redis-go inte kunde konfigureras och valde att hoppa över det.
Sexigt
Genom client side cache-strukturen
- Om data kan förberedas i förväg, kan denna struktur minimera Redis-frågor och trafik samtidigt som den alltid tillhandahåller de senaste data.
- Detta kan skapa en sorts CQRS-struktur för att drastiskt förbättra läsprestanda.

Hur mycket sexigare har det blivit?
Jag har faktiskt använt denna struktur i praktiken och har kortfattat undersökt latensen för två API:er. Jag ber om ursäkt för att jag bara kan beskriva det mycket abstrakt.
- Första API:et
- Vid första förfrågan: genomsnitt 14.63ms
- Vid efterföljande förfrågningar: genomsnitt 2.82ms
- Genomsnittlig skillnad: 10.98ms
- Andra API:et
- Vid första förfrågan: genomsnitt 14.05ms
- Vid efterföljande förfrågningar: genomsnitt 1.60ms
- Genomsnittlig skillnad: 11.57ms
Det fanns en ytterligare latensförbättring på upp till cirka 82%!
Jag förväntar mig att följande förbättringar har skett:
- Eliminering av nätverkskommunikation mellan klient och Redis samt trafikbesparingar.
- Minskning av antalet läskommandon som Redis själv måste utföra.
- Detta har också effekten att skrivprestandan förbättras.
- Minimering av parsning av Redis-protokollet.
- Parsning av Redis-protokollet är inte kostnadsfritt. Att kunna minska detta är en stor möjlighet.
Men allt är en avvägning. För detta har vi offrat minst följande två saker:
- Behovet av implementering, drift och underhåll av klientens cachehanteringselement.
- Ökad CPU- och minnesanvändning på klienten som ett resultat av detta.
Slutsats
Personligen var jag nöjd med arkitekturkomponenten, och latensen samt stressen på API-servern var mycket låg. Jag tror att det vore bra att konfigurera arkitekturen på detta sätt om möjligt i framtiden.