GoSuda

Forbedring av responsivitet med Redis Client-Side Caching

By snowmerak
views ...

Hva er Redis?

Jeg tror det er få som ikke kjenner til Redis. Men hvis vi likevel skal nevne noen kjennetegn kort, kan det oppsummeres slik:

  • Operasjoner utføres i én tråd, slik at alle operasjoner har atomisitet.
  • Data lagres og behandles In-Memory, slik at alle operasjoner er raske.
  • Redis kan, avhengig av opsjoner, lagre WAL, noe som muliggjør rask sikkerhetskopiering og gjenoppretting av den nyeste tilstanden.
  • Den støtter flere typer som Set, Hash, Bit, List, noe som gir høy produktivitet.
  • Den har et stort fellesskap, slik at man kan dele diverse erfaringer, problemer og løsninger.
  • Den har vært utviklet og drevet over lang tid, noe som gir pålitelig stabilitet.

Så til hovedsaken

Se for deg dette?

Hva om cache-en til tjenesten deres oppfyller følgende to betingelser?

  1. Når data som ofte spørres om, må leveres til brukeren i sanntid, men oppdateringen er uregelmessig, slik at cache-en må oppdateres hyppig.
  2. Når oppdatering ikke skjer, men det er nødvendig å aksessere og spørre den samme cache-dataen svært hyppig.

Det første tilfellet kan være en sanntids popularitetsrangering i en nettbutikk. Hvis den sanntids popularitetsrangeringen er lagret som et sorted set, er det ineffektivt å lese fra Redis hver gang en bruker aksesserer hovedsiden. Det andre tilfellet gjelder valutakursdata: selv om valutakursdataen kun kunngjøres omtrent hvert 10. minutt, skjer selve spørringen svært hyppig. Spesielt for valutakursene mellom won-dollar, won-yen og won-yuan, spørres cache-en ekstremt hyppig. I slike tilfeller vil det være en mer effektiv operasjon hvis API-serveren har en separat lokal cache, og oppdaterer den ved å spørre Redis på nytt når dataene endres.

Så, hvordan kan man implementere denne operasjonen i en arkitektur som består av database - Redis - API-server?

Er ikke Redis PubSub tilstrekkelig?

Når man bruker cache, abonner på en kanal som kan motta informasjon om oppdateringer!

  • Da må man lage en logikk som sender en melding ved oppdatering.
  • Ytelsen påvirkes av den ekstra operasjonen forårsaket av PubSub.

pubsub-write

pubsub-read

Hva om Redis oppdager endringen?

Hva om man bruker Keyspace Notification for å motta kommando-varsler for den aktuelle nøkkelen?

  • Det er tungvint å måtte lagre og dele nøkkelen og kommandoen som brukes for oppdateringen på forhånd.
  • For eksempel blir det komplisert fordi for noen nøkler er en enkel Set oppdateringskommandoen, mens for andre nøkler er det LPush, eller RPush, eller SAdd og SRem som er oppdateringskommandoen.
  • Dette øker betraktelig risikoen for kommunikasjonsfeil og menneskelige feil i kodingen under utviklingsprosessen.

Hva om man bruker Keyevent Notification for å motta varsler på kommandonivå?

  • Det er nødvendig å abonnere på alle kommandoer som brukes for oppdatering. Det kreves også passende filtrering av nøklene som mottas derfra.
  • For eksempel er det sannsynlig at den aktuelle klienten ikke har en lokal cache for noen av nøklene som kommer inn via Del.
  • Dette kan føre til unødvendig ressursforbruk.

Derfor trengs Invalidation Message!

Hva er Invalidation Message?

Invalidation Messages er et konsept som tilbys som en del av Server Assisted Client-Side Cache, som ble lagt til fra og med Redis 6.0. Invalidation Message overføres i følgende flyt:

  1. Anta at ClientB allerede har lest nøkkelen én gang.
  2. ClientA setter den aktuelle nøkkelen på nytt.
  3. Redis oppdager endringen og publiserer en Invalidation Message til ClientB for å informere ClientB om å slette cache-en.
  4. ClientB mottar meldingen og iverksetter passende tiltak.

invalidation-message

Hvordan brukes det?

Grunnleggende driftsstruktur

Klienten som er koblet til Redis, kjører CLIENT TRACKING ON REDIRECT <client-id> for å motta invalidation message. Klienten som skal motta meldingen, abonnerer på SUBSCRIBE __redis__:invalidate for å motta invalidation message.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementering! Implementering! Implementering!

Redigo + Ristretto

Hvis det bare forklares slik, er det uklart hvordan det faktisk skal brukes i kode. La oss derfor først sette det opp enkelt med redigo og ristretto.

Først installeres de to avhengighetene.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // antall nøkler å spore frekvensen av (10M).
22		MaxCost:     1 << 30, // maksimal kostnad for cache (1GB).
23		BufferItems: 64,      // antall nøkler per Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Først opprettes en enkel RedisClient som inneholder ristretto og redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Koden er litt komplisert.

  • En ekstra tilkobling opprettes for Tracking. Dette er et tiltak tatt for å unngå at PubSub forstyrrer andre operasjoner.
  • Client ID-en til den ekstra tilkoblingen hentes, og Trackingen på tilkoblingen som skal hente data, omdirigeres til denne tilkoblingen.
  • Deretter abonneres det på invalidation message.
  • Koden som behandler abonnementet er litt komplisert. Siden redigo ikke kan parse ugyldiggjøringsmeldingen, må responsen mottas og behandles før parsing.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get-meldingen spør først ristretto, og hvis den ikke finner data der, henter den fra Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Koden for testing er som vist ovenfor. Hvis man tester den, vil man kunne bekrefte at verdien oppdateres på nytt hver gang dataen i Redis oppdateres.

Men dette er for komplisert. I tillegg er det uunngåelig at Tracking må aktiveres for alle mastere, eller replikaer, for å kunne skalere til en klynge.

Rueidis

For de som bruker Go-språket, har vi den mest moderne og avanserte rueidis. Vi skal skrive kode som bruker server assisted client side cache i et Redis klyngemiljø ved hjelp av rueidis.

Først installeres avhengigheten.

  • github.com/redis/rueidis

Deretter skrives koden for å spørre data fra Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

I rueidis trenger man bare å bruke DoCache for å bruke client side cache. Da legges det til i den lokale cache-en, inkludert hvor lenge den skal beholdes i den lokale cache-en, og ved å kalle DoCache på nytt hentes dataen fra den lokale cache-en. Naturligvis behandles invalidation message også korrekt.

Hvorfor ikke redis-go?

redis-go støtter dessverre ikke server assisted client side cache gjennom sitt offisielle API. I tillegg er det ikke mulig å vite client id-en fordi det ikke finnes et API for direkte tilgang til tilkoblingen når en ny tilkobling opprettes for PubSub. Derfor ble det vurdert at redis-go ikke kunne brukes, og det ble hoppet over.

Sexy!

Gjennom client side cache-strukturen

  • Hvis data kan forberedes på forhånd, kan denne strukturen minimere spørringer og trafikk til Redis, samtidig som den alltid leverer de nyeste dataene.
  • Dette muliggjør opprettelsen av en slags CQRS-struktur, som dramatisk kan øke leseytelsen.

cqrs

Hvor mye mer sexy ble det?

Siden denne strukturen faktisk er i bruk i feltet, har jeg kort sett på latensen for to API-er. Vær snill å unnskylde at jeg bare kan skrive om det på en svært abstrakt måte.

  1. Første API
    1. Første spørring: Gjennomsnittlig 14.63ms
    2. Senere spørringer: Gjennomsnittlig 2.82ms
    3. Gjennomsnittlig forskjell: 10.98ms
  2. Andre API
    1. Første spørring: Gjennomsnittlig 14.05ms
    2. Senere spørringer: Gjennomsnittlig 1.60ms
    3. Gjennomsnittlig forskjell: 11.57ms

Det var en ytterligere latensforbedring på så mye som 82%!

Jeg forventer at følgende forbedringer har funnet sted:

  • Eliminering av nettverkskommunikasjon og trafikkbesparelser mellom klienten og Redis.
  • Reduksjon av antall lesekommandoer som Redis selv må utføre.
    • Dette har også effekten av å øke skriveytelsen.
  • Minimalisering av parsing av Redis-protokollen.
    • Det er ikke null kostnad å parse Redis-protokollen. Å kunne redusere dette er en stor mulighet.

Men alt er en Trade-Off. For dette har vi ofret minst de to følgende tingene:

  • Behov for implementering, drift og vedlikehold av administrasjonselementer for klient-side cache.
  • Økt CPU- og minnebruk på klienten som følge av dette.

Konklusjon

Personlig var jeg fornøyd med arkitekturkomponenten, og latensen og stresset på API-serveren var svært lavt. Jeg tenker at det vil være bra å sette opp arkitekturen med denne strukturen hvis det er mulig i fremtiden.