GoSuda

Reaktivität durch Redis Client-Side Caching verbessern

By snowmerak
views ...

Was ist Redis?

Ich denke, es gibt kaum jemanden, der Redis nicht kennt. Dennoch möchte ich kurz einige Merkmale zusammenfassen:

  • Operationen werden in einem Single Thread ausgeführt, wodurch alle Operationen Atomicity besitzen.
  • Daten werden In-Memory gespeichert und verarbeitet, wodurch alle Operationen schnell sind.
  • Redis kann optional WAL speichern, was eine schnelle Sicherung und Wiederherstellung des neuesten Zustands ermöglicht.
  • Es unterstützt verschiedene Typen wie Set, Hash, Bit, List, was zu hoher Productivity führt.
  • Es verfügt über eine große Community, in der vielfältige Erfahrungen, Issues und Lösungen geteilt werden können.
  • Es wird seit langer Zeit entwickelt und betrieben, was eine zuverlässige Stability gewährleistet.

Nun zum Hauptthema

Stellen Sie sich vor...

Was wäre, wenn der Cache Ihres Dienstes die folgenden zwei Bedingungen erfüllt?

  1. Es müssen häufig abgefragte Daten dem Benutzer im neuesten Zustand bereitgestellt werden, aber die Aktualisierung erfolgt unregelmäßig, sodass der Cache häufig aktualisiert werden muss.
  2. Die Daten werden nicht aktualisiert, aber es muss häufig auf dieselben Cache-Daten zugegriffen und diese abgefragt werden.

Der erste Fall könnte die Echtzeit-Beliebtheitsrangliste eines Online-Shops sein. Wenn die Echtzeit-Beliebtheitsrangliste eines Online-Shops als Sorted Set gespeichert wird, ist es ineffizient, wenn Redis sie jedes Mal liest, wenn ein Benutzer auf die Hauptseite zugreift. Im zweiten Fall, bei Währungsdaten, wird der Wechselkurs zwar etwa alle 10 Minuten veröffentlicht, aber die tatsächliche Abfrage erfolgt sehr häufig. Insbesondere für Won-Dollar, Won-Yen und Won-Yuan wird der Cache sehr häufig abgefragt. In solchen Fällen wäre es effizienter, wenn der API-Server einen separaten lokalen Cache hätte und diesen aktualisieren würde, indem er Redis erneut abfragt, sobald sich die Daten ändern.

Wie kann man also ein solches Verhalten in einer Datenbank - Redis - API-Server-Struktur implementieren?

Geht das nicht mit Redis PubSub?

Wenn Sie einen Cache verwenden, abonnieren Sie einen Kanal, über den Sie Benachrichtigungen über Aktualisierungen erhalten können!

  • Dann müssen Sie eine Logik erstellen, die Nachrichten bei der Aktualisierung sendet.
  • Zusätzliche Operationen durch PubSub beeinflussen die Performance.

pubsub-write

pubsub-read

Was ist, wenn Redis Änderungen erkennt?

Was wäre, wenn man Keyspace Notifications verwendet, um Benachrichtigungen über Befehle für einen bestimmten Key zu erhalten?

  • Es ist umständlich, die zum Aktualisieren verwendeten Keys und Commands im Voraus zu speichern und zu teilen.
  • Zum Beispiel wird es komplex, wenn für einen bestimmten Key ein einfacher Set der Update-Command ist, während für einen anderen Key LPush, RPush, SAdd oder SRem der Update-Command ist.
  • Dies erhöht erheblich die Wahrscheinlichkeit von Kommunikationsfehlern und Human Errors bei der Codierung während des Entwicklungsprozesses.

Was wäre, wenn man Keyevent Notifications verwendet, um Benachrichtigungen auf Command-Ebene zu erhalten?

  • Es ist notwendig, alle zur Aktualisierung verwendeten Commands zu abonnieren. Dort ist eine angemessene Filterung der eingehenden Keys erforderlich.
  • Zum Beispiel ist es wahrscheinlich, dass ein Client keinen lokalen Cache für einige der Keys hat, die über Del eingehen.
  • Dies kann zu unnötiger Ressourcenverschwendung führen.

Daher ist eine Invalidation Message erforderlich!

Was ist eine Invalidation Message?

Invalidation Messages sind ein Konzept, das als Teil des Server Assisted Client-Side Cache, der ab Redis 6.0 verfügbar ist, bereitgestellt wird. Eine Invalidation Message wird in folgendem Ablauf übermittelt:

  1. Angenommen, ClientB hat einen Key bereits einmal gelesen.
  2. ClientA setzt diesen Key neu.
  3. Redis erkennt die Änderung und veröffentlicht eine Invalidation Message an ClientB, um ClientB mitzuteilen, den Cache zu löschen.
  4. ClientB empfängt diese Nachricht und ergreift entsprechende Maßnahmen.

invalidation-message

Wie man es benutzt

Grundlegende Funktionsweise

Ein mit Redis verbundener Client aktiviert den Empfang von Invalidation Messages, indem er CLIENT TRACKING ON REDIRECT <client-id> ausführt. Der Client, der die Nachrichten empfangen soll, abonniert dann SUBSCRIBE __redis__:invalidate, um Invalidation Messages zu erhalten.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementierung! Implementierung! Implementierung!

Redigo + Ristretto

Nur so erklärt, ist es unklar, wie man es tatsächlich im Code verwenden soll. Daher werden wir es zunächst einfach mit redigo und ristretto konfigurieren.

Installieren Sie zuerst die beiden Dependencies.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // Anzahl der Keys, deren Häufigkeit verfolgt werden soll (10M).
22		MaxCost:     1 << 30, // Maximale Kosten des Caches (1GB).
23		BufferItems: 64,      // Anzahl der Keys pro Get-Buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Zunächst wird ein einfacher RedisClient erstellt, der Ristretto und Redigo beinhaltet.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Der Code ist etwas komplex.

  • Um das Tracking durchzuführen, wird eine weitere Verbindung hergestellt. Dies ist eine Maßnahme, um zu verhindern, dass PubSub andere Operationen stört.
  • Die ID der hinzugefügten Verbindung wird abgefragt, und die Verbindung, die Daten abfragen soll, leitet das Tracking an diese Verbindung um.
  • Und es werden Invalidation Messages abonniert.
  • Der Code zur Verarbeitung des Abonnements ist etwas komplex. Da Redigo keine Invalidation Messages parsen kann, muss die Antwort vor dem Parsen empfangen und verarbeitet werden.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Die Get-Nachricht fragt zuerst Ristretto ab und holt sie von Redis, falls nicht gefunden.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Der Code zum Testen ist wie oben. Wenn Sie es einmal testen, werden Sie sehen, dass die Werte jedes Mal aktualisiert werden, wenn Daten in Redis aktualisiert werden.

Dies ist jedoch zu komplex. Vor allem ist es unumgänglich, das Tracking für alle Master- oder Replika-Instanzen zu aktivieren, um auf Cluster zu erweitern.

Rueidis

Da wir Go verwenden, haben wir rueidis, das modernste und fortschrittlichste. Wir werden Code schreiben, der Server Assisted Client Side Cache in einer Redis-Cluster-Umgebung mit Rueidis verwendet.

Zuerst installieren Sie die Abhängigkeit.

  • github.com/redis/rueidis

Und schreiben Sie den Code, um Daten aus Redis abzufragen.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Für die Verwendung von Client Side Cache mit Rueidis genügt ein einfacher DoCache-Aufruf. Dieser fügt die Daten zum lokalen Cache hinzu und legt fest, wie lange sie dort verbleiben sollen. Bei wiederholtem Aufruf von DoCache werden die Daten aus dem lokalen Cache abgerufen. Natürlich werden auch Invalidation Messages korrekt verarbeitet.

Warum nicht redis-go?

redis-go unterstützt leider den Server Assisted Client Side Cache nicht über die offizielle API. Darüber hinaus gibt es keine API, die direkten Zugriff auf die neue Verbindung beim Erstellen von PubSub ermöglicht, sodass die Client ID nicht ermittelt werden kann. Daher wurde redis-go als ungeeignet für die Konfiguration befunden und übersprungen.

Sehr elegant

Durch die Client Side Cache Struktur

  • Wenn Daten im Voraus vorbereitet werden können, kann diese Struktur die Abfragen und den Traffic zu Redis minimieren und stets die neuesten Daten bereitstellen.
  • Dadurch kann eine Art CQRS-Struktur geschaffen werden, um die Lese-Performance erheblich zu steigern.

cqrs

Wie viel eleganter ist es geworden?

Da eine solche Struktur derzeit tatsächlich im Einsatz ist, habe ich die Latenzzeiten für zwei APIs kurz untersucht. Bitte haben Sie Verständnis dafür, dass ich dies nur sehr abstrakt darstellen kann.

  1. Erste API
    1. Bei erster Abfrage: Durchschnittlich 14.63ms
    2. Bei nachfolgenden Abfragen: Durchschnittlich 2.82ms
    3. Durchschnittliche Differenz: 10.98ms
  2. Zweite API
    1. Bei erster Abfrage: Durchschnittlich 14.05ms
    2. Bei nachfolgenden Abfragen: Durchschnittlich 1.60ms
    3. Durchschnittliche Differenz: 11.57ms

Es gab eine zusätzliche Latenzverbesserung von bis zu 82%!

Ich erwarte, dass folgende Verbesserungen erzielt wurden:

  • Eliminierung der Netzwerkkommunikation zwischen Client und Redis und Traffic-Einsparungen
  • Reduzierung der Anzahl der Lese-Commands, die Redis selbst ausführen muss
    • Dies hat auch den Effekt, die Schreib-Performance zu verbessern.
  • Minimierung des Parsings des Redis-Protokolls
    • Das Parsen des Redis-Protokolls ist nicht kostenlos. Dies zu reduzieren ist eine große Chance.

Doch alles hat seinen Preis. Dafür haben wir mindestens die folgenden beiden Punkte geopfert:

  • Implementierung und Wartung von Client Side Cache Management-Komponenten erforderlich
  • Erhöhter CPU- und Speicherverbrauch auf Clientseite

Fazit

Persönlich war dies eine zufriedenstellende Architekturkomponente, und die Latenz sowie die Belastung des API-Servers waren sehr gering. Ich denke, es wäre gut, die Architektur auch in Zukunft, wenn möglich, in dieser Weise zu gestalten.