GoSuda

Forbedring av responsivitet ved bruk av Redis client-side cache

By snowmerak
views ...

Hva er Redis?

Jeg tror det er få som ikke kjenner til Redis. Likevel, for å nevne det kort med noen få egenskaper, kan det oppsummeres som følger:

  • Operasjoner utføres i en enkelttråd, noe som gir alle operasjoner atomisitet.
  • Data lagres og operasjoner utføres In-Memory, noe som gjør alle operasjoner raske.
  • Redis kan lagre WAL avhengig av opsjonene, noe som muliggjør rask sikkerhetskopiering og gjenoppretting av den nyeste tilstanden.
  • Den støtter flere typer som Set, Hash, Bit, List, noe som gir høy produktivitet.
  • Den har et stort fellesskap, noe som gjør det mulig å dele ulike erfaringer, problemstillinger og løsninger.
  • Den har blitt utviklet og driftet over lang tid, noe som gir pålitelig stabilitet.

Så til poenget

Se for deg?

Hva om cache-en for tjenesten deres oppfyller følgende to betingelser?

  1. Når data som ofte etterspørres, må leveres til brukeren i den nyeste tilstanden, men oppdateringen er uregelmessig, slik at cache-en må oppdateres ofte.
  2. Når data ikke oppdateres, men den samme cachedataen må aksesseres og etterspørres ofte.

I det første tilfellet kan man vurdere sanntids popularitetsrangering i en nettbutikk. Hvis sanntids popularitetsrangeringen i en nettbutikk lagres som en sorted set, vil det være ineffektivt å lese fra Redis hver gang en bruker aksesserer hovedsiden. I det andre tilfellet, for valutadata, selv om valutadataen publiseres omtrent hvert 10. minutt, skjer selve etterspørselen svært ofte. Spesielt for KRW-USD, KRW-JPY og KRW-CNY etterspørres cache-en svært hyppig. I slike tilfeller vil det være en mer effektiv operasjon for API serveren å ha en separat lokal cache og oppdatere den ved å etterspørre Redis igjen når data endres.

Så, hvordan kan en slik operasjon implementeres i en database - Redis - API server struktur?

Fungerer ikke Redis PubSub?

Når du bruker cache, abonner på en kanal der du kan motta informasjon om oppdateringer!

  • Da må man lage logikk for å sende meldinger ved oppdatering.
  • Ytterligere operasjoner som følge av PubSub påvirker ytelsen.

pubsub-write

pubsub-read

Hva om Redis oppdager endringer?

Hva om man bruker Keyspace Notification for å motta kommando-varsel for den aktuelle nøkkelen?

  • Det er bryderi med å måtte lagre og dele nøkler og kommandoer som brukes for oppdatering på forhånd.
  • For eksempel blir det komplisert, da en enkel Set er oppdateringskommandoen for noen nøkler, mens LPush, eller RPush, SAdd og SRem er oppdateringskommandoer for andre nøkler.
  • Dette øker betraktelig muligheten for kommunikasjonsfeil og menneskelige feil i kodingen under utviklingsprosessen.

Hva om man bruker Keyevent Notification for å motta varsler på kommandonivå?

  • Abonnement på alle kommandoer som brukes for oppdatering er nødvendig. Deretter kreves passende filtrering av nøklene som kommer inn.
  • For eksempel, for en del av alle nøkler som kommer inn via Del, er det sannsynlig at den aktuelle klienten ikke har en lokal cache.
  • Dette kan føre til unødvendig ressursforbruk.

Derfor trengs Invalidation Message!

Hva er Invalidation Message?

Invalidation Messages er et konsept som tilbys som en del av Server Assisted Client-Side Cache, lagt til fra Redis 6.0. Invalidation Message overføres i følgende flyt:

  1. Anta at ClientB allerede har lest en key én gang.
  2. ClientA setter en ny verdi for den key.
  3. Redis oppdager endringen og publiserer en Invalidation Message til ClientB for å varsle ClientB om å slette cache-en.
  4. ClientB mottar meldingen og iverksetter passende tiltak.

invalidation-message

Hvordan brukes det?

Grunnleggende operasjonsstruktur

Klienten som er koblet til Redis, mottar invalidation message ved å kjøre CLIENT TRACKING ON REDIRECT <client-id>. Klienten som skal motta meldingene, abonnerer deretter for å motta invalidation message ved hjelp av SUBSCRIBE __redis__:invalidate.

default tracking

1# klient 1
2> SET a 100
1# klient 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# klient 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # sporing
1# klient 1
2> SET a 200
1# klient 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# klient 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# klient 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# klient 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# klient 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementering! Implementering! Implementering!

Redigo + Ristretto

Bare en slik forklaring gjør det uklart hvordan det skal brukes i selve koden. La oss derfor først konfigurere det enkelt med redigo og ristretto.

Installer først de to dependencyene.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // antall nøkler å spore frekvensen av (10M).
22		MaxCost:     1 << 30, // maksimal kostnad for cache (1GB).
23		BufferItems: 64,      // antall nøkler per Get-buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Først oppretter vi enkelt en RedisClient som inkluderer ristretto og redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Koden er litt kompleks.

  • For å utføre Tracking opprettes en ekstra tilkobling. Dette er et tiltak for å unngå at PubSub forstyrrer andre operasjoner.
  • Klient-ID-en til den ekstra tilkoblingen hentes, og tilkoblingen som skal hente data, Redirects til denne tilkoblingen for Tracking.
  • Deretter abonneres det på invalidation message.
  • Koden for håndtering av abonnementet er litt kompleks. Siden redigo ikke parser ugyldiggjøringsmeldinger, må responsen behandles før parsing.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get meldingen sjekker ristretto først, og hvis den ikke finnes, henter den fra Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Koden for testing er som vist ovenfor. Hvis du tester den, vil du kunne se at verdien oppdateres hver gang dataen oppdateres i Redis.

Men dette er for komplisert. Mer enn noe annet, for å skalere ut til en klynge, er det uunngåelig å måtte aktivere Tracking for alle mastere, eller replikaer.

Rueidis

Så lenge vi bruker Go-språket, har vi den mest moderne og avanserte rueidis. La oss skrive kode som bruker server assisted client side cache i et Redis klynge-miljø ved hjelp av rueidis.

Installer først dependencyen.

  • github.com/redis/rueidis

Deretter skriver vi koden for å hente data fra Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Med rueidis trenger man bare å bruke DoCache for å bruke client side cache. Dette legger til data i den lokale cache-en, inkludert hvor lenge den skal beholdes, og et kall til DoCache vil hente dataen fra den lokale cache-en. Naturligvis håndteres invalidation messages også korrekt.

Hvorfor ikke redis-go?

redis-go støtter dessverre ikke server assisted client side cache via offisiell API. I tillegg, når man oppretter PubSub, lages en ny tilkobling, og det finnes ingen API for direkte aksess til denne tilkoblingen, så det er umulig å få tak i client id. Derfor ble det vurdert at redis-go ikke kunne konfigureres i det hele tatt, og det ble utelatt.

Sexy

Gjennom client side cache strukturen

  • Hvis data kan forberedes på forhånd, vil denne strukturen kunne levere de nyeste dataene til enhver tid, samtidig som spørringer mot Redis og trafikk minimeres.
  • Dette kan skape en slags CQRS struktur og forbedre leseytelsen dramatisk.

cqrs

Hvor mye mer sexy ble det?

Siden jeg faktisk bruker denne strukturen i praksis, sjekket jeg enkel latency for to APIer. Vær så snill å ha forståelse for at jeg bare kan skrive det svært abstrakt.

  1. Første API
    1. Ved første etterspørsel: gjennomsnitt 14.63ms
    2. Ved påfølgende etterspørsel: gjennomsnitt 2.82ms
    3. Gjennomsnittlig forskjell: 10.98ms
  2. Andre API
    1. Ved første etterspørsel: gjennomsnitt 14.05ms
    2. Ved påfølgende etterspørsel: gjennomsnitt 1.60ms
    3. Gjennomsnittlig forskjell: 11.57ms

Det var en ytterligere latency-forbedring på opptil rundt 82%!

Jeg forventer at følgende forbedringer har skjedd:

  • Eliminering av nettverkskommunikasjonsprosessen mellom klienten og Redis, og trafikkbesparelse.
  • Reduksjon i antall lesekommandoer som Redis selv må utføre.
    • Dette har også en effekt på å forbedre skriveytelsen.
  • Minimalisering av parsing av Redis protokollen.
    • Parsing av Redis protokollen er heller ikke kostnadsfritt. Å kunne redusere dette er en stor mulighet.

Men alt er en trade-off. For dette ofret vi minst følgende to ting:

  • Behov for implementering, drift og vedlikehold av client side cache administrasjonselementer.
  • Økt CPU- og memory-bruk på klienten som følge av dette.

Konklusjon

Personlig var dette en tilfredsstillende arkitekturkomponent, og stresset på latency og API server var svært lavt. Jeg tenker at det ville være flott å konfigurere arkitekturen med en slik struktur også i fremtiden, hvis mulig.