Повишаване на отзивчивостта с Redis кеш от страна на клиента
Какво е Redis?
Смятам, че малцина не са запознати с Redis. Въпреки това, накратко можем да го обобщим със следните характеристики:
- Операциите се извършват в единична нишка, което прави всички операции атомарни.
- Данните се съхраняват и обработват In-Memory, което прави всички операции бързи.
- Redis може да съхранява WAL в зависимост от опциите, което позволява бързо архивиране и възстановяване на най-новото състояние.
- Поддържа различни типове като Set, Hash, Bit, List, което осигурява висока продуктивност.
- Разполага с голяма общност, което позволява споделяне на разнообразен опит, проблеми и решения.
- Разработва се и се експлоатира дълго време, което гарантира надеждна стабилност.
И така, към основната тема
Представете си?
Какво би станало, ако кешът на вашата услуга отговаря на следните две условия?
- Трябва да предоставяте на потребителите най-актуалните данни, които се търсят често, но актуализациите са нередовни, което налага чести актуализации на кеша.
- Актуализацията не се извършва, но често е необходимо да се достъпват едни и същи кеширани данни за справка.
Първият случай може да се разгледа като класиране на популярност в реално време за онлайн магазин. Когато класирането на популярност в реално време за онлайн магазин се съхранява като sorted set, е неефективно Redis да чете данни всеки път, когато потребител достъпва главната страница. Във втория случай, за данните за обменния курс, дори ако данните за обменния курс се обявяват приблизително на всеки 10 минути, действителните справки се извършват много често. Освен това, за обменното отношение вон-долар, вон-йена, вон-юан, кешът се търси много често. В такива случаи би било ефективно API сървърът да има отделен локален кеш и да актуализира данните чрез повторно запитване към Redis, когато данните се променят.
И така, как можем да внедрим такова поведение в структурата База данни - Redis - API сървър?
Не може ли с Redis PubSub?
Когато използвате кеш, абонирайте се за канал, който може да получава известия за актуализация!
- Тогава трябва да създадете логика за изпращане на съобщения при актуализация.
- Допълнителните операции, дължащи се на PubSub, влияят на производителността.
Ами ако Redis засече промени?
Използвайте Keyspace Notification, за да получавате известия за команди за конкретен ключ?
- Има неудобството да се съхраняват и споделят предварително ключовете и командите, използвани за актуализация.
- Например, става сложно, когато за един ключ проста команда Set е команда за актуализация, а за друг ключ LPush, RPush, SAdd или SRem е команда за актуализация.
- Това значително увеличава вероятността от комуникационни грешки и човешки грешки в кодирането по време на процеса на разработка.
Използвайте Keyevent Notification, за да получавате известия на ниво команда?
- Изисква се абонамент за всички команди, използвани за актуализация. Изисква се подходящо филтриране на ключовете, които идват оттам.
- Например, за някои от всички ключове, които идват с Del, този клиент вероятно няма локален кеш.
- Това може да доведе до ненужно изразходване на ресурси.
Ето защо е необходимо Invalidation Message!
Какво е Invalidation Message?
Invalidation Messages е концепция, предоставена като част от Server Assisted Client-Side Cache, добавена от Redis 6.0. Invalidation Message се предава по следния начин:
- Предполага се, че ClientB вече е прочел ключа веднъж.
- ClientA задава нов ключ.
- Redis открива промяната и изпраща Invalidation Message до ClientB, за да го уведоми да изчисти кеша.
- ClientB получава съобщението и предприема подходящи действия.
Как се използва
Основна работна структура
Клиент, свързан към Redis, получава съобщения за невалидност, като изпълнява CLIENT TRACKING ON REDIRECT <client-id>
. Клиентът, който трябва да получава съобщения, се абонира за SUBSCRIBE __redis__:invalidate
, за да получава съобщения за невалидност.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Внедряване! Внедряване! Внедряване!
Redigo + Ristretto
Само с това обяснение е трудно да се разбере как да се използва в реалния код. Затова нека първо да го конфигурираме просто с redigo
и ristretto
.
Първо инсталирайте двете зависимости.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // брой ключове за проследяване на честотата (10M).
22 MaxCost: 1 << 30, // максимална цена на кеша (1GB).
23 BufferItems: 64, // брой ключове на буфер Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Първо, просто създаваме RedisClient
, който включва ristretto и redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Кодът е малко сложен.
- За да се извърши Tracking, се установява още една връзка. Това е мярка, която отчита възможността PubSub да попречи на други операции.
- Чрез заявка за ID на добавената връзка, Tracking се пренасочва към тази връзка от връзката, която ще извлича данни.
- След това се абонираме за съобщения за невалидност.
- Кодът за обработка на абонамента е малко сложен. Тъй като redigo не може да анализира съобщения за невалидност, трябва да се получи и обработи отговорът преди анализа.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Съобщението Get
първо търси в Ristretto, а ако не го намери, го извлича от Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Кодът за тестване е както по-горе. Ако го тествате, ще можете да видите, че стойността се актуализира всеки път, когато данните в Redis се актуализират.
Но това е твърде сложно. Освен това, за да се разшири до клъстер, е неизбежно да се активира Tracking за всички master или replica възли.
Rueidis
За тези, които използват Go, разполагаме с най-модерния и напреднал rueidis
. Ще напишем код, който използва кеш на страната на клиента, подпомогнат от сървър, в среда на Redis клъстер, използвайки rueidis.
Първо инсталирайте зависимостта.
github.com/redis/rueidis
След това напишете кода за извличане на данни от Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
С rueidis, за да използвате кеша от страната на клиента, е достатъчно да извикате DoCache
. Тогава той добавя данни към локалния кеш, включително колко дълго да се съхраняват в него, и ако се извика DoCache
отново, той извлича данните от локалния кеш. Разбира се, той обработва правилно и съобщенията за невалидност.
Защо не redis-go?
redis-go
за съжаление не поддържа кеш от страна на клиента, подпомогнат от сървър, като официален API. Освен това, когато се създава PubSub, няма API за директен достъп до новата създадена връзка, така че ID на клиента не може да бъде получено. Затова преценихме, че redis-go
не може да бъде конфигуриран и го пропуснахме.
Изглежда секси
Чрез структурата на кеша от страната на клиента
- Ако данните могат да бъдат подготвени предварително, тази структура може да минимизира заявките и трафика към Redis, като същевременно винаги предоставя най-новите данни.
- Това позволява създаването на един вид CQRS структура, която драстично подобрява производителността при четене.
Колко по-секси е станало?
Всъщност, тъй като такава структура се използва на място, разгледахме накратко латентността за два API. Моля, извинете ме, че мога да пиша само много абстрактно.
- Първи API
- Първоначална заявка: средно 14.63ms
- Последващи заявки: средно 2.82ms
- Средна разлика: 10.98ms
- Втори API
- Първоначална заявка: средно 14.05ms
- Последващи заявки: средно 1.60ms
- Средна разлика: 11.57ms
Имаше допълнително подобрение на латентността до 82%!
Очаква се следните подобрения:
- Избягване на процеса на мрежова комуникация между клиента и Redis и спестяване на трафик
- Намаляване на броя на командите за четене, които самият Redis трябва да изпълни
- Това също така подобрява производителността при запис.
- Минимизиране на анализа на протокола на Redis
- Анализът на протокола на Redis не е без разходи. Намаляването му е голяма възможност.
Но всичко е компромис. За да постигнем това, ние пожертвахме най-малко две неща:
- Необходимост от внедряване, експлоатация и поддръжка на елементи за управление на кеша от страна на клиента
- Увеличаване на използването на CPU и памет от клиента в резултат на това
Заключение
Лично аз бях доволен от тази архитектурна конфигурация, а латентността и натоварването на API сървъра бяха много ниски. Надявам се да мога да конфигурирам архитектурата по този начин и в бъдеще, ако е възможно.