Améliorer la réactivité avec le cache côté client de Redis
Qu'est-ce que Redis ?
Je pense que peu de gens ne connaissent pas Redis. Cependant, pour en donner un bref aperçu à travers quelques-unes de ses caractéristiques, on pourrait le résumer comme suit :
- Les opérations sont effectuées en mono-thread, ce qui confère à toutes les opérations une atomicité.
- Les données sont stockées et traitées en mémoire, ce qui rend toutes les opérations rapides.
- Redis peut stocker des WAL en option, permettant une sauvegarde et une récupération rapides de l'état le plus récent.
- Il prend en charge plusieurs types tels que Set, Hash, Bit, List, offrant une productivité élevée.
- Il dispose d'une grande communauté, permettant de partager diverses expériences, problèmes et solutions.
- Il est développé et exploité depuis longtemps, garantissant une stabilité fiable.
Entrons dans le vif du sujet
Imaginez ?
Que se passerait-il si le cache de votre service remplissait les deux conditions suivantes ?
- Vous devez fournir aux utilisateurs les données les plus récentes, mais la mise à jour est irrégulière, nécessitant des mises à jour fréquentes du cache.
- Les mises à jour ne sont pas nécessaires, mais les mêmes données du cache doivent être consultées fréquemment.
Le premier cas peut être illustré par le classement en temps réel des articles populaires d'un centre commercial. Lorsque le classement en temps réel des articles populaires est stocké dans un sorted set, il serait inefficace que l'utilisateur lise ce classement depuis Redis chaque fois qu'il accède à la page principale. Dans le second cas, pour les données de taux de change, même si les données de taux de change sont publiées approximativement toutes les 10 minutes, les requêtes réelles sont très fréquentes. De plus, les requêtes de cache pour le won-dollar, le won-yen et le won-yuan sont extrêmement fréquentes. Dans ces cas, il serait plus efficace que le serveur API dispose d'un cache local séparé et, lorsque les données changent, qu'il interroge à nouveau Redis pour les mettre à jour.
Alors, comment peut-on implémenter un tel comportement dans une architecture Base de données - Redis - Serveur API ?
Impossible avec Redis PubSub ?
Lors de l'utilisation du cache, abonnez-vous à un canal pour recevoir les notifications de mise à jour !
- Il faudrait alors créer une logique pour envoyer des messages lors de la mise à jour.
- L'ajout d'opérations dues à PubSub affecte les performances.
Et si Redis détectait les changements ?
En utilisant Keyspace Notification pour recevoir des notifications de commande pour une clé donnée ?
- Il y a la complexité de devoir stocker et partager à l'avance les clés et les commandes utilisées pour la mise à jour.
- Par exemple, pour certaines clés, un simple Set est une commande de mise à jour, et pour d'autres, LPush, RPush, SAdd ou SRem sont des commandes de mise à jour, ce qui rend les choses plus complexes.
- Cela augmente considérablement la probabilité d'erreurs de communication et d'erreurs humaines dans le code pendant le processus de développement.
En utilisant Keyevent Notification pour recevoir des notifications au niveau des commandes ?
- Il est nécessaire de s'abonner à toutes les commandes utilisées pour la mise à jour. Un filtrage approprié des clés entrantes est alors requis.
- Par exemple, il est fort probable que le client ne dispose pas d'un cache local pour certaines des clés entrantes via Del.
- Cela peut entraîner un gaspillage inutile de ressources.
D'où la nécessité de l'Invalidation Message !
Qu'est-ce qu'un Invalidation Message ?
Les Invalidation Messages sont un concept introduit avec Redis 6.0, faisant partie du Server Assisted Client-Side Cache. Un Invalidation Message est transmis selon le flux suivant :
- Supposons que ClientB ait déjà lu une fois la clé.
- ClientA définit une nouvelle valeur pour cette clé.
- Redis détecte le changement et publie un Invalidation Message à ClientB pour l'informer de vider son cache.
- ClientB reçoit ce message et prend les mesures appropriées.
Comment l'utiliser
Structure de fonctionnement de base
Un client connecté à Redis active la réception des messages d'invalidation en exécutant CLIENT TRACKING ON REDIRECT <client-id>
. Ensuite, le client qui doit recevoir les messages s'abonne à SUBSCRIBE __redis__:invalidate
pour les recevoir.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implémentation ! Implémentation ! Implémentation !
Redigo + Ristretto
Une telle explication pourrait rendre l'utilisation réelle dans le code un peu ambiguë. Par conséquent, nous allons commencer par une configuration simple avec redigo
et ristretto
.
Installez d'abord les deux dépendances.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // nombre de clés dont la fréquence est suivie (10M).
22 MaxCost: 1 << 30, // coût maximum du cache (1GB).
23 BufferItems: 64, // nombre de clés par tampon Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Nous créons d'abord un RedisClient
simple qui inclut ristretto et redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Le code est un peu complexe.
- Pour le Tracking, une connexion supplémentaire est établie. Cette mesure est prise pour éviter que PubSub n'interfère avec d'autres opérations.
- L'ID de la connexion ajoutée est récupéré, et la connexion qui interrogera les données redirige le Tracking vers cette connexion.
- Ensuite, les messages d'invalidation sont souscrits.
- Le code de gestion de la souscription est un peu complexe. Étant donné que redigo ne gère pas le parsing des messages d'invalidation, la réponse doit être traitée avant le parsing.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Le message Get
interroge d'abord Ristretto et, si la valeur n'est pas trouvée, la récupère de Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Le code de test est celui ci-dessus. Si vous le testez, vous pourrez constater que les valeurs sont mises à jour chaque fois que les données sont actualisées dans Redis.
Mais c'est trop complexe. De plus, pour l'extension au cluster, il est inévitable d'activer le Tracking pour tous les maîtres ou réplicas.
Rueidis
Pour les utilisateurs de Go, nous disposons de rueidis
, la bibliothèque la plus moderne et la plus avancée. Nous allons écrire du code utilisant le cache côté client assisté par serveur dans un environnement de cluster Redis avec rueidis.
Tout d'abord, installez la dépendance.
github.com/redis/rueidis
Puis, écrivez le code pour interroger les données dans Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Avec rueidis, pour utiliser le cache côté client, il suffit d'appeler DoCache
. Cela ajoute la donnée au cache local, en spécifiant sa durée de vie. Un appel ultérieur à DoCache
récupérera la donnée directement depuis le cache local. Bien entendu, les messages d'invalidation sont également traités correctement.
Pourquoi pas redis-go ?
Malheureusement, redis-go
ne prend pas en charge officiellement le cache côté client assisté par le serveur via son API. De plus, lors de la création d'un PubSub, il n'y a pas d'API permettant d'accéder directement à la nouvelle connexion créée, ce qui rend impossible de connaître l'ID du client. Par conséquent, il a été jugé que redis-go
ne permettait pas une configuration viable et a été écarté.
C'est sexy
Grâce à la structure du cache côté client
- Si les données peuvent être préparées à l'avance, cette structure permet de minimiser les requêtes et le trafic vers Redis tout en fournissant toujours les données les plus récentes.
- Cela permet de créer une sorte de structure CQRS, améliorant considérablement les performances de lecture.
À quel point est-ce devenu plus sexy ?
En fait, une telle structure est actuellement utilisée sur le terrain, et j'ai examiné les latences de deux API. Veuillez excuser le fait que je ne puisse les décrire que de manière très abstraite.
- Première API
- Première consultation : moyenne de 14.63ms
- Consultations ultérieures : moyenne de 2.82ms
- Écart moyen : 10.98ms
- Deuxième API
- Première consultation : moyenne de 14.05ms
- Consultations ultérieures : moyenne de 1.60ms
- Écart moyen : 11.57ms
Il y a eu une amélioration supplémentaire de la latence allant jusqu'à 82 % !
Les améliorations suivantes sont attendues :
- Suppression du processus de communication réseau entre le client et Redis et économie de trafic.
- Réduction du nombre de commandes de lecture que Redis doit exécuter.
- Cela a également pour effet d'améliorer les performances d'écriture.
- Minimisation de l'analyse du protocole Redis.
- L'analyse du protocole Redis n'est pas sans coût. La réduire représente une opportunité majeure.
Cependant, tout est une question de compromis. Pour cela, nous avons au moins sacrifié les deux éléments suivants :
- Nécessité d'implémenter, d'opérer et de maintenir la gestion des éléments du cache côté client.
- Augmentation de l'utilisation du CPU et de la mémoire côté client en conséquence.
Conclusion
Personnellement, cette architecture s'est avérée satisfaisante, avec une latence et un stress sur le serveur API très faibles. J'envisage de continuer à architecturer les systèmes de cette manière si possible.