GoSuda

Förbättra responsiviteten med Redis klient-sidig cache

By snowmerak
views ...

Vad är Redis?

Jag tror att få är obekanta med Redis. Men om vi ändå kortfattat ska nämna några av dess egenskaper, kan det sammanfattas på följande sätt:

  • Operationer utförs i en enda tråd, vilket innebär att alla operationer har atomicitet.
  • Data lagras och bearbetas In-Memory, vilket gör att alla operationer är snabba.
  • Redis kan, beroende på konfiguration, lagra WAL, vilket möjliggör snabb säkerhetskopiering och återställning av det senaste tillståndet.
  • Den stöder olika datatyper som Set, Hash, Bit och List, vilket ger hög produktivitet.
  • Den har en stor community, vilket innebär att man kan dela med sig av olika erfarenheter, problem och lösningar.
  • Den har utvecklats och drivits under lång tid, vilket ger pålitlig stabilitet.

Så till huvudfrågan

Föreställ dig?

Vad händer om er tjänsts cache uppfyller följande två villkor?

  1. När ofta efterfrågad data måste tillhandahållas användaren i senaste tillstånd, men uppdateringar är oregelbundna och cachen därför måste uppdateras ofta.
  2. När data inte uppdateras, men samma cache-data ofta behöver kommas åt och efterfrågas.

I det första fallet kan man tänka sig realtidsrankningar för populära produkter i en webbutik. Om realtidsrankningarna lagras som en sorted set, blir det ineffektivt om användaren läser från Redis varje gång startsidan öppnas. I det andra fallet, när det gäller valutakurser, uppdateras de ungefär var tionde minut, men faktiska förfrågningar sker mycket frekvent. Särskilt för valutakurser som won-dollar, won-yen och won-yuan sker cache-förfrågningar mycket frekvent. I dessa fall skulle det vara en effektiv åtgärd om API-servern hade en separat lokal cache och uppdaterade den genom att fråga Redis igen när data ändrades.

Hur kan man då implementera denna funktion i en databas – Redis – API-server-struktur?

Fungerar inte Redis PubSub?

När du använder cache, prenumerera på en kanal som kan ta emot meddelanden om uppdateringar!

  • Då måste man skapa en logik som skickar meddelanden vid uppdatering.
  • Ytterligare operationer på grund av PubSub påverkar prestandan.

pubsub-write

pubsub-read

Vad händer om Redis detekterar ändringar?

Om man använder Keyspace Notification för att få meddelanden om kommandon för en specifik nyckel?

  • Det finns en besvärlig process att i förväg lagra och dela nycklar och kommandon som används för uppdateringar.
  • Till exempel blir det komplicerat om en enkel Set är uppdateringskommandot för en nyckel, medan LPush, RPush, SAdd eller SRem är uppdateringskommandot för en annan nyckel.
  • Detta ökar avsevärt risken för kommunikationsmissar och mänskliga fel i kodningen under utvecklingsprocessen.

Om man använder Keyevent Notification för att få meddelanden per kommando?

  • Det krävs prenumeration på alla kommandon som används för uppdateringar. Dessutom krävs en lämplig filtrering av de nycklar som kommer in.
  • Till exempel, för vissa nycklar som kommer in via Del, är det troligt att den berörda klienten inte har någon lokal cache.
  • Detta kan leda till onödig resursförbrukning.

Därför behövs Invalidation Message!

Vad är Invalidation Message?

Invalidation Messages är ett koncept som tillhandahålls som en del av Server Assisted Client-Side Cache, tillagt i Redis 6.0. Invalidation Message förmedlas enligt följande flöde:

  1. Anta att ClientB redan har läst nyckeln en gång.
  2. ClientA ställer in nyckeln på nytt.
  3. Redis upptäcker ändringen och publicerar ett Invalidation Message till ClientB för att meddela ClientB att radera cachen.
  4. ClientB tar emot meddelandet och vidtar lämpliga åtgärder.

invalidation-message

Hur man använder det

Grundläggande funktionsstruktur

Klienten som är ansluten till Redis får invalidation messages genom att köra CLIENT TRACKING ON REDIRECT <client-id>. Klienten som ska ta emot meddelanden prenumererar sedan på invalidation messages genom SUBSCRIBE __redis__:invalidate.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementering! Implementering! Implementering!

Redigo + Ristretto

Att bara förklara det så gör det svårt att veta hur man faktiskt använder det i kod. Så låt oss först konfigurera det enkelt med redigo och ristretto.

Installera först de två beroendena.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // antal nycklar att spåra frekvensen av (10M).
22		MaxCost:     1 << 30, // maximal kostnad för cachen (1GB).
23		BufferItems: 64,      // antal nycklar per Get-buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Först skapas en enkel RedisClient som inkluderar ristretto och redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Koden är något komplex.

  • För att utföra Tracking upprättas ytterligare en anslutning. Detta är en åtgärd med hänsyn till att PubSub kan störa andra operationer.
  • Klient-ID för den tillagda anslutningen hämtas, och Tracking på anslutningen som ska fråga data omdirigeras till den anslutningen.
  • Sedan prenumereras på invalidation message.
  • Koden för att hantera prenumerationen är något komplex. Eftersom redigo inte kan parsa ogiltigförklaringsmeddelanden måste svaret tas emot före parsning.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Meddelandet Get frågar först ristretto på följande sätt, och om det inte hittas, hämtas det från Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Koden för att testa är som ovan. Om du testar den kommer du att kunna se att värdet uppdateras varje gång data uppdateras i Redis.

Men detta är för komplicerat. Framför allt är det nödvändigt att aktivera Tracking för alla masters eller repliker för att kunna skala ut till kluster.

Rueidis

För oss som använder Go-språket finns det rueidis, som är det mest moderna och avancerade. Vi kommer att skriva kod som använder server assisted client side cache i en Redis-klustermiljö med rueidis.

Installera först beroendet.

  • github.com/redis/rueidis

Skriv sedan koden för att fråga data från Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Rueidis kräver bara DoCache för att använda client side cache. Detta lägger till data i den lokala cachen, inklusive hur länge den ska behållas, och när DoCache anropas igen hämtas data från den lokala cachen. Självklart hanteras även ogiltigförklaringsmeddelanden korrekt.

Varför inte redis-go?

redis-go stöder tyvärr inte server assisted client side cache via det officiella API:et. Dessutom finns det inget API för att direkt komma åt anslutningen när en ny anslutning skapas för PubSub, så klient-ID kan inte heller erhållas. Därför bedömdes redis-go vara omöjligt att konfigurera och har utelämnats.

Sexigt

Genom client side cache-strukturen

  • Om data kan förberedas i förväg kan denna struktur minimera Redis-frågor och trafik samtidigt som den alltid tillhandahåller de senaste uppgifterna.
  • Detta kan skapa en typ av CQRS-struktur för att avsevärt förbättra läsprestanda.

cqrs

Hur mycket sexigare har det blivit?

Eftersom en sådan struktur faktiskt används i verkliga system, undersökte vi enkelt latensen för två API:er. Vänligen förstå att jag bara kan beskriva det mycket abstrakt.

  1. Första API:et
    1. Vid första förfrågan: genomsnitt 14.63ms
    2. Vid efterföljande förfrågan: genomsnitt 2.82ms
    3. Genomsnittlig skillnad: 10.98ms
  2. Andra API:et
    1. Vid första förfrågan: genomsnitt 14.05ms
    2. Vid efterföljande förfrågan: genomsnitt 1.60ms
    3. Genomsnittlig skillnad: 11.57ms

Det fanns en ytterligare latensförbättring på så mycket som cirka 82%!

Jag förväntar mig att följande förbättringar har skett:

  • Uteslutande av nätverkskommunikation mellan klient och Redis och trafikbesparingar.
  • Minskning av antalet läskommandon som Redis själv måste utföra.
    • Detta har också effekten att skrivprestandan förbättras.
  • Minimering av parsning av Redis-protokollet.
    • Parsning av Redis-protokollet är inte kostnadsfritt. Att kunna minska detta är en stor möjlighet.

Men allt är en avvägning. För detta har vi offrat minst följande två saker:

  • Behov av att implementera, driva och underhålla klientens cachehanteringselement.
  • Ökad CPU- och minnesanvändning på klienten som ett resultat.

Slutsats

Personligen var jag nöjd med arkitekturkomponenten, och stressen på latensen och API-servern var mycket låg. Jag planerar att fortsätta att konfigurera arkitekturen med en sådan struktur om möjligt i framtiden.