GoSuda

Forbedring af reaktionsevnen med Redis Client-Side Caching

By snowmerak
views ...

Hvad er Redis?

Jeg tror, de fleste kender Redis. Men for dem, der ikke gør, kan jeg kort opsummere nogle af dens karakteristika:

  • Operationer udføres i en enkelt tråd, hvilket giver alle operationer atomicitet.
  • Data gemmes og behandles In-Memory, hvilket gør alle operationer hurtige.
  • Redis kan, afhængigt af indstillingerne, gemme WAL, hvilket muliggør hurtig sikkerhedskopiering og genoprettelse af den seneste tilstand.
  • Den understøtter flere typer som Set, Hash, Bit, List, hvilket giver høj produktivitet.
  • Den har et stort fællesskab, hvor man kan dele diverse erfaringer, problemer og løsninger.
  • Den er udviklet og drevet i lang tid, hvilket giver pålidelig stabilitet.

Til sagen

Forestil dig?

Hvad nu hvis din services cache opfylder følgende to betingelser?

  1. Du skal give brugerne de seneste data, der ofte forespørges, men opdateringer er uregelmæssige, hvilket kræver hyppige cacheopdateringer.
  2. Dataene opdateres ikke, men de samme cachedata skal tilgås og forespørges ofte.

Det første tilfælde kan være en realtids-popularitetsrangering i en online butik. Hvis realtids-popularitetsrangeringen gemmes som et sorted set, ville det være ineffektivt for Redis at læse den, hver gang en bruger tilgår hovedsiden. I det andet tilfælde, for valutakursdata, selvom valutakursdataene annonceres cirka hvert 10. minut, sker de faktiske forespørgsler meget hyppigt. Især for Won-Dollar, Won-Yen og Won-Yuan forespørges cachen meget ofte. I disse tilfælde ville det være en effektiv handling, hvis API-serveren havde en separat lokal cache og derefter forespurgte Redis igen for at opdatere dataene, når de ændres.

Hvordan kan man så implementere denne adfærd i en database - Redis - API-serverstruktur?

Kan det ikke gøres med Redis PubSub?

Når du bruger cache, skal du abonnere på en kanal, der kan modtage opdateringsstatus!

  • Så skal du oprette en logik, der sender en besked ved opdatering.
  • Ydeevnen påvirkes, da der tilføjes yderligere handlinger på grund af PubSub.

pubsub-write

pubsub-read

Hvad hvis Redis registrerer ændringer?

Hvad hvis man bruger Keyspace Notification til at modtage kommando-meddelelser for den pågældende nøgle?

  • Der er den ulempe, at nøgler og kommandoer, der bruges til opdatering, skal gemmes og deles på forhånd.
  • For eksempel bliver det kompliceret, da en simpel Set kan være en opdateringskommando for en nøgle, mens LPush, RPush, SAdd eller SRem kan være opdateringskommandoer for andre nøgler.
  • Dette øger markant sandsynligheden for kommunikationsfejl og menneskelige fejl i kode under udviklingsprocessen.

Hvad hvis man bruger Keyevent Notification til at modtage notifikationer pr. kommando?

  • Det kræver abonnement på alle kommandoer, der bruges til opdatering. Der kræves passende filtrering for de nøgler, der kommer ind derfra.
  • For eksempel er det sandsynligt, at klienten ikke har en lokal cache for nogle af de nøgler, der kommer ind via Del.
  • Dette kan føre til unødvendigt ressourcespild.

Derfor er Invalidation Message nødvendig!

Hvad er Invalidation Message?

Invalidation Messages er et koncept, der er introduceret som en del af Server Assisted Client-Side Cache, tilføjet siden Redis 6.0. Invalidation Messages leveres i følgende flow:

  1. Antag at ClientB allerede har læst nøglen en gang.
  2. ClientA indstiller den pågældende nøgle igen.
  3. Redis registrerer ændringen og udsteder en Invalidation Message til ClientB for at meddele ClientB at slette cachen.
  4. ClientB modtager meddelelsen og træffer passende foranstaltninger.

invalidation-message

Hvordan bruges det?

Grundlæggende operationsstruktur

En klient, der er forbundet til Redis, får besked om invalidation messages ved at udføre CLIENT TRACKING ON REDIRECT <client-id>. Og klienten, der skal modtage beskeder, abonnerer på SUBSCRIBE __redis__:invalidate for at modtage invalidation messages.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementering! Implementering! Implementering!

Redigo + Ristretto

Hvis det kun forklares på denne måde, er det uklart, hvordan det skal bruges i kode. Så lad os først konfigurere det med redigo og ristretto.

Installer først de to dependencies.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // number of keys to track frequency of (10M).
22		MaxCost:     1 << 30, // maximum cost of cache (1GB).
23		BufferItems: 64,      // number of keys per Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Først oprettes en simpel RedisClient, der indeholder ristretto og redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Koden er lidt kompleks.

  • For at udføre Tracking oprettes en yderligere forbindelse. Dette er en foranstaltning, der tager højde for, at PubSub kan forstyrre andre operationer.
  • Client-ID'et for den tilføjede forbindelse forespørges, og Tracking viderestilles til denne forbindelse fra den forbindelse, der skal forespørge data.
  • Og invalidation message abonneres på.
  • Koden til håndtering af abonnement er lidt kompleks. Da redigo ikke kan parse invalidation messages, skal svaret modtages og behandles før parsing.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get-meddelelsen forespørger først ristretto som vist nedenfor, og hvis den ikke findes, hentes den fra Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Koden til test er som ovenfor. Hvis du tester den, vil du kunne bekræfte, at værdien opdateres, hver gang dataene opdateres i Redis.

Men dette er for komplekst. Frem for alt er det uundgåeligt at aktivere Tracking for alle mastere eller replikaer for at skalere klyngen.

Rueidis

For os, der bruger Go-sprog, har vi den mest moderne og avancerede rueidis. Vi vil skrive kode, der bruger server assisted client side cache i et Redis-klyngemiljø ved hjælp af rueidis.

Først skal du installere afhængigheden.

  • github.com/redis/rueidis

Og skriv kode, der forespørger data fra Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Rueidis kræver blot DoCache for at bruge client side cache. Derefter tilføjes det til den lokale cache med en indstilling for, hvor længe det skal bevares i den lokale cache, og data hentes fra den lokale cache ved at kalde DoCache igen. Naturligvis behandles invalidation messages også korrekt.

Hvorfor ikke redis-go?

redis-go understøtter desværre ikke server assisted client side cache som et officielt API. Desuden er der ingen API til direkte adgang til forbindelsen, når man opretter PubSub, da den opretter en ny forbindelse, så client id kan ikke kendes. Derfor blev redis-go udeladt, da det blev vurderet, at dens konfiguration ikke var mulig.

Sexet

Gennem client side cache-strukturen

  • Hvis data kan forberedes på forhånd, vil denne struktur muliggøre levering af de nyeste data med minimalt antal Redis-forespørgsler og trafik.
  • Dette kan bruges til at skabe en slags CQRS-struktur, hvilket dramatisk forbedrer læseydelsen.

cqrs

Hvor meget mere sexet er det blevet?

Faktisk bruges en sådan struktur i praksis, så jeg har lavet en simpel latensanalyse for de to API'er. Jeg beder om forståelse for, at jeg kun kan udtrykke det meget abstrakt.

  1. Første API
    1. Ved første forespørgsel: gennemsnitligt 14,63 ms
    2. Ved efterfølgende forespørgsler: gennemsnitligt 2,82 ms
    3. Gennemsnitlig forskel: 10,98 ms
  2. Andet API
    1. Ved første forespørgsel: gennemsnitligt 14,05 ms
    2. Ved efterfølgende forespørgsler: gennemsnitligt 1,60 ms
    3. Gennemsnitlig forskel: 11,57 ms

Der var en yderligere latensforbedring på op til 82%!

Jeg forventer, at følgende forbedringer er opnået:

  • Udeladelse af netværkskommunikationsprocessen mellem klienten og Redis og besparelse af trafik
  • Reduktion af antallet af læsekommandoer, som Redis selv skal udføre
    • Dette forbedrer også skriveydelsen.
  • Minimering af parsing af Redis-protokollen
    • Parsing af Redis-protokollen er heller ikke omkostningsfrit. At kunne reducere dette er en stor mulighed.

Men alt er en afvejning. For dette har vi ofret mindst de to følgende ting:

  • Implementering, drift og vedligeholdelse af cache management-elementer på klientsiden er nødvendig.
  • Øget CPU- og hukommelsesforbrug på klienten som følge heraf.

Konklusion

Personligt var det en tilfredsstillende arkitekturkomponent, og latensen og stresset på API-serveren var meget lav. Jeg mener, at det ville være godt at konfigurere arkitekturen med en sådan struktur, hvis det er muligt i fremtiden.