GoSuda

通过 Redis 客户端侧缓存提升响应性

By snowmerak
views ...

What is Redis?

我认为很少有人不知道 Redis。然而,如果用一些特性来简要概括,可以总结如下:

  • 运算在单线程中执行,所有运算都具有原子性
  • 数据存储和运算在 In-Memory 中进行,所有运算都快速
  • Redis 可以根据选项保存 WAL,从而实现快速的最新状态备份恢复
  • 支持 Set、Hash、Bit、List 等多种类型,具有很高的生产力
  • 拥有庞大的社区,可以共享各种经验、问题和解决方案。
  • 经过长时间的开发和运营,具有可靠的稳定性

那么,言归正传

想象一下?

如果您的服务的缓存符合以下两个条件,会怎么样?

  1. 需要向用户提供最新状态的频繁查询数据,但更新不规则,需要频繁更新缓存时。
  2. 无法更新,但需要频繁访问和查询相同的缓存数据时。

第一个案例可以考虑购物中心实时人气排名。当购物中心实时人气排名以 sorted set 存储时,如果每次用户访问主页时都从 Redis 读取,效率会很低。第二个案例是汇率数据,即使汇率数据大约每10分钟公布一次,实际查询也发生得非常频繁。特别是对于韩元-美元、韩元-日元、韩元-人民币的汇率,缓存查询非常频繁。在这些情况下,API 服务器在本地拥有单独的缓存,当数据更改时,重新查询 Redis 并更新会是一种更高效的操作。

那么,如何在数据库 - Redis - API 服务器的结构中实现这种操作呢?

Redis PubSub 不行吗?

使用缓存时,订阅可以接收更新通知的 Channel!

  • 那么就需要创建发送更新消息的逻辑。
  • PubSub 会引入额外的操作,从而影响性能。

pubsub-write

pubsub-read

那么,如果 Redis 感知到更改呢?

使用 Keyspace Notification 接收关于该 Key 的 Command 通知?

  • 存在需要预先存储和共享用于更新的 Key 和 Command 的麻烦。
  • 例如,对于某些 Key,简单的 Set 是更新 Command,而某些 Key 则是 LPush、RPush 或 SAdd 以及 SRem 等,变得复杂。
  • 这将大大增加开发过程中沟通失误和编码中人为错误的发生可能性。

使用 Keyevent Notification 接收 Command 级别的通知?

  • 需要订阅所有用于更新的 Command。并且需要对传入的 Key 进行适当的过滤。
  • 例如,对于通过 Del 传入的所有 Key 中的一部分,该客户端很可能没有本地缓存。
  • 这可能导致不必要的资源浪费。

所以需要 Invalidation Message!

什么是 Invalidation Message?

Invalidation Messages 是 Redis 6.0 以后作为 Server Assisted Client-Side Cache 的一部分提供的一个概念。Invalidation Message 按以下流程传递:

  1. 假设 ClientB 已经读取过 Key。
  2. ClientA 重新设置该 Key。
  3. Redis 感知到更改并向 ClientB 发布 Invalidation Message,通知 ClientB 清除缓存。
  4. ClientB 接收到该消息并采取适当的措施。

invalidation-message

如何使用

基本操作结构

连接到 Redis 的客户端通过执行 CLIENT TRACKING ON REDIRECT <client-id> 来接收 invalidation message。然后,需要接收消息的客户端通过 SUBSCRIBE __redis__:invalidate 来订阅 invalidation message。

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

实现!实现!实现!

Redigo + Ristretto

如果只这样解释,在实际代码中使用时会有些模糊。因此,我们先用 redigoristretto 简单地构建一下。

首先安装两个依赖项。

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // number of keys to track frequency of (10M).
22		MaxCost:     1 << 30, // maximum cost of cache (1GB).
23		BufferItems: 64,      // number of keys per Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

首先,简单地创建一个包含 ristretto 和 redigo 的 RedisClient

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

代码稍微有些复杂。

  • 为了进行 Tracking,需要额外建立一个连接。这是考虑到 PubSub 可能会干扰其他操作的措施。
  • 查询新增连接的 ID,并让查询数据的连接将 Tracking 重定向到该连接。
  • 然后订阅 invalidation message。
  • 处理订阅的代码有些复杂。因为 redigo 无法解析 invalidation message,所以必须在解析之前接收响应并进行处理。
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get 消息如下所示,首先查询 ristretto,如果不存在,则从 Redis 获取。

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

用于测试的代码如上所示。如果您尝试测试一次,您会发现 Redis 中的数据每次更新时都会刷新其值。

然而,这太复杂了。最重要的是,为了扩展到集群,必然需要对所有 master 或 replica 启用 Tracking。

Rueidis

对于 Go 语言而言,我们拥有最现代化且先进的 rueidis。我们将编写使用 rueidis 在 Redis 集群环境下使用 server assisted client side cache 的代码。

首先,安装依赖项。

  • github.com/redis/rueidis

然后,编写查询 Redis 数据的代码。

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

在 rueidis 中,要使用 client side cache,只需执行 DoCache 即可。然后,它会将数据添加到本地缓存中,并决定在本地缓存中保留多长时间,然后再次调用 DoCache 即可从本地缓存中查询数据。当然,它也会正常处理 invalidation message。

为什么不用 redis-go?

redis-go 遗憾的是不支持官方 API 的 server assisted client side cache。甚至在创建 PubSub 时,它会创建一个新的连接,并且没有直接访问该连接的 API,因此也无法知道 client ID。因此,我认为 redis-go 的结构本身就无法实现,所以跳过了。

真棒

通过 client side cache 结构

  • 如果是可预先准备的数据,通过这种结构,可以最大程度地减少对 Redis 的查询和流量,并始终提供最新数据。
  • 通过这种方式,可以创建一种 CQRS 结构,从而显著提高读取性能。

cqrs

提升了多少?

实际上,现场恰好正在使用这种结构,所以我对两个 API 进行了简单的延迟查找。请原谅我只能用非常抽象的描述。

  1. 第一个 API
    1. 首次查询时:平均 14.63ms
    2. 后续查询时:平均 2.82ms
    3. 平均差距:10.98ms
  2. 第二个 API
    1. 首次查询时:平均 14.05ms
    2. 后续查询时:平均 1.60ms
    3. 平均差距:11.57ms

延迟最多提升了约 82%!

我期望有以下改进:

  • 省略客户端与 Redis 之间的网络通信过程并节省流量
  • 减少 Redis 本身需要执行的读取 Command 数量
    • 这也提高了写入性能。
  • 最小化 Redis 协议解析
    • 解析 Redis 协议并非没有成本。减少这部分是巨大的机会。

然而,一切都是权衡。为此,我们至少牺牲了以下两点:

  • 需要实现、操作和维护客户端缓存管理要素
  • 由此导致客户端的 CPU 和内存使用量增加

结论

就我个人而言,这是一个令人满意的架构组件,并且对延迟和 API 服务器的压力都非常小。我希望将来如果可能的话,能够以这种结构构建架构。