Повышение реактивности с помощью кеша на стороне клиента Redis
Что такое Redis?
Я полагаю, что мало кто не знаком с Redis. Тем не менее, можно кратко упомянуть некоторые его характеристики, которые можно обобщить следующим образом:
- Операции выполняются в однопоточном режиме, что обеспечивает атомарность всех операций.
- Данные хранятся и обрабатываются In-Memory, что обеспечивает высокую скорость всех операций.
- В зависимости от опций, Redis может сохранять WAL, что позволяет быстро резервировать и восстанавливать последнее состояние.
- Поддерживает различные типы данных, такие как Set, Hash, Bit, List, что обеспечивает высокую производительность.
- Имеет большое сообщество, что позволяет обмениваться опытом, проблемами и решениями.
- Разрабатывается и эксплуатируется в течение длительного времени, что обеспечивает надежную стабильность.
Переходим к сути
Представьте себе
Что, если кеш вашего сервиса соответствует следующим двум условиям?
- Необходимо предоставлять пользователю данные, которые часто запрашиваются, в актуальном состоянии, но обновление нерегулярно, что требует частых обновлений кеша.
- Обновление не происходит, но к одним и тем же кешированным данным требуется частый доступ для запросов.
Первый случай можно рассмотреть на примере рейтинга популярности товаров в реальном времени в интернет-магазине. Если рейтинг популярности товаров в реальном времени хранится в виде отсортированного набора, неэффективно, если пользователь будет считывать его из Redis каждый раз, когда он заходит на главную страницу. Второй случай касается данных о курсах валют: даже если данные о курсах валют объявляются примерно каждые 10 минут, фактические запросы происходят очень часто. Более того, запросы к кешу для курсов вон-доллар, вон-иена и вон-юань происходят очень часто. В таких случаях API-серверу было бы эффективнее иметь отдельный кеш локально и обновлять его из Redis при изменении данных.
Итак, как реализовать такое поведение в архитектуре "база данных - Redis - API-сервер"?
Разве Redis PubSub не подходит?
При использовании кеша подписывайтесь на канал, который может получать информацию об обновлении!
- Тогда необходимо создать логику для отправки сообщений при обновлении.
- Дополнительные операции, вызванные PubSub, влияют на производительность.
А что, если Redis обнаружит изменения?
Что, если использовать Keyspace Notification для получения уведомлений о командах для конкретного ключа?
- Существует неудобство, связанное с необходимостью предварительного сохранения и совместного использования ключей и команд, используемых для обновления.
- Например, для одного ключа обновляющей командой является простой Set, а для другого — LPush, или RPush, или SAdd, или SRem, что усложняет ситуацию.
- Это значительно увеличивает вероятность ошибок в коммуникации и человеческих ошибок при кодировании в процессе разработки.
Что, если использовать Keyevent Notification для получения уведомлений на уровне команд?
- Требуется подписка на все команды, используемые для обновления. При этом необходима соответствующая фильтрация ключей, поступающих оттуда.
- Например, для некоторых из всех ключей, поступающих от команды Del, у данного клиента, скорее всего, нет локального кеша.
- Это может привести к ненужной трате ресурсов.
Поэтому необходимы Invalidation Message!
Что такое Invalidation Message?
Invalidation Messages — это концепция, представленная в Redis 6.0 как часть Server Assisted Client-Side Cache. Invalidation Message передаются следующим образом:
- Предполагается, что ClientB уже однажды прочитал ключ.
- ClientA устанавливает новый ключ.
- Redis обнаруживает изменение и отправляет Invalidation Message ClientB, уведомляя ClientB об удалении кеша.
- ClientB получает это сообщение и предпринимает соответствующие действия.
Как это использовать?
Базовая структура работы
Клиент, подключенный к Redis, запускает CLIENT TRACKING ON REDIRECT <client-id>
, чтобы получать invalidation message. А клиент, который должен получать сообщения, подписывается на SUBSCRIBE __redis__:invalidate
, чтобы получать invalidation message.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Реализация! Реализация! Реализация!
Redigo + Ristretto
Если объяснять только так, то не совсем понятно, как это использовать в коде. Поэтому сначала попробуем настроить это с помощью redigo
и ristretto
.
Сначала установите две зависимости.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // number of keys to track frequency of (10M).
22 MaxCost: 1 << 30, // maximum cost of cache (1GB).
23 BufferItems: 64, // number of keys per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Сначала создается простой RedisClient
, включающий ristretto и redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Код немного сложен.
- Для Tracking устанавливается еще одно соединение. Это делается с учетом того, что PubSub может мешать другим операциям.
- Идентификатор добавленного соединения запрашивается, чтобы соединение, которое будет запрашивать данные, перенаправляло Tracking на это соединение.
- Затем подписывается на invalidation message.
- Код обработки подписки немного сложен. Поскольку redigo не может анализировать сообщения об аннулировании, необходимо получить ответ до анализа.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Сообщение Get
сначала запрашивает ristretto, и если его там нет, получает его из Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Код для тестирования приведен выше. Если вы попробуете его, то увидите, что значения обновляются каждый раз, когда данные в Redis обновляются.
Однако это слишком сложно. Более того, для расширения до кластера неизбежно потребуется включить Tracking для всех мастеров или реплик.
Rueidis
Поскольку мы используем Go, у нас есть rueidis
, самый современный и развитый. Давайте напишем код, использующий server assisted client side cache в кластерной среде Redis с помощью rueidis.
Сначала установите зависимость.
github.com/redis/rueidis
Затем напишите код для запроса данных из Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
В rueidis для использования client side cache достаточно просто вызвать DoCache
. Это добавит данные в локальный кеш с указанием времени его сохранения, и при повторном вызове DoCache
данные будут извлечены из локального кеша. Естественно, сообщения об аннулировании также обрабатываются корректно.
Почему не redis-go?
К сожалению, redis-go
не поддерживает server assisted client side cache через официальный API. Более того, при создании PubSub он создает новое соединение, и нет API для прямого доступа к этому соединению, поэтому невозможно узнать идентификатор клиента. Поэтому redis-go
был пропущен, поскольку его конфигурация была признана невозможной.
Это привлекательно
Через структуру client side cache
- Если данные могут быть заранее подготовлены, эта структура позволит минимизировать запросы и трафик к Redis, всегда предоставляя актуальные данные.
- Это позволит создать своего рода структуру CQRS, что значительно улучшит производительность чтения.
Насколько это стало привлекательнее?
На самом деле, поскольку такая структура уже используется на практике, я провел простой анализ задержек для двух API. Прошу прощения за то, что могу описать это лишь очень абстрактно.
- Первый API
- При первом запросе: в среднем 14.63 мс
- При последующих запросах: в среднем 2.82 мс
- Средняя разница: 10.98 мс
- Второй API
- При первом запросе: в среднем 14.05 мс
- При последующих запросах: в среднем 1.60 мс
- Средняя разница: 11.57 мс
Было достигнуто дополнительное улучшение задержки до 82%!
Вероятно, были следующие улучшения:
- Исключение сетевого взаимодействия между клиентом и Redis и экономия трафика.
- Уменьшение количества команд чтения, которые должен выполнять сам Redis.
- Это также повышает производительность записи.
- Минимизация парсинга протокола Redis.
- Парсинг протокола Redis также не является бесплатным. Уменьшение этого — большая возможность.
Однако все это компромиссы. Для этого мы пожертвовали как минимум двумя вещами:
- Необходимость реализации, эксплуатации и обслуживания элементов управления клиентским кешем.
- Увеличение использования ЦП и памяти клиентом из-за этого.
Заключение
Лично я был доволен этой архитектурной составляющей, и задержка, а также нагрузка на API-сервер были очень низкими. В будущем я надеюсь, что по возможности буду строить архитектуру именно таким образом.