GoSuda

Повышение реактивности с помощью кеша на стороне клиента Redis

By snowmerak
views ...

Что такое Redis?

Я полагаю, что мало кто не знаком с Redis. Тем не менее, можно кратко упомянуть некоторые его характеристики, которые можно обобщить следующим образом:

  • Операции выполняются в однопоточном режиме, что обеспечивает атомарность всех операций.
  • Данные хранятся и обрабатываются In-Memory, что обеспечивает высокую скорость всех операций.
  • В зависимости от опций, Redis может сохранять WAL, что позволяет быстро резервировать и восстанавливать последнее состояние.
  • Поддерживает различные типы данных, такие как Set, Hash, Bit, List, что обеспечивает высокую производительность.
  • Имеет большое сообщество, что позволяет обмениваться опытом, проблемами и решениями.
  • Разрабатывается и эксплуатируется в течение длительного времени, что обеспечивает надежную стабильность.

Переходим к сути

Представьте себе

Что, если кеш вашего сервиса соответствует следующим двум условиям?

  1. Необходимо предоставлять пользователю данные, которые часто запрашиваются, в актуальном состоянии, но обновление нерегулярно, что требует частых обновлений кеша.
  2. Обновление не происходит, но к одним и тем же кешированным данным требуется частый доступ для запросов.

Первый случай можно рассмотреть на примере рейтинга популярности товаров в реальном времени в интернет-магазине. Если рейтинг популярности товаров в реальном времени хранится в виде отсортированного набора, неэффективно, если пользователь будет считывать его из Redis каждый раз, когда он заходит на главную страницу. Второй случай касается данных о курсах валют: даже если данные о курсах валют объявляются примерно каждые 10 минут, фактические запросы происходят очень часто. Более того, запросы к кешу для курсов вон-доллар, вон-иена и вон-юань происходят очень часто. В таких случаях API-серверу было бы эффективнее иметь отдельный кеш локально и обновлять его из Redis при изменении данных.

Итак, как реализовать такое поведение в архитектуре "база данных - Redis - API-сервер"?

Разве Redis PubSub не подходит?

При использовании кеша подписывайтесь на канал, который может получать информацию об обновлении!

  • Тогда необходимо создать логику для отправки сообщений при обновлении.
  • Дополнительные операции, вызванные PubSub, влияют на производительность.

pubsub-write

pubsub-read

А что, если Redis обнаружит изменения?

Что, если использовать Keyspace Notification для получения уведомлений о командах для конкретного ключа?

  • Существует неудобство, связанное с необходимостью предварительного сохранения и совместного использования ключей и команд, используемых для обновления.
  • Например, для одного ключа обновляющей командой является простой Set, а для другого — LPush, или RPush, или SAdd, или SRem, что усложняет ситуацию.
  • Это значительно увеличивает вероятность ошибок в коммуникации и человеческих ошибок при кодировании в процессе разработки.

Что, если использовать Keyevent Notification для получения уведомлений на уровне команд?

  • Требуется подписка на все команды, используемые для обновления. При этом необходима соответствующая фильтрация ключей, поступающих оттуда.
  • Например, для некоторых из всех ключей, поступающих от команды Del, у данного клиента, скорее всего, нет локального кеша.
  • Это может привести к ненужной трате ресурсов.

Поэтому необходимы Invalidation Message!

Что такое Invalidation Message?

Invalidation Messages — это концепция, представленная в Redis 6.0 как часть Server Assisted Client-Side Cache. Invalidation Message передаются следующим образом:

  1. Предполагается, что ClientB уже однажды прочитал ключ.
  2. ClientA устанавливает новый ключ.
  3. Redis обнаруживает изменение и отправляет Invalidation Message ClientB, уведомляя ClientB об удалении кеша.
  4. ClientB получает это сообщение и предпринимает соответствующие действия.

invalidation-message

Как это использовать?

Базовая структура работы

Клиент, подключенный к Redis, запускает CLIENT TRACKING ON REDIRECT <client-id>, чтобы получать invalidation message. А клиент, который должен получать сообщения, подписывается на SUBSCRIBE __redis__:invalidate, чтобы получать invalidation message.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Реализация! Реализация! Реализация!

Redigo + Ristretto

Если объяснять только так, то не совсем понятно, как это использовать в коде. Поэтому сначала попробуем настроить это с помощью redigo и ristretto.

Сначала установите две зависимости.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // number of keys to track frequency of (10M).
22		MaxCost:     1 << 30, // maximum cost of cache (1GB).
23		BufferItems: 64,      // number of keys per Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Сначала создается простой RedisClient, включающий ristretto и redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Код немного сложен.

  • Для Tracking устанавливается еще одно соединение. Это делается с учетом того, что PubSub может мешать другим операциям.
  • Идентификатор добавленного соединения запрашивается, чтобы соединение, которое будет запрашивать данные, перенаправляло Tracking на это соединение.
  • Затем подписывается на invalidation message.
  • Код обработки подписки немного сложен. Поскольку redigo не может анализировать сообщения об аннулировании, необходимо получить ответ до анализа.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Сообщение Get сначала запрашивает ristretto, и если его там нет, получает его из Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Код для тестирования приведен выше. Если вы попробуете его, то увидите, что значения обновляются каждый раз, когда данные в Redis обновляются.

Однако это слишком сложно. Более того, для расширения до кластера неизбежно потребуется включить Tracking для всех мастеров или реплик.

Rueidis

Поскольку мы используем Go, у нас есть rueidis, самый современный и развитый. Давайте напишем код, использующий server assisted client side cache в кластерной среде Redis с помощью rueidis.

Сначала установите зависимость.

  • github.com/redis/rueidis

Затем напишите код для запроса данных из Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

В rueidis для использования client side cache достаточно просто вызвать DoCache. Это добавит данные в локальный кеш с указанием времени его сохранения, и при повторном вызове DoCache данные будут извлечены из локального кеша. Естественно, сообщения об аннулировании также обрабатываются корректно.

Почему не redis-go?

К сожалению, redis-go не поддерживает server assisted client side cache через официальный API. Более того, при создании PubSub он создает новое соединение, и нет API для прямого доступа к этому соединению, поэтому невозможно узнать идентификатор клиента. Поэтому redis-go был пропущен, поскольку его конфигурация была признана невозможной.

Это привлекательно

Через структуру client side cache

  • Если данные могут быть заранее подготовлены, эта структура позволит минимизировать запросы и трафик к Redis, всегда предоставляя актуальные данные.
  • Это позволит создать своего рода структуру CQRS, что значительно улучшит производительность чтения.

cqrs

Насколько это стало привлекательнее?

На самом деле, поскольку такая структура уже используется на практике, я провел простой анализ задержек для двух API. Прошу прощения за то, что могу описать это лишь очень абстрактно.

  1. Первый API
    1. При первом запросе: в среднем 14.63 мс
    2. При последующих запросах: в среднем 2.82 мс
    3. Средняя разница: 10.98 мс
  2. Второй API
    1. При первом запросе: в среднем 14.05 мс
    2. При последующих запросах: в среднем 1.60 мс
    3. Средняя разница: 11.57 мс

Было достигнуто дополнительное улучшение задержки до 82%!

Вероятно, были следующие улучшения:

  • Исключение сетевого взаимодействия между клиентом и Redis и экономия трафика.
  • Уменьшение количества команд чтения, которые должен выполнять сам Redis.
    • Это также повышает производительность записи.
  • Минимизация парсинга протокола Redis.
    • Парсинг протокола Redis также не является бесплатным. Уменьшение этого — большая возможность.

Однако все это компромиссы. Для этого мы пожертвовали как минимум двумя вещами:

  • Необходимость реализации, эксплуатации и обслуживания элементов управления клиентским кешем.
  • Увеличение использования ЦП и памяти клиентом из-за этого.

Заключение

Лично я был доволен этой архитектурной составляющей, и задержка, а также нагрузка на API-сервер были очень низкими. В будущем я надеюсь, что по возможности буду строить архитектуру именно таким образом.