GoSuda

Verbesserung der Reaktionsfähigkeit mit Redis Client-Side Caching

By snowmerak
views ...

Was ist Redis?

Ich denke, es gibt kaum jemanden, der Redis nicht kennt. Dennoch möchte ich es kurz mit einigen seiner Merkmale zusammenfassen:

  • Operationen werden in einem einzelnen Thread ausgeführt, was bedeutet, dass alle Operationen atomar sind.
  • Daten werden In-Memory gespeichert und verarbeitet, wodurch alle Operationen schnell sind.
  • Redis kann optional WAL speichern, wodurch der neueste Zustand schnell gesichert und wiederhergestellt werden kann.
  • Es unterstützt verschiedene Typen wie Set, Hash, Bit, List, was zu hoher Produktivität führt.
  • Es verfügt über eine große Community, wodurch vielfältige Erfahrungen, Probleme und Lösungen geteilt werden können.
  • Es wird seit langem entwickelt und betrieben, was eine zuverlässige Stabilität gewährleistet.

Nun zum Hauptthema

Stellen Sie sich vor?

Was wäre, wenn der Cache Ihres Dienstes die folgenden zwei Bedingungen erfüllt?

  1. Es ist notwendig, Benutzern häufig abgefragte Daten im neuesten Zustand bereitzustellen, aber die Aktualisierung ist unregelmäßig, sodass der Cache häufig aktualisiert werden muss.
  2. Die Daten werden nicht aktualisiert, aber es muss häufig auf dieselben Cachedaten zugegriffen und diese abgefragt werden.

Der erste Fall könnte die Echtzeit-Beliebtheitsrangliste eines Online-Shops sein. Wenn die Echtzeit-Beliebtheitsrangliste als Sorted Set in Redis gespeichert wird, wäre es ineffizient, sie jedes Mal zu lesen, wenn ein Benutzer die Hauptseite aufruft. Im zweiten Fall, bei Wechselkursdaten, werden die Wechselkursdaten zwar etwa alle 10 Minuten veröffentlicht, aber die tatsächlichen Abfragen erfolgen sehr häufig. Insbesondere für Won-Dollar, Won-Yen und Won-Yuan wird der Cache sehr häufig abgefragt. In solchen Fällen wäre es effizienter, wenn der API-Server einen separaten lokalen Cache hätte und bei Datenänderungen Redis erneut abfragen würde, um den Cache zu aktualisieren.

Wie kann man also ein solches Verhalten in einer Datenbank - Redis - API-Server-Struktur implementieren?

Geht das nicht mit Redis PubSub?

Wenn wir einen Cache verwenden, abonnieren wir einen Kanal, der uns über Aktualisierungen informiert!

  • Dann müsste eine Logik erstellt werden, die bei Aktualisierungen Nachrichten sendet.
  • Zusätzliche Operationen durch PubSub beeinflussen die Performance.

pubsub-write

pubsub-read

Was wäre, wenn Redis Änderungen erkennen würde?

Was wäre, wenn wir Keyspace Notifications verwenden würden, um Befehlsbenachrichtigungen für einen bestimmten Schlüssel zu erhalten?

  • Es besteht der Aufwand, die zur Aktualisierung verwendeten Schlüssel und Befehle im Voraus zu speichern und zu teilen.
  • Zum Beispiel wird es komplex, da für einen Schlüssel ein einfaches Set der Aktualisierungsbefehl sein kann, für einen anderen Schlüssel LPush, RPush, SAdd oder SRem der Aktualisierungsbefehl ist.
  • Dies erhöht das Potenzial für Kommunikationsfehler und menschliche Fehler beim Programmieren während des Entwicklungsprozesses erheblich.

Was wäre, wenn wir Keyevent Notifications verwenden würden, um Benachrichtigungen pro Befehl zu erhalten?

  • Es wäre notwendig, alle zur Aktualisierung verwendeten Befehle zu abonnieren. Eine geeignete Filterung der eingehenden Schlüssel wäre erforderlich.
  • Zum Beispiel ist es wahrscheinlich, dass einige der durch Del eingehenden Schlüssel für den betreffenden Client keinen lokalen Cache haben.
  • Dies könnte zu unnötigem Ressourcenverbrauch führen.

Daher ist eine Invalidation Message erforderlich!

Was ist eine Invalidation Message?

Invalidation Messages sind ein Konzept, das seit Redis 6.0 als Teil des Server Assisted Client-Side Cache angeboten wird. Invalidation Messages werden in folgendem Ablauf übermittelt:

  1. Es wird angenommen, dass ClientB einen Schlüssel bereits einmal gelesen hat.
  2. ClientA setzt diesen Schlüssel neu.
  3. Redis erkennt die Änderung und sendet eine Invalidation Message an ClientB, um ClientB mitzuteilen, den Cache zu löschen.
  4. ClientB empfängt diese Nachricht und ergreift entsprechende Maßnahmen.

invalidation-message

Wie wird es verwendet?

Grundlegende Funktionsweise

Ein mit Redis verbundener Client aktiviert den Empfang von Invalidation Messages, indem er CLIENT TRACKING ON REDIRECT <client-id> ausführt. Der Client, der die Nachrichten empfangen soll, abonniert dann SUBSCRIBE __redis__:invalidate, um Invalidation Messages zu erhalten.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementierung! Implementierung! Implementierung!

Redigo + Ristretto

Nur mit dieser Erklärung wäre es unklar, wie es tatsächlich im Code verwendet werden sollte. Daher werden wir es kurz mit redigo und ristretto konfigurieren.

Zuerst werden die beiden Abhängigkeiten installiert.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // number of keys to track frequency of (10M).
22		MaxCost:     1 << 30, // maximum cost of cache (1GB).
23		BufferItems: 64,      // number of keys per Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Zuerst wird ein einfacher RedisClient erstellt, der Ristretto und Redigo enthält.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Der Code ist etwas komplex.

  • Um das Tracking durchzuführen, wird eine zusätzliche Verbindung aufgebaut. Dies ist eine Maßnahme, um zu verhindern, dass PubSub andere Operationen stört.
  • Die ID der hinzugefügten Verbindung wird abgefragt, und das Tracking wird von der Verbindung, die Daten abfragt, auf diese Verbindung umgeleitet.
  • Anschließend wird die Invalidation Message abonniert.
  • Der Code zur Verarbeitung des Abonnements ist etwas komplex. Da Redigo die Invalidation Message nicht parsen kann, muss die Antwort vor dem Parsing empfangen und verarbeitet werden.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Die Get-Nachricht fragt zuerst Ristretto ab. Falls der Wert dort nicht gefunden wird, wird er von Redis geholt.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Der Code zum Testen ist der oben gezeigte. Wenn Sie ihn testen, werden Sie sehen, dass die Werte jedes Mal neu aktualisiert werden, wenn Daten in Redis aktualisiert werden.

Dies ist jedoch zu komplex. Vor allem ist es unumgänglich, das Tracking für alle Master oder Replikate zu aktivieren, um auf Cluster zu skalieren.

Rueidis

Für Go-Sprachanwender steht uns rueidis zur Verfügung, die modernste und am weitesten entwickelte Bibliothek. Wir werden Code schreiben, der den Server-Assisted Client-Side Cache in einer Redis-Cluster-Umgebung mit rueidis verwendet.

Zuerst wird die Abhängigkeit installiert.

  • github.com/redis/rueidis

Danach wird der Code zum Abfragen von Daten in Redis geschrieben.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

Mit Rueidis muss man für die Verwendung des Client-Side Cache lediglich DoCache aufrufen. Dadurch wird der Wert zum lokalen Cache hinzugefügt, wie lange er im lokalen Cache bleiben soll, und bei wiederholtem Aufruf von DoCache werden die Daten aus dem lokalen Cache abgerufen. Selbstverständlich werden auch Invalidation Messages korrekt verarbeitet.

Warum nicht redis-go?

redis-go unterstützt leider den Server Assisted Client-Side Cache nicht über eine offizielle API. Darüber hinaus gibt es keine API, um direkt auf die Verbindung zuzugreifen, die beim Erstellen von PubSub eine neue Verbindung herstellt, sodass die Client-ID nicht ermittelt werden kann. Daher wurde redis-go als ungeeignet für diese Konfiguration befunden und übersprungen.

Das ist sexy

Durch die Client-Side-Cache-Struktur

  • Wenn Daten im Voraus vorbereitet werden können, ermöglicht diese Struktur, die Redis-Abfragen und den Datenverkehr zu minimieren und stets die neuesten Daten bereitzustellen.
  • Dadurch kann eine Art CQRS-Struktur geschaffen werden, die die Leseleistung erheblich steigert.

cqrs

Wie viel sexier ist es geworden?

Tatsächlich wird eine solche Struktur bereits vor Ort verwendet, daher habe ich die Latenz für zwei APIs kurz untersucht. Bitte haben Sie Verständnis dafür, dass ich dies nur sehr abstrakt darstellen kann.

  1. Erste API
    1. Bei der ersten Abfrage: Durchschnittlich 14.63ms
    2. Bei nachfolgenden Abfragen: Durchschnittlich 2.82ms
    3. Durchschnittlicher Unterschied: 10.98ms
  2. Zweite API
    1. Bei der ersten Abfrage: Durchschnittlich 14.05ms
    2. Bei nachfolgenden Abfragen: Durchschnittlich 1.60ms
    3. Durchschnittlicher Unterschied: 11.57ms

Es gab eine zusätzliche Latenzverbesserung von bis zu 82%!

Es wird erwartet, dass folgende Verbesserungen erzielt wurden:

  • Eliminierung der Netzwerkkommunikation zwischen Client und Redis und Einsparung von Traffic.
  • Reduzierung der Anzahl der Lese-Befehle, die Redis selbst ausführen muss.
    • Dies führt auch zu einer Verbesserung der Schreibleistung.
  • Minimierung des Parsens des Redis-Protokolls.
    • Das Parsen des Redis-Protokolls ist nicht kostenlos. Dies zu reduzieren, ist eine große Chance.

Aber alles ist ein Kompromiss. Dafür haben wir mindestens die folgenden zwei Punkte geopfert:

  • Notwendigkeit der Implementierung, des Betriebs und der Wartung von Client-Side-Cache-Management-Elementen.
  • Erhöhter CPU- und Speicherverbrauch des Clients.

Fazit

Persönlich war dies eine zufriedenstellende Architekturkomponente, und der Stress für die API-Server und die Latenz waren sehr gering. Ich denke, es wäre gut, Architekturen zukünftig, wenn möglich, mit einer solchen Struktur aufzubauen.