GoSuda

Подобряване на отзивчивостта с кеширане от страна на клиента в Redis

By snowmerak
views ...

Какво представлява Redis?

Предполагам, че малцина не са запознати с Redis. Въпреки това, ако трябва да го споменем накратко с няколко характеристики, то може да бъде обобщен по следния начин:

  • Операциите се изпълняват в единична нишка, като всички операции притежават атомарност.
  • Данните се съхраняват и обработват In-Memory, което прави всички операции бързи.
  • Redis може да съхранява WAL в зависимост от опциите, което позволява бързо архивиране и възстановяване на най-актуалното състояние.
  • Поддържа различни типове данни като Set, Hash, Bit, List, което осигурява висока производителност.
  • Разполага с голяма общност, което позволява споделяне на разнообразен опит, проблеми и решения.
  • Разработва се и се експлоатира дълго време, което гарантира надеждна стабилност.

И така, към същественото

Представете си

Какво би станало, ако кешът на вашата услуга отговаряше на следните две условия?

  1. Когато често достъпваните данни трябва да бъдат предоставени на потребителя в най-актуално състояние, но актуализациите са нередовни и кешът трябва да се обновява често.
  2. Когато актуализация не е необходима, но е необходимо често да се достъпват едни и същи данни от кеша.

Първият случай може да разгледа класацията за популярност в реално време на онлайн магазин. Ако класацията за популярност в реално време на онлайн магазин е съхранена като sorted set, би било неефективно Redis да я чете всеки път, когато потребител достъпи главната страница. Във втория случай, за данни за обменни курсове, дори ако данните за обменния курс се обявяват приблизително на всеки 10 минути, действителните заявки се случват много често. Особено за вон-долар, вон-йена и вон-юан, кешът се достъпва много често. В тези случаи би било по-ефективно API сървърът да има отделен локален кеш и да го актуализира, като отново извлече данни от Redis, когато данните се променят.

И така, как можем да внедрим такова поведение в архитектурата база данни – Redis – API сървър?

Не може ли с Redis PubSub?

Когато използвате кеш, абонирайте се за канал, който може да получава известия за актуализации!

  • Тогава трябва да се създаде логика за изпращане на съобщения при актуализация.
  • Добавените операции поради PubSub влияят на производителността.

pubsub-write

pubsub-read

Ами ако Redis засече промените?

Ами ако използваме Keyspace Notification, за да получаваме известия за команди за конкретен ключ?

  • Съществува неудобството от предварително съхраняване и споделяне на ключове и команди, използвани за актуализация.
  • Например, за някои ключове проста команда Set е команда за актуализация, докато за други ключове LPush, RPush, SAdd или SRem са команди за актуализация, което прави процеса по-сложен.
  • Това значително увеличава вероятността от комуникационни пропуски и човешки грешки при кодиране по време на процеса на разработка.

Ами ако използваме Keyevent Notification, за да получаваме известия по команда?

  • Необходим е абонамент за всички команди, използвани за актуализация. Необходима е подходяща филтрация за входящите ключове.
  • Например, за някои от ключовете, получени чрез Del, е много вероятно съответният клиент да няма локален кеш.
  • Това може да доведе до ненужно изразходване на ресурси.

Ето защо е необходимо Invalidation Message!

Какво е Invalidation Message?

Invalidation Messages е концепция, предоставена като част от Server Assisted Client-Side Cache, добавена от Redis 6.0. Invalidation Message се предава по следния начин:

  1. Предполага се, че ClientB вече е прочел ключа веднъж.
  2. ClientA задава нов ключ.
  3. Redis открива промяната и изпраща Invalidation Message до ClientB, за да го уведоми да изчисти кеша.
  4. ClientB получава съобщението и предприема подходящи действия.

invalidation-message

Как се използва

Основна работна структура

Клиент, свързан с Redis, изпълнява CLIENT TRACKING ON REDIRECT <client-id>, за да получава invalidation message. Клиентът, който трябва да получава съобщения, се абонира за SUBSCRIBE __redis__:invalidate, за да получава invalidation message.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Имплементация! Имплементация! Имплементация!

Redigo + Ristretto

Само с такова обяснение би било неясно как да се използва в реалния код. Затова нека първо го конфигурираме просто с redigo и ristretto.

Първо инсталирайте двете зависимости.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // брой ключове за проследяване на честотата (10M).
22		MaxCost:     1 << 30, // максимална цена на кеша (1GB).
23		BufferItems: 64,      // брой ключове на Get буфер.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Първо, просто създаваме RedisClient, който включва ristretto и redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Кодът е малко сложен.

  • За да активирате Tracking, се установява още една връзка. Това е мярка, предприета, за да се предотврати PubSub да пречи на други операции.
  • Идентификаторът на добавената връзка се извлича и Tracking се пренасочва към тази връзка от връзката, която ще извлича данни.
  • След това се абонираме за invalidation message.
  • Кодът за обработка на абонамента е малко сложен. Тъй като redigo не поддържа анализиране на съобщения за невалидност, трябва да обработим отговора преди анализирането.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Съобщението Get първо проверява ristretto по следния начин и ако не го намери, го извлича от Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Кодът за тестване е както е показано по-горе. Ако го тествате, ще можете да потвърдите, че стойността се актуализира всеки път, когато данните се актуализират в Redis.

Но това е твърде сложно. Освен това, за да се мащабира до клъстер, е необходимо да се активира Tracking за всички master или replica сървъри.

Rueidis

За тези, които пишат на Go, имаме rueidis, който е най-модерният и напреднал. Нека напишем код, който използва server assisted client side cache в среда на Redis клъстер, използвайки rueidis.

Първо инсталирайте зависимостта.

  • github.com/redis/rueidis

След това напишете код, който извлича данни от Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

С rueidis, за да използвате client side cache, просто трябва да извикате DoCache. Това ще добави данните към локалния кеш, включително колко дълго да се съхраняват там, и ако извикате DoCache отново, то ще извлече данните от локалния кеш. Разбира се, то също така обработва съобщенията за невалидност правилно.

Защо не redis-go?

redis-go за съжаление не поддържа server assisted client side cache като официален API. Дори при създаване на PubSub, няма API за директен достъп до новата връзка, което прави невъзможно да се знае client id. Затова прескочихме redis-go, тъй като преценихме, че самата му структура го прави невъзможен.

Много привлекателно

Чрез структурата на client side cache

  • Ако данните могат да бъдат подготвени предварително, тази структура би могла да минимизира заявките и трафика към Redis, като винаги предоставя най-актуалните данни.
  • Това може да създаде един вид CQRS структура, която драстично подобрява производителността при четене.

cqrs

Колко по-привлекателно стана?

Всъщност, тъй като такава структура вече се използва на практика, проучихме простите латентности за два API-та. Моля, разберете, че мога да пиша само по много абстрактен начин.

  1. Първи API
    1. При първоначална заявка: средно 14.63ms
    2. При последваща заявка: средно 2.82ms
    3. Средна разлика: 10.98ms
  2. Втори API
    1. При първоначална заявка: средно 14.05ms
    2. При последваща заявка: средно 1.60ms
    3. Средна разлика: 11.57ms

Имаше допълнително подобрение на латентността до 82%!

Вероятно са налице следните подобрения:

  • Елиминиране на процеса на мрежова комуникация между клиента и Redis и спестяване на трафик.
  • Намаляване на броя команди за четене, които самият Redis трябва да изпълнява.
    • Това също така подобрява производителността при запис.
  • Минимизиране на анализирането на Redis протокола.
    • Анализирането на Redis протокола не е без разходи. Намаляването му е голяма възможност.

Но всичко е въпрос на компромиси. За да постигнем това, пожертвахме поне следните две неща:

  • Необходимост от внедряване, опериране и поддръжка на елементи за управление на клиентския кеш.
  • Увеличаване на използването на CPU и памет от страна на клиента в резултат на това.

Заключение

Лично аз бях доволен от архитектурния компонент, а латентността и натоварването на API сървъра бяха много ниски. В бъдеще, ако е възможно, бих искал да изграждам архитектура по този начин.