Повышение реактивности с помощью кеша на стороне клиента Redis
Что такое Redis?
Полагаю, мало кто не знает Redis. Тем не менее, можно кратко упомянуть и классифицировать его по нескольким характеристикам следующим образом:
- Операции выполняются в однопоточном режиме, что обеспечивает атомарность всех операций.
- Данные хранятся и обрабатываются In-Memory, что делает все операции быстрыми.
- Redis может сохранять WAL в зависимости от опций, что позволяет быстро резервировать и восстанавливать последнее состояние.
- Поддерживает различные типы данных, такие как Set, Hash, Bit, List, обеспечивая высокую производительность.
- Имеет большое сообщество, что позволяет делиться разнообразным опытом, проблемами и решениями.
- Разрабатывается и эксплуатируется в течение длительного времени, что обеспечивает надежную стабильность.
Переходим к сути
Представьте себе
Что, если кеш вашего сервиса соответствует следующим двум условиям?
- Необходимо предоставлять пользователю часто запрашиваемые данные в актуальном состоянии, но обновление происходит нерегулярно, что требует частой инвалидации кеша.
- Обновление не происходит, но требуется частый доступ к одним и тем же данным кеша для их получения.
В первом случае можно рассмотреть рейтинг популярности товаров в реальном времени в интернет-магазине. Если рейтинг популярности товаров в реальном времени хранится в sorted set, то неэффективно считывать его из Redis каждый раз, когда пользователь заходит на главную страницу. Во втором случае, даже если данные о курсах валют публикуются примерно каждые 10 минут, фактические запросы происходят очень часто. При этом запросы к кешу для пар "вона-доллар", "вона-иена", "вона-юань" происходят очень часто. В таких случаях более эффективным будет, если API-сервер будет иметь отдельный локальный кеш, а при изменении данных будет повторно запрашивать Redis для обновления.
Как же реализовать такое поведение в архитектуре "база данных - Redis - API-сервер"?
Разве нельзя использовать Redis PubSub?
При использовании кеша подпишитесь на канал, который может получать уведомления об обновлении!
- Тогда необходимо создать логику для отправки сообщений при обновлении.
- Дополнительные действия, вызванные PubSub, влияют на производительность.


Что, если Redis будет обнаруживать изменения?
Использовать Keyspace Notification для получения уведомлений о командах, связанных с конкретным ключом?
- Существует неудобство, связанное с необходимостью предварительного сохранения и совместного использования ключей и команд, используемых для обновления.
- Например, для одних ключей командой обновления является простой Set, для других — LPush или RPush, а для третьих — SAdd или SRem, что усложняет процесс.
- Это значительно увеличивает вероятность ошибок коммуникации и человеческих ошибок при кодировании в процессе разработки.
Использовать Keyevent Notification для получения уведомлений на уровне команд?
- Требуется подписка на все команды, используемые для обновления. При этом необходима соответствующая фильтрация входящих ключей.
- Например, для некоторых ключей, поступающих с командой Del, клиент, скорее всего, не будет иметь локального кеша.
- Это может привести к ненужной трате ресурсов.
Поэтому необходим Invalidation Message!
Что такое Invalidation Message?
Invalidation Messages — это концепция, представленная в Redis 6.0 как часть Server Assisted Client-Side Cache. Invalidation Message передается следующим образом:
- Предположим, ClientB уже один раз прочитал key.
- ClientA устанавливает новый key.
- Redis обнаруживает изменение и отправляет Invalidation Message ClientB, уведомляя его о необходимости очистки кеша.
- ClientB получает это сообщение и предпринимает соответствующие действия.

Как это использовать
Базовая структура работы
Клиент, подключенный к Redis, выполняет команду CLIENT TRACKING ON REDIRECT <client-id>, чтобы получать invalidation message. Клиент, который должен получать сообщения, подписывается на SUBSCRIBE __redis__:invalidate, чтобы получать invalidation message.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Реализация! Реализация! Реализация!
Redigo + Ristretto
Такое объяснение может быть не совсем ясным для практического использования в коде. Поэтому сначала мы настроим его с помощью redigo и ristretto.
Сначала установите две зависимости.
github.com/gomodule/redigogithub.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // количество ключей для отслеживания частоты (10М).
22 MaxCost: 1 << 30, // максимальная стоимость кеша (1ГБ).
23 BufferItems: 64, // количество ключей на буфер Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Сначала мы просто создаем RedisClient, который включает ristretto и redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Код немного сложен.
- Для Tracking устанавливается еще одно соединение. Это делается для того, чтобы PubSub не мешал другим операциям.
- Идентификатор добавленного соединения запрашивается, а затем соединение, которое будет запрашивать данные, перенаправляет Tracking на это соединение.
- Затем подписывается на invalidation message.
- Код обработки подписки немного сложен. Поскольку redigo не поддерживает парсинг сообщений об инвалидации, необходимо получать и обрабатывать ответ до парсинга.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Сообщение Get сначала запрашивает ristretto, и если оно не найдено, получает его из Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Код для тестирования приведен выше. Если вы его протестируете, то увидите, что данные обновляются каждый раз, когда они изменяются в Redis.
Однако это слишком сложно. Более того, для масштабирования кластера неизбежно потребуется активировать Tracking для всех master-узлов или реплик.
Rueidis
Поскольку мы используем язык Go, у нас есть самый современный и продвинутый rueidis. Мы напишем код, использующий server assisted client side cache в среде кластера Redis с помощью rueidis.
Сначала установите зависимость.
github.com/redis/rueidis
Затем напишите код для запроса данных из Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
В rueidis для использования client side cache достаточно просто вызвать DoCache. Это добавит данные в локальный кеш с указанием срока его хранения, и при последующих вызовах DoCache данные будут извлекаться из локального кеша. Естественно, сообщения об инвалидации также обрабатываются корректно.
Почему не redis-go?
redis-go, к сожалению, не поддерживает server assisted client side cache через официальный API. Более того, при создании PubSub он создает новое соединение, и нет API для прямого доступа к этому соединению, поэтому невозможно узнать client id. Таким образом, было решено пропустить redis-go, поскольку его конфигурация в принципе невозможна.
Элегантно
Через структуру client side cache
- Если данные могут быть подготовлены заранее, эта структура позволит минимизировать запросы к Redis и трафик, а также всегда предоставлять актуальные данные.
- Это позволит создать своего рода структуру CQRS, значительно увеличив производительность чтения.

Насколько это стало элегантнее?
Поскольку на практике уже используется такая структура, я проверил задержку для двух API. Прошу прощения за абстрактность изложения.
- Первый API
- При первом запросе: в среднем 14.63 мс
- При последующих запросах: в среднем 2.82 мс
- Средняя разница: 10.98 мс
- Второй API
- При первом запросе: в среднем 14.05 мс
- При последующих запросах: в среднем 1.60 мс
- Средняя разница: 11.57 мс
Было достигнуто дополнительное улучшение задержки до 82%!
Предполагается, что были следующие улучшения:
- Исключение сетевого взаимодействия между клиентом и Redis и экономия трафика.
- Уменьшение количества команд чтения, которые должен выполнять сам Redis.
- Это также приводит к повышению производительности записи.
- Минимизация парсинга протокола Redis.
- Парсинг протокола Redis также не является бесплатным. Уменьшение этого является большой возможностью.
Однако все имеет свою цену. Для этого мы пожертвовали как минимум двумя вещами:
- Необходимость реализации, эксплуатации и поддержки элементов управления клиентским кешем.
- Увеличение использования CPU и памяти клиентом.
Заключение
Лично я остался доволен этой архитектурной компонентой, а также тем, что она значительно снизила задержку и нагрузку на API-сервер. В будущем я хотел бы, по возможности, продолжать строить архитектуру подобным образом.