GoSuda

De responsiviteit verbeteren met Redis Client-Side Caching

By snowmerak
views ...

Wat is Redis?

Ik neem aan dat weinig mensen Redis niet kennen. Desalniettemin kan het, kort samengevat met enkele kenmerken, als volgt worden omschreven:

  • Operaties worden uitgevoerd in een enkele thread, waardoor alle operaties atomisch zijn.
  • Gegevens worden opgeslagen en bewerkt in In-Memory, waardoor alle operaties snel zijn.
  • Redis kan, afhankelijk van de opties, WAL opslaan, waardoor de meest recente status snel kan worden geback-upt en hersteld.
  • Het ondersteunt diverse typen zoals Set, Hash, Bit, List, wat resulteert in hoge productiviteit.
  • Het heeft een grote community, waardoor diverse ervaringen, problemen en oplossingen kunnen worden gedeeld.
  • Het is langdurig ontwikkeld en operationeel, wat zorgt voor betrouwbare stabiliteit.

En nu ter zake

Stelt u zich eens voor?

Wat als de cache van uw dienst aan de volgende twee voorwaarden voldoet?

  1. Vaak opgevraagde gegevens moeten in de meest recente staat aan de gebruiker worden geleverd, maar de updates zijn onregelmatig, waardoor de cache frequent moet worden vernieuwd.
  2. Updates vinden niet plaats, maar dezelfde cachedata moet frequent worden benaderd en opgevraagd.

Het eerste geval betreft de realtime populariteitsranglijst van een online winkel. Wanneer de realtime populariteitsranglijst van een online winkel wordt opgeslagen als een sorted set, zou het inefficiënt zijn als Redis deze elke keer uitleest wanneer een gebruiker de hoofdpagina bezoekt. In het tweede geval, voor wisselkoersgegevens, zelfs als de wisselkoersgegevens ongeveer elke 10 minuten worden gepubliceerd, vinden de daadwerkelijke opvragingen zeer frequent plaats. Bovendien worden de wisselkoersen voor Won-Dollar, Won-Yen en Won-Yuan zeer frequent uit de cache opgevraagd. In dergelijke gevallen zou het efficiënter zijn als de API-server een aparte lokale cache heeft en deze bij een wijziging van de gegevens bijwerkt door Redis opnieuw te raadplegen.

Hoe kunnen we een dergelijke functionaliteit implementeren in een architectuur met Database - Redis - API-server?

Kan het niet met Redis PubSub?

Laten we ons abonneren op een kanaal dat updates ontvangt wanneer de cache wordt gebruikt!

  • Dan moeten we logica creëren die berichten verzendt bij updates.
  • De extra operaties als gevolg van PubSub beïnvloeden de prestaties.

pubsub-write

pubsub-read

Wat als Redis wijzigingen detecteert?

Wat als we Keyspace Notification gebruiken om commando-notificaties voor een specifieke sleutel te ontvangen?

  • Er is de omslachtige taak om de sleutels en commando's die voor updates worden gebruikt, vooraf op te slaan en te delen.
  • Bijvoorbeeld, voor sommige sleutels is een eenvoudige Set het updatecommando, terwijl voor andere een LPush, RPush, SAdd of SRem het updatecommando is, wat de complexiteit verhoogt.
  • Dit verhoogt aanzienlijk de kans op communicatiefouten en menselijke fouten tijdens het ontwikkelingsproces.

Wat als we Keyevent Notification gebruiken om notificaties per commando te ontvangen?

  • Alle commando's die voor updates worden gebruikt, moeten worden geabonneerd. Vervolgens is er een adequate filtering nodig voor de binnenkomende sleutels.
  • Bijvoorbeeld, voor sommige sleutels die binnenkomen via Del, is de kans groot dat de betreffende client geen lokale cache heeft.
  • Dit kan leiden tot onnodige verspilling van resources.

Daarom is er behoefte aan Invalidation Message!

Wat is Invalidation Message?

Invalidation Messages zijn een concept dat wordt aangeboden als onderdeel van Server Assisted Client-Side Cache, toegevoegd vanaf Redis 6.0. Invalidation Messages worden volgens de volgende stroom geleverd:

  1. ClientB heeft de sleutel al eens gelezen.
  2. ClientA stelt de sleutel opnieuw in.
  3. Redis detecteert de wijziging en publiceert een Invalidation Message naar ClientB om ClientB te laten weten dat de cache moet worden gewist.
  4. ClientB ontvangt het bericht en onderneemt passende actie.

invalidation-message

Hoe te gebruiken

Basiswerkingsstructuur

Een client die is verbonden met Redis, ontvangt invalidation messages door CLIENT TRACKING ON REDIRECT <client-id> uit te voeren. En de client die berichten moet ontvangen, abonneert zich op SUBSCRIBE __redis__:invalidate om invalidation messages te ontvangen.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementatie! Implementatie! Implementatie!

Redigo + Ristretto

Alleen zo'n uitleg maakt het onduidelijk hoe het daadwerkelijk in code te gebruiken. Laten we daarom eerst een eenvoudige configuratie maken met redigo en ristretto.

Installeer eerst de twee dependencies.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // aantal sleutels om de frequentie van bij te houden (10M).
22		MaxCost:     1 << 30, // maximale kosten van cache (1GB).
23		BufferItems: 64,      // aantal sleutels per Get-buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Eerst creëren we eenvoudig een RedisClient die ristretto en redigo omvat.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

De code is enigszins complex.

  • Er wordt een extra verbinding tot stand gebracht voor Tracking. Dit is een maatregel om te voorkomen dat PubSub andere operaties hindert.
  • De ID van de toegevoegde verbinding wordt opgevraagd, waarna de Tracking van de verbinding die gegevens opvraagt, wordt omgeleid naar deze verbinding.
  • Vervolgens wordt geabonneerd op invalidation messages.
  • De code voor het verwerken van de abonnementen is enigszins complex. Omdat redigo invalidation messages niet kan parsen, moeten de antwoorden vóór parsing worden verwerkt.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Het Get-bericht zoekt eerst in Ristretto, en als het daar niet wordt gevonden, haalt het de waarde op uit Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

De code om te testen is zoals hierboven weergegeven. Als u deze test, zult u zien dat de waarde wordt bijgewerkt telkens wanneer gegevens in Redis worden vernieuwd.

Dit is echter te complex. Bovendien is het onvermijdelijk om Tracking te activeren voor alle masters of replica's om uit te breiden naar een cluster.

Rueidis

Voor Go-ontwikkelaars is er de meest moderne en geavanceerde rueidis. Laten we code schrijven die de server assisted client side cache gebruikt in een Redis-clusteromgeving met rueidis.

Installeer eerst de dependency.

  • github.com/redis/rueidis

Schrijf vervolgens de code om gegevens uit Redis op te vragen.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Voor client-side cache met rueidis volstaat DoCache. Dit voegt het item toe aan de lokale cache met een opgegeven retentietijd, en bij een volgende DoCache-aanroep wordt de data uit de lokale cache opgehaald. Uiteraard worden invalidation messages ook correct verwerkt.

Waarom geen redis-go?

redis-go ondersteunt helaas de server assisted client side cache niet via de officiële API. Bovendien is er geen API om direct toegang te krijgen tot de verbinding wanneer een PubSub wordt aangemaakt met een nieuwe verbinding, waardoor de client-ID niet kan worden achterhaald. Daarom werd geoordeeld dat een configuratie met redis-go onmogelijk was en deze werd overgeslagen.

Sexy

Via de client side cache-structuur

  • Als gegevens vooraf kunnen worden voorbereid, kan deze structuur de Redis-query's en het verkeer minimaliseren en tegelijkertijd altijd de meest recente gegevens leveren.
  • Hierdoor kan een soort CQRS-structuur worden gecreëerd om de leesprestaties exponentieel te verbeteren.

cqrs

Hoeveel sexier is het geworden?

Aangezien een dergelijke structuur daadwerkelijk in de praktijk wordt gebruikt, heb ik de latency van twee API's kort onderzocht. Ik vraag uw begrip dat ik dit slechts abstract kan beschrijven.

  1. Eerste API
    1. Bij eerste opvraging: gemiddeld 14.63ms
    2. Bij daaropvolgende opvragingen: gemiddeld 2.82ms
    3. Gemiddeld verschil: 10.98ms
  2. Tweede API
    1. Bij eerste opvraging: gemiddeld 14.05ms
    2. Bij daaropvolgende opvragingen: gemiddeld 1.60ms
    3. Gemiddeld verschil: 11.57ms

Er was een aanvullende latentieverbetering van wel 82%!

Ik verwacht dat de volgende verbeteringen zijn gerealiseerd:

  • Het overslaan van netwerkcommunicatie tussen client en Redis en besparing op verkeer.
  • Vermindering van het aantal leescommando's dat Redis zelf moet uitvoeren.
    • Dit heeft ook het effect dat de schrijfprestaties worden verbeterd.
  • Minimalisatie van het parsen van het Redis-protocol.
    • Het parsen van het Redis-protocol is niet kosteloos. Dit kunnen verminderen is een grote kans.

Echter, alles is een afweging. Hiervoor hebben we minstens de volgende twee zaken opgeofferd:

  • De noodzaak om client-side cachebeheerelementen te implementeren, te exploiteren en te onderhouden.
  • De daaruit voortvloeiende toename van CPU- en geheugengebruik aan de clientzijde.

Conclusie

Persoonlijk was ik tevreden met de architectuurcomponent en de belasting van de API-server wat betreft latency en stress was zeer laag. Ik ben van mening dat het wenselijk zou zijn om in de toekomst, indien mogelijk, architectuur op deze manier op te bouwen.