Redisのクライアントサイドキャッシュによる応答性の向上
Redisとは何か?
Redisをご存知ない方は少ないと存じますが、いくつかの特性を簡潔に述べるならば、以下のように整理できます。
- シングルスレッドで演算が実行されるため、全ての演算がアトミック性を有します。
- インメモリにデータが保存され演算されるため、全ての演算が高速です。
- RedisはオプションによりWALを保存できるため、迅速に最新の状態をバックアップし復元できます。
- Set、Hash、Bit、Listなど多様な型をサポートしており、高い生産性を有します。
- 大規模なコミュニティを有しており、多様な経験、課題、解決策を共有できます。
- 長期間にわたり開発および運用されているため、信頼できる安定性があります。
本題へ
想像してみてください。
もし皆様のサービスのキャッシュが次の2つの条件に合致する場合、どうなるでしょうか?
- 頻繁に参照されるデータを最新の状態でユーザーに提供する必要があるが、更新が不規則であるため、頻繁にキャッシュを更新しなければならない場合。
- 更新はされないが、同一のキャッシュデータに頻繁にアクセスして参照する必要がある場合。
最初のケースでは、ショッピングモールのリアルタイム人気ランキングが考えられます。ショッピングモールのリアルタイム人気ランキングをsorted setとして保存した場合、Redisからユーザーがメインページにアクセスするたびに読み込むのは非効率です。 2番目のケースでは、為替データについて、約10分ごとに為替データが公示されても、実際の参照は非常に頻繁に発生します。特に円ドル、円円、円元については、非常に頻繁にキャッシュを参照することになります。このようなケースでは、APIサーバーがローカルに別途キャッシュを保持し、データが変更されたらRedisを再度参照して更新する方が効率的な動作となるでしょう。
それでは、データベース - Redis - APIサーバーの構造において、このような動作をどのように実装できるでしょうか?
Redis PubSubではだめなのか?
キャッシュを使用する際、更新の有無を受け取れるチャネルを購読しよう!
- そうすると、更新時にメッセージを送信するロジックを構築する必要があります。
- PubSubによる追加動作が入るため、パフォーマンスに影響を与えます。
ではRedisが変更を感知するなら?
Keyspace Notificationを使用して、該当キーに対するコマンド通知を受け取る場合は?
- 更新に用いられるキーとコマンドを事前に保存し共有する手間が存在します。
- 例えば、あるキーに対しては単純なSetが更新コマンドであり、別のキーはLPush、あるいはRPushやSAddおよびSRemが更新コマンドとなるなど、複雑さが増します。
- これは、開発過程におけるコミュニケーションミスやコーディングでのヒューマンエラー発生の可能性を大幅に増加させます。
Keyevent Notificationを使用して、コマンド単位で通知を受け取る場合は?
- 更新に用いられる全てのコマンドに対する購読が必要です。そこから入ってくるキーに対して適切なフィルタリングが必要です。
- 例えば、Delで入ってくる全てのキーの一部について、当該クライアントはローカルキャッシュを持っていない可能性が高いです。
- これは不必要なリソースの浪費につながる可能性があります。
そこで必要となるのがInvalidation Message!
Invalidation Messageとは?
Invalidation Messagesは、Redis 6.0から追加されたServer Assisted Client-Side Cacheの一環として提供される概念です。Invalidation Messageは以下の流れで伝達されます。
- ClientBが既にkeyを一度読み込んだと仮定します。
- ClientAが当該keyを新しく設定します。
- Redisは変更を感知し、ClientBにInvalidation Messageを発行してClientBにキャッシュを消去するよう通知します。
- ClientBは当該メッセージを受け取り、適切な措置を講じます。
使用方法
基本動作構造
Redisに接続されたクライアントがCLIENT TRACKING ON REDIRECT <client-id>
を実行することで、invalidation messageを受け取るようにします。そして、メッセージを受け取るべきクライアントはSUBSCRIBE __redis__:invalidate
でinvalidation messageを受け取るように購読します。
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
実装!実装!実装!
Redigo + Ristretto
これだけ説明されても、実際にコード上でどのように使用すればよいか曖昧です。そこで、簡単にredigo
とristretto
で構成してみましょう。
まず、2つの依存関係をインストールします。
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // 頻度を追跡するキーの数 (10M)。
22 MaxCost: 1 << 30, // キャッシュの最大コスト (1GB)。
23 BufferItems: 64, // Getバッファあたりのキーの数。
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
まず、ristrettoとredigoを含むRedisClient
を簡単に作成します。
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
コードが少し複雑です。
- トラッキングのために、もう一つ接続を確立します。これは、PubSubが他の動作の妨げになることを考慮した措置です。
- 追加された接続のIDを照会し、データを照会する接続からトラッキングを当該接続にリダイレクトさせます。
- そして、invalidation messageを購読します。
- 購読を処理するコードが少し複雑です。redigoは無効化メッセージのパースに対応していないため、パース前の応答を受け取って処理する必要があります。
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Get
メッセージは、上記のようにまずristrettoを照会し、なければRedisから取得するようにします。
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
テストするためのコードは上記の通りです。一度テストしてみると、Redisでデータが更新されるたびに新しい値が更新されることを確認できます。
しかし、これはあまりにも複雑です。何よりも、クラスターに対して拡張するためには、必然的にすべてのマスター、あるいはレプリカに対してトラッキングを有効にする必要があります。
Rueidis
Go言語を使用する以上、我々には最もモダンで進化したrueidis
があります。rueidisを使用したRedisクラスター環境でのサーバーアシスト型クライアントサイドキャッシュを使用するコードを記述してみましょう。
まず、依存関係をインストールします。
github.com/redis/rueidis
そして、Redisからデータを照会するコードを記述します。
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
rueidisには、クライアントサイドキャッシュを使用するためにDoCache
を実行するだけで十分です。そうすると、ローカルキャッシュにどれだけ維持するかといった情報がローカルキャッシュに追加され、同様にDoCache
を呼び出すと、ローカルキャッシュ内からデータが照会されて取得されます。当然、無効化メッセージも正常に処理されます。
なぜredis-goではないのか?
redis-go
は残念ながら公式APIとしてサーバーアシスト型クライアントサイドキャッシュをサポートしていません。さらに、PubSubを生成する際に新しい接続を作成し、その接続に直接アクセスするAPIがないため、クライアントIDを知ることもできません。そのため、redis-go
は構成自体が不可能であると判断し、スキップしました。
魅力的ですね
クライアントサイドキャッシュ構造を通じて
- 事前に準備できるデータであれば、この構造を通じてRedisへのクエリおよびトラフィックを最小限に抑えつつ、常に最新のデータを提供できるでしょう。
- これにより、一種のCQRS構造を構築し、読み取り性能を飛躍的に向上させることができます。
どのくらい魅力的になったのか?
実際に現場でこのような構造で使用されているため、2つのAPIについて簡単なレイテンシを調べてみました。非常に抽象的にしか記述できない点をご容赦ください。
- 最初のAPI
- 初回照会時:平均14.63ms
- その後照会時:平均2.82ms
- 平均差:10.98ms
- 2番目のAPI
- 初回照会時:平均14.05ms
- その後照会時:平均1.60ms
- 平均差:11.57ms
最大で約82%の追加的なレイテンシ改善がありました!
おそらく、次のような改善点があったものと期待されます。
- クライアントとRedis間のネットワーク通信過程の省略およびトラフィックの節約
- Redis自体が実行すべき読み取りコマンド数の削減
- これは書き込み性能も向上させる効果があります。
- Redisプロトコルのパース最小化
- Redisプロトコルをパースするコストはゼロではありません。これを削減できるのは大きな機会です。
しかし、全てはトレードオフです。このために、私たちは少なくとも以下の2つを犠牲にしました。
- クライアントサイドキャッシュ管理要素の実装、運用、および保守の必要性
- これによるクライアントのCPUおよびメモリ使用量の増加
結論
個人的には満足のいくアーキテクチャ構成要素であり、レイテンシおよびAPIサーバーへの負荷も非常に少なかったです。今後も可能であれば、このような構造でアーキテクチャを構成できれば良いと考えております。