Verbetering van de Responsiviteit met Redis Client-Side Caching
Wat is Redis?
Ik denk niet dat er veel mensen zijn die Redis niet kennen. Desalniettemin kan ik het kort samenvatten met een paar kenmerken:
- Bewerkingen worden uitgevoerd in een enkele thread, wat betekent dat alle bewerkingen atomair zijn.
- Gegevens worden In-Memory opgeslagen en verwerkt, wat resulteert in snelle bewerkingen.
- Redis kan optioneel WAL opslaan, waardoor het snel de meest recente status kan back-uppen en herstellen.
- Het ondersteunt verschillende typen zoals Set, Hash, Bit en List, wat leidt tot hoge productiviteit.
- Het heeft een grote community, waardoor diverse ervaringen, problemen en oplossingen kunnen worden gedeeld.
- Het is al lange tijd in ontwikkeling en operatie, wat resulteert in betrouwbare stabiliteit.
En nu ter zake
Stelt u zich eens voor?
Wat als de cache van uw service aan de volgende twee voorwaarden voldoet?
- Gegevens die frequent worden opgevraagd, moeten in de meest recente staat aan de gebruiker worden geleverd, maar de updates zijn onregelmatig, waardoor frequente cache-updates noodzakelijk zijn.
- Updates zijn niet nodig, maar dezelfde cachegegevens moeten frequent worden benaderd en opgevraagd.
Het eerste geval kan worden overwogen voor realtime populariteitsranglijsten van een winkel. Wanneer de realtime populariteitsranglijsten van een winkel als een sorted set worden opgeslagen, is het inefficiënt voor Redis om deze elke keer te lezen wanneer een gebruiker de hoofdpagina benadert. In het tweede geval, voor wisselkoersgegevens, zelfs als de wisselkoers ongeveer elke 10 minuten wordt bekendgemaakt, vinden de daadwerkelijke opvragingen zeer frequent plaats. Bovendien worden de cachegegevens voor Won-Dollar, Won-Yen en Won-Yuan zeer frequent opgevraagd. In dergelijke gevallen zou het efficiënter zijn als de API-server een aparte lokale cache zou hebben en de Redis-cache opnieuw zou opvragen en bijwerken wanneer de gegevens veranderen.
Hoe kunnen we dan deze functionaliteit implementeren in een database - Redis - API-server architectuur?
Is Redis PubSub niet voldoende?
Wanneer u een cache gebruikt, abonneer u dan op een kanaal dat de update status kan ontvangen!
- Dan moet er een logica worden gecreëerd die berichten verzendt bij een update.
- Extra bewerkingen door PubSub beïnvloeden de prestaties.
Wat als Redis veranderingen detecteert?
Wat als we Keyspace Notification gebruiken om commando meldingen te ontvangen voor de desbetreffende sleutel?
- Er is een omslachtige procedure om de sleutel en het commando die voor de update worden gebruikt, vooraf op te slaan en te delen.
- Bijvoorbeeld, voor sommige sleutels is een eenvoudige Set het updatecommando, terwijl voor andere sleutels LPush, RPush of SAdd en SRem het updatecommando zijn, wat de complexiteit verhoogt.
- Dit verhoogt de kans op communicatiefouten en menselijke fouten in de code tijdens het ontwikkelingsproces aanzienlijk.
Wat als we Keyevent Notification gebruiken om meldingen per commando te ontvangen?
- Abonnement op alle gebruikte updatecommando's is vereist. Passende filtering van de binnenkomende sleutels is noodzakelijk.
- Bijvoorbeeld, voor sommige van alle sleutels die binnenkomen via Del, is het zeer waarschijnlijk dat de betreffende client geen lokale cache heeft.
- Dit kan leiden tot onnodig resourceverbruik.
Daarom is Invalidation Message noodzakelijk!
Wat is een Invalidation Message?
Invalidation Messages zijn een concept dat wordt aangeboden als onderdeel van Server Assisted Client-Side Cache, toegevoegd vanaf Redis 6.0. Invalidation Messages worden in de volgende stroom geleverd:
- Aangenomen wordt dat ClientB de sleutel al eenmaal heeft gelezen.
- ClientA stelt de sleutel opnieuw in.
- Redis detecteert de wijziging en publiceert een Invalidation Message naar ClientB om ClientB te informeren de cache te wissen.
- ClientB ontvangt dit bericht en neemt de passende maatregelen.
Hoe wordt het gebruikt?
Basiswerkingsstructuur
Een client die verbonden is met Redis, zorgt ervoor dat CLIENT TRACKING ON REDIRECT <client-id>
wordt uitgevoerd om invalidation messages te ontvangen. En de client die berichten moet ontvangen, abonneert zich op SUBSCRIBE __redis__:invalidate
om invalidation messages te ontvangen.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementatie! Implementatie! Implementatie!
Redigo + Ristretto
Als ik het zo uitleg, is het misschien onduidelijk hoe het daadwerkelijk in code moet worden gebruikt. Laten we het daarom eenvoudigweg configureren met redigo
en ristretto
.
Installeer eerst de twee afhankelijkheden.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // aantal sleutels om de frequentie van bij te houden (10M).
22 MaxCost: 1 << 30, // maximale kosten van de cache (1GB).
23 BufferItems: 64, // aantal sleutels per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Eerst creëren we eenvoudigweg een RedisClient
die ristretto en redigo omvat.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
De code is enigszins complex.
- Er wordt een extra connectie tot stand gebracht voor Tracking. Dit is een maatregel die rekening houdt met de mogelijkheid dat PubSub andere operaties kan hinderen.
- Het ID van de toegevoegde connectie wordt opgevraagd, waarna de connectie die de gegevens moet opvragen Tracking naar deze connectie omleidt.
- Vervolgens wordt geabonneerd op invalidation messages.
- De code voor het afhandelen van het abonnement is enigszins complex. Omdat redigo invalidation messages niet kan parsen, moet de respons vóór parsing worden ontvangen en verwerkt.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Het Get
-bericht zoekt eerst in Ristretto, en als het daar niet wordt gevonden, haalt het de waarde op uit Redis, zoals hieronder weergegeven.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
De code om te testen is zoals hierboven weergegeven. Als u het eenmaal test, zult u zien dat de waarden worden bijgewerkt telkens wanneer de gegevens in Redis worden vernieuwd.
Dit is echter te complex. Bovendien is het onvermijdelijk om Tracking op alle masters of replica's te activeren om een cluster uit te breiden.
Rueidis
Aangezien we Go gebruiken, hebben we rueidis
, de meest moderne en geavanceerde optie. We zullen code schrijven die gebruikmaakt van server assisted client side cache in een Redis-clusteromgeving met rueidis.
Installeer eerst de afhankelijkheid.
github.com/redis/rueidis
En schrijf vervolgens de code om gegevens uit Redis op te vragen.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Met rueidis hoeft u alleen DoCache
te gebruiken om client side cache te benutten. Dit voegt de gegevens toe aan de lokale cache en bepaalt hoe lang deze daar moeten blijven. Als u vervolgens opnieuw DoCache
aanroept, worden de gegevens uit de lokale cache opgehaald. Uiteraard worden invalidation messages ook correct verwerkt.
Waarom geen redis-go?
redis-go
ondersteunt helaas de server assisted client side cache niet via zijn officiële API. Bovendien is er geen API om direct toegang te krijgen tot de client-ID wanneer een nieuwe connectie wordt gemaakt voor PubSub, waardoor het onmogelijk is om de client-ID te achterhalen. Daarom is geoordeeld dat redis-go
niet geschikt is voor deze configuratie en is het gepasseerd.
Sexy
Via de client side cache structuur
- Als de gegevens vooraf kunnen worden voorbereid, kan deze structuur de query's en het verkeer naar Redis minimaliseren en altijd de meest recente gegevens leveren.
- Hierdoor kan een soort CQRS-structuur worden gecreëerd, wat de leesprestaties exponentieel verbetert.
Hoeveel sexier is het geworden?
Ik heb de latentie voor twee API's kort onderzocht, aangezien deze structuur daadwerkelijk in de praktijk wordt gebruikt. Houd er rekening mee dat ik het alleen abstract kan beschrijven.
- Eerste API
- Eerste query: gemiddeld 14.63ms
- Latere query's: gemiddeld 2.82ms
- Gemiddeld verschil: 10.98ms
- Tweede API
- Eerste query: gemiddeld 14.05ms
- Latere query's: gemiddeld 1.60ms
- Gemiddeld verschil: 11.57ms
Er was een extra latentieverbetering van wel 82%!
De volgende verbeteringen worden verwacht:
- Eliminatie van netwerkcommunicatie tussen client en Redis en besparing van verkeer.
- Vermindering van het aantal leescommando's dat Redis zelf moet uitvoeren.
- Dit heeft ook het effect dat de schrijfprestaties toenemen.
- Minimalisering van het parsen van Redis-protocollen.
- Het parsen van Redis-protocollen brengt niet nul kosten met zich mee. Het verminderen hiervan biedt een grote kans.
Echter, alles is een afweging. Hiervoor hebben we ten minste de volgende twee zaken opgeofferd:
- Noodzaak tot implementatie, operatie en onderhoud van client-side cache management componenten.
- Verhoogd CPU- en geheugengebruik van de client als gevolg hiervan.
Conclusie
Persoonlijk was ik tevreden met de architectuurcomponent, en de latentie en belasting van de API-server waren zeer laag. Ik ben van mening dat het in de toekomst wenselijk zou zijn om de architectuur zo te structureren, indien mogelijk.