通过 Redis 客户端侧缓存提升响应性
What is Redis?
我认为很少有人不知道 Redis。然而,如果用一些特性来简要概括,可以总结如下:
- 运算在单线程中执行,所有运算都具有原子性。
- 数据存储和运算在 In-Memory 中进行,所有运算都快速。
- Redis 可以根据选项保存 WAL,从而实现快速的最新状态备份和恢复。
- 支持 Set、Hash、Bit、List 等多种类型,具有很高的生产力。
- 拥有庞大的社区,可以共享各种经验、问题和解决方案。
- 经过长时间的开发和运营,具有可靠的稳定性。
那么,言归正传
想象一下?
如果您的服务的缓存符合以下两个条件,会怎么样?
- 需要向用户提供最新状态的频繁查询数据,但更新不规则,需要频繁更新缓存时。
- 无法更新,但需要频繁访问和查询相同的缓存数据时。
第一个案例可以考虑购物中心实时人气排名。当购物中心实时人气排名以 sorted set 存储时,如果每次用户访问主页时都从 Redis 读取,效率会很低。第二个案例是汇率数据,即使汇率数据大约每10分钟公布一次,实际查询也发生得非常频繁。特别是对于韩元-美元、韩元-日元、韩元-人民币的汇率,缓存查询非常频繁。在这些情况下,API 服务器在本地拥有单独的缓存,当数据更改时,重新查询 Redis 并更新会是一种更高效的操作。
那么,如何在数据库 - Redis - API 服务器的结构中实现这种操作呢?
Redis PubSub 不行吗?
使用缓存时,订阅可以接收更新通知的 Channel!
- 那么就需要创建发送更新消息的逻辑。
- PubSub 会引入额外的操作,从而影响性能。
那么,如果 Redis 感知到更改呢?
使用 Keyspace Notification 接收关于该 Key 的 Command 通知?
- 存在需要预先存储和共享用于更新的 Key 和 Command 的麻烦。
- 例如,对于某些 Key,简单的 Set 是更新 Command,而某些 Key 则是 LPush、RPush 或 SAdd 以及 SRem 等,变得复杂。
- 这将大大增加开发过程中沟通失误和编码中人为错误的发生可能性。
使用 Keyevent Notification 接收 Command 级别的通知?
- 需要订阅所有用于更新的 Command。并且需要对传入的 Key 进行适当的过滤。
- 例如,对于通过 Del 传入的所有 Key 中的一部分,该客户端很可能没有本地缓存。
- 这可能导致不必要的资源浪费。
所以需要 Invalidation Message!
什么是 Invalidation Message?
Invalidation Messages 是 Redis 6.0 以后作为 Server Assisted Client-Side Cache 的一部分提供的一个概念。Invalidation Message 按以下流程传递:
- 假设 ClientB 已经读取过 Key。
- ClientA 重新设置该 Key。
- Redis 感知到更改并向 ClientB 发布 Invalidation Message,通知 ClientB 清除缓存。
- ClientB 接收到该消息并采取适当的措施。
如何使用
基本操作结构
连接到 Redis 的客户端通过执行 CLIENT TRACKING ON REDIRECT <client-id>
来接收 invalidation message。然后,需要接收消息的客户端通过 SUBSCRIBE __redis__:invalidate
来订阅 invalidation message。
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
实现!实现!实现!
Redigo + Ristretto
如果只这样解释,在实际代码中使用时会有些模糊。因此,我们先用 redigo
和 ristretto
简单地构建一下。
首先安装两个依赖项。
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // number of keys to track frequency of (10M).
22 MaxCost: 1 << 30, // maximum cost of cache (1GB).
23 BufferItems: 64, // number of keys per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
首先,简单地创建一个包含 ristretto 和 redigo 的 RedisClient
。
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
代码稍微有些复杂。
- 为了进行 Tracking,需要额外建立一个连接。这是考虑到 PubSub 可能会干扰其他操作的措施。
- 查询新增连接的 ID,并让查询数据的连接将 Tracking 重定向到该连接。
- 然后订阅 invalidation message。
- 处理订阅的代码有些复杂。因为 redigo 无法解析 invalidation message,所以必须在解析之前接收响应并进行处理。
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Get
消息如下所示,首先查询 ristretto,如果不存在,则从 Redis 获取。
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
用于测试的代码如上所示。如果您尝试测试一次,您会发现 Redis 中的数据每次更新时都会刷新其值。
然而,这太复杂了。最重要的是,为了扩展到集群,必然需要对所有 master 或 replica 启用 Tracking。
Rueidis
对于 Go 语言而言,我们拥有最现代化且先进的 rueidis
。我们将编写使用 rueidis
在 Redis 集群环境下使用 server assisted client side cache 的代码。
首先,安装依赖项。
github.com/redis/rueidis
然后,编写查询 Redis 数据的代码。
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
在 rueidis 中,要使用 client side cache,只需执行 DoCache
即可。然后,它会将数据添加到本地缓存中,并决定在本地缓存中保留多长时间,然后再次调用 DoCache
即可从本地缓存中查询数据。当然,它也会正常处理 invalidation message。
为什么不用 redis-go?
redis-go
遗憾的是不支持官方 API 的 server assisted client side cache。甚至在创建 PubSub 时,它会创建一个新的连接,并且没有直接访问该连接的 API,因此也无法知道 client ID。因此,我认为 redis-go
的结构本身就无法实现,所以跳过了。
真棒
通过 client side cache 结构
- 如果是可预先准备的数据,通过这种结构,可以最大程度地减少对 Redis 的查询和流量,并始终提供最新数据。
- 通过这种方式,可以创建一种 CQRS 结构,从而显著提高读取性能。
提升了多少?
实际上,现场恰好正在使用这种结构,所以我对两个 API 进行了简单的延迟查找。请原谅我只能用非常抽象的描述。
- 第一个 API
- 首次查询时:平均 14.63ms
- 后续查询时:平均 2.82ms
- 平均差距:10.98ms
- 第二个 API
- 首次查询时:平均 14.05ms
- 后续查询时:平均 1.60ms
- 平均差距:11.57ms
延迟最多提升了约 82%!
我期望有以下改进:
- 省略客户端与 Redis 之间的网络通信过程并节省流量
- 减少 Redis 本身需要执行的读取 Command 数量
- 这也提高了写入性能。
- 最小化 Redis 协议解析
- 解析 Redis 协议并非没有成本。减少这部分是巨大的机会。
然而,一切都是权衡。为此,我们至少牺牲了以下两点:
- 需要实现、操作和维护客户端缓存管理要素
- 由此导致客户端的 CPU 和内存使用量增加
结论
就我个人而言,这是一个令人满意的架构组件,并且对延迟和 API 服务器的压力都非常小。我希望将来如果可能的话,能够以这种结构构建架构。