GoSuda

Förbättra reaktiviteten med Redis klient-sidig cache

By snowmerak
views ...

What is Redis?

Jag tror att de flesta känner till Redis. Men om jag ändå ska nämna några av dess egenskaper kortfattat, kan de sammanfattas på följande sätt:

  • Operationer utförs i en enkel tråd, vilket innebär att alla operationer har atomicitet.
  • Data lagras och bearbetas In-Memory, vilket gör alla operationer snabba.
  • Redis kan, beroende på inställningar, spara WAL, vilket möjliggör snabb säkerhetskopiering och återställning av det senaste tillståndet.
  • Den stöder flera olika typer som Set, Hash, Bit och List, vilket ger hög produktivitet.
  • Den har en stor community, vilket gör det möjligt att dela olika erfarenheter, problem och lösningar.
  • Den har utvecklats och drivits under lång tid, vilket ger pålitlig stabilitet.

Så till saken

Föreställ dig?

Vad skulle hända om din tjänsts cache uppfyllde följande två villkor?

  1. Du måste tillhandahålla de senaste data som ofta efterfrågas till användaren, men uppdateringarna är oregelbundna, vilket kräver frekventa cacheuppdateringar.
  2. Uppdateringar sker inte, men samma cachedata måste ofta nås och efterfrågas.

Det första fallet kan gälla realtidsrankningar för populära produkter i en webbutik. Om realtidsrankningen lagras som en sorted set, skulle det vara ineffektivt för Redis att läsa den varje gång en användare besöker huvudsidan. Det andra fallet gäller valutakursdata; även om valutakursdata publiceras ungefär var tionde minut, sker faktiska förfrågningar mycket ofta. Särskilt för valutor som Won-Dollar, Won-Yen och Won-Yuan efterfrågas cachen mycket frekvent. I dessa fall skulle det vara effektivare för API-servern att ha en separat lokal cache och uppdatera den genom att fråga Redis igen när data ändras.

Hur kan man då implementera denna funktionalitet i en databas - Redis - API-serverstruktur?

Kan det inte göras med Redis PubSub?

När du använder cache, prenumerera på en kanal som kan ta emot uppdateringar!

  • Då måste du skapa logik för att skicka meddelanden vid uppdateringar.
  • Ytterligare operationer på grund av PubSub påverkar prestandan.

pubsub-write

pubsub-read

Vad händer om Redis upptäcker ändringar?

Vad händer om man använder Keyspace Notification för att få kommandonotiseringar för en specifik nyckel?

  • Det finns ett besvär med att i förväg lagra och dela nycklar och kommandon som används för uppdateringar.
  • Till exempel blir det komplicerat när en enkel Set är ett uppdateringskommando för en viss nyckel, medan LPush, RPush, SAdd eller SRem är uppdateringskommandon för andra nycklar.
  • Detta ökar drastiskt risken för kommunikationsfel och mänskliga fel i kodningen under utvecklingsprocessen.

Vad händer om man använder Keyevent Notification för att få aviseringar per kommando?

  • Det krävs en prenumeration på alla kommandon som används för uppdateringar. Därifrån krävs lämplig filtrering för de nycklar som kommer in.
  • Till exempel, för vissa av alla nycklar som kommer in med Del, är det troligt att den berörda klienten inte har en lokal cache.
  • Detta kan leda till onödigt resursspill.

Därför behövs Invalidation Message!

Vad är Invalidation Message?

Invalidation Messages är ett koncept som tillhandahålls som en del av Server Assisted Client-Side Cache, som lades till i Redis 6.0. Invalidation Messages levereras i följande flöde:

  1. Antag att ClientB redan har läst en nyckel en gång.
  2. ClientA ställer in den nyckeln på nytt.
  3. Redis detekterar ändringen och publicerar ett Invalidation Message till ClientB för att meddela ClientB att radera cachen.
  4. ClientB tar emot meddelandet och vidtar lämpliga åtgärder.

invalidation-message

Hur man använder det

Grundläggande funktionsstruktur

Klienter som är anslutna till Redis får invalidation messages genom att köra CLIENT TRACKING ON REDIRECT <client-id>. Klienten som ska ta emot meddelandena prenumererar sedan på SUBSCRIBE __redis__:invalidate för att ta emot invalidation messages.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementering! Implementering! Implementering!

Redigo + Ristretto

Enbart en sådan beskrivning gör det svårt att veta hur man faktiskt använder det i kod. Låt oss därför kortfattat konfigurera det med redigo och ristretto först.

Installera först de två beroendena.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // antal nycklar att spåra frekvensen av (10M).
22		MaxCost:     1 << 30, // maximal kostnad för cache (1GB).
23		BufferItems: 64,      // antal nycklar per Get-buffert.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Först skapar vi en enkel RedisClient som inkluderar ristretto och redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Koden är lite komplex.

  • För att utföra spårning upprättas ytterligare en anslutning. Detta är en åtgärd som beaktar att PubSub kan störa andra operationer.
  • Klient-ID för den nya anslutningen hämtas, och spårningen omdirigeras till den anslutningen från den anslutning som ska hämta data.
  • Därefter prenumereras på invalidation messages.
  • Koden för att hantera prenumerationen är något komplex. Eftersom redigo inte kan parsas för ogiltigförklarande meddelanden måste svaret tas emot och bearbetas före parsning.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get-meddelandet frågar ristretto först, och om det inte finns, hämtas det från Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Koden för att testa är som ovan. Om du testar den kommer du att kunna se att data uppdateras varje gång data uppdateras i Redis.

Men detta är alltför komplext. Framför allt, för att skala till ett kluster, är det oundvikligt att aktivera spårning för alla masters eller repliker.

Rueidis

För oss som skriver Go-språk har vi den mest moderna och avancerade rueidis. Låt oss skriva kod som använder server assisted client side cache i en Redis-klustermiljö med rueidis.

Installera först beroendet.

  • github.com/redis/rueidis

Skriv sedan koden för att hämta data från Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Med rueidis behöver du bara DoCache för att använda client side cache. Då läggs det till i den lokala cachen, inklusive hur länge det ska behållas i den lokala cachen, och när DoCache anropas igen hämtas data från den lokala cachen. Självklart hanteras även invalidation messages korrekt.

Varför inte redis-go?

redis-go stöder tyvärr inte server assisted client side cache via det officiella API:et. Dessutom, när PubSub skapas, skapas en ny anslutning, och det finns inget API för att direkt komma åt den anslutningen, så klient-ID:t kan inte heller hämtas. Därför bedömde jag att redis-go inte kunde konfigureras och valde att hoppa över det.

Sexigt

Genom client side cache-strukturen

  • Om data kan förberedas i förväg, kan denna struktur minimera Redis-frågor och trafik samtidigt som den alltid tillhandahåller de senaste data.
  • Detta kan skapa en sorts CQRS-struktur för att drastiskt förbättra läsprestanda.

cqrs

Hur mycket sexigare har det blivit?

Jag har faktiskt använt denna struktur i praktiken och har kortfattat undersökt latensen för två API:er. Jag ber om ursäkt för att jag bara kan beskriva det mycket abstrakt.

  1. Första API:et
    1. Vid första förfrågan: genomsnitt 14.63ms
    2. Vid efterföljande förfrågningar: genomsnitt 2.82ms
    3. Genomsnittlig skillnad: 10.98ms
  2. Andra API:et
    1. Vid första förfrågan: genomsnitt 14.05ms
    2. Vid efterföljande förfrågningar: genomsnitt 1.60ms
    3. Genomsnittlig skillnad: 11.57ms

Det fanns en ytterligare latensförbättring på upp till cirka 82%!

Jag förväntar mig att följande förbättringar har skett:

  • Eliminering av nätverkskommunikation mellan klient och Redis samt trafikbesparingar.
  • Minskning av antalet läskommandon som Redis själv måste utföra.
    • Detta har också effekten att skrivprestandan förbättras.
  • Minimering av parsning av Redis-protokollet.
    • Parsning av Redis-protokollet är inte kostnadsfritt. Att kunna minska detta är en stor möjlighet.

Men allt är en avvägning. För detta har vi offrat minst följande två saker:

  • Behovet av implementering, drift och underhåll av klientens cachehanteringselement.
  • Ökad CPU- och minnesanvändning på klienten som ett resultat av detta.

Slutsats

Personligen var jag nöjd med arkitekturkomponenten, och latensen samt stressen på API-servern var mycket låg. Jag tror att det vore bra att konfigurera arkitekturen på detta sätt om möjligt i framtiden.