GoSuda

Zvyšování Reakceschopnosti pomocí Redis Client-Side Caching

By snowmerak
views ...

Co je Redis?

Předpokládám, že není mnoho lidí, kteří by neznali Redis. Nicméně, pokud bychom měli stručně zmínit některé jeho charakteristiky, lze je shrnout následovně:

  • Operace jsou prováděny v jednom vlákně, což zajišťuje atomicitu všech operací.
  • Data jsou ukládána a operace prováděny In-Memory, díky čemuž jsou všechny operace rychlé.
  • Redis může volitelně ukládat WAL, což umožňuje rychlé zálohování a obnovu nejnovějšího stavu.
  • Podporuje různé datové typy, jako jsou Set, Hash, Bit, List, což zajišťuje vysokou produktivitu.
  • velkou komunitu, což umožňuje sdílení různých zkušeností, problémů a řešení.
  • Je dlouhodobě vyvíjen a provozován, což zaručuje spolehlivou stabilitu.

Takže k věci

Představte si to:

Co kdyby cache vaší služby splňovala následující dvě podmínky?

  1. Je nutné poskytovat uživatelům data, která jsou často vyhledávána, v aktuálním stavu, ale aktualizace jsou nepravidelné, což vyžaduje časté obnovování cache.
  2. Aktualizace nejsou nutné, ale je potřeba často přistupovat a vyhledávat stejná data v cache.

První případ lze uvažovat u žebříčku popularity v reálném čase v internetovém obchodě. Pokud je žebříček popularity uložen jako sorted set, je neefektivní, aby Redis četl data pokaždé, když uživatel přistoupí na hlavní stránku. Druhý případ se týká kurzů měn: i když jsou data o kurzech zveřejňována přibližně každých 10 minut, skutečné vyhledávání probíhá velmi často. Konkrétně pro kurzy Won-Dolar, Won-Jen a Won-Yuan je cache vyhledávána velmi často. V takových případech by bylo efektivnější, kdyby měl API server lokální cache a při změně dat znovu vyhledal a aktualizoval data z Redisu.

Jak tedy lze takové chování implementovat ve struktuře Databáze – Redis – API Server?

Nestačí Redis PubSub?

Při použití cache se přihlaste k odběru kanálu, který informuje o nutnosti aktualizace!

  • To by vyžadovalo vytvoření logiky pro odesílání zpráv při aktualizaci.
  • Zahrnutí dodatečné operace kvůli PubSub by ovlivnilo výkon.

pubsub-write

pubsub-read

Co když Redis detekuje změnu?

Co kdybychom použili Keyspace Notification k přijímání upozornění na příkazy pro daný klíč?

  • Existuje nepříjemnost spočívající v nutnosti předem uložit a sdílet klíče a příkazy použité pro aktualizaci.
  • Například pro některé klíče je aktualizačním příkazem jednoduchý Set, zatímco pro jiné to může být LPush, RPush, SAdd nebo SRem, což vede ke složitosti.
  • To výrazně zvyšuje pravděpodobnost komunikačních chyb a lidských chyb při kódování během vývoje.

Co kdybychom použili Keyevent Notification k přijímání upozornění na úrovni příkazů?

  • Je nutné se přihlásit k odběru všech příkazů používaných pro aktualizaci. Je nutné provést odpovídající filtrování klíčů, které přicházejí.
  • Například u všech klíčů, které přicházejí s příkazem Del, je vysoká pravděpodobnost, že daný klient nemá lokální cache pro některé z nich.
  • To by mohlo vést ke zbytečnému plýtvání prostředky.

Proto je potřeba Invalidation Message!

Co je Invalidation Message?

Invalidation Messages je koncept, který je součástí Server Assisted Client-Side Cache a je dostupný od verze Redis 6.0. Invalidation Message se přenáší následujícím způsobem:

  1. Předpokládejme, že ClientB již jednou přečetl klíč.
  2. ClientA nastaví novou hodnotu pro daný klíč.
  3. Redis detekuje změnu a publikuje Invalidation Message pro ClientB, aby mu oznámil, že má smazat cache.
  4. ClientB obdrží zprávu a provede odpovídající opatření.

invalidation-message

Jak se to používá

Základní princip fungování

Klient připojený k Redisu spustí CLIENT TRACKING ON REDIRECT <client-id>, čímž se povolí příjem invalidation message. Klient, který má přijímat zprávy, se přihlásí k odběru SUBSCRIBE __redis__:invalidate, aby obdržel invalidation message.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementace! Implementace! Implementace!

Redigo + Ristretto

Pouhý popis nemusí být dostatečný pro pochopení, jak to skutečně použít v kódu. Proto se pokusíme o jednoduchou konfiguraci s použitím redigo a ristretto.

Nejprve nainstalujeme obě závislosti.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // number of keys to track frequency of (10M).
22		MaxCost:     1 << 30, // maximum cost of cache (1GB).
23		BufferItems: 64,      // number of keys per Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Nejprve vytvoříme jednoduchý RedisClient zahrnující ristretto a redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Kód je trochu složitý.

  • Pro Tracking navážeme další připojení. To je opatření, které zohledňuje, že PubSub by mohl narušovat jiné operace.
  • Zjistíme ID přidaného připojení a v připojení pro vyhledávání dat nastavíme Tracking tak, aby přesměroval na toto připojení.
  • Poté se přihlásíme k odběru invalidation message.
  • Kód pro zpracování odběru je trochu složitý. Jelikož redigo neprovádí parsování pro zprávu o neplatnosti, je nutné přijmout a zpracovat odpověď před parsováním.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Zpráva Get nejprve vyhledá v ristretto, a pokud tam data nejsou, načte je z Redisu.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Testovací kód je uveden výše. Pokud ho vyzkoušíte, uvidíte, že se hodnota aktualizuje pokaždé, když se data v Redisu změní.

Tento přístup je však příliš komplikovaný. Kromě toho je pro škálování na cluster nevyhnutelně nutné aktivovat Tracking na všech masterech nebo replikách.

Rueidis

Pro ty, kteří používají jazyk Go, máme k dispozici nejmodernější a nejpokročilejší rueidis. Nyní napíšeme kód, který používá server assisted client side cache v prostředí clusteru Redis s využitím rueidis.

Nejprve nainstalujeme závislost.

  • github.com/redis/rueidis

A poté napíšeme kód pro vyhledávání dat v Redisu.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

V rueidis stačí k použití client side cache pouze zavolat DoCache. Tím se data přidají do lokální cache a určí se, jak dlouho mají být v lokální cache uchovávána. Stejné volání DoCache pak vyhledá data v lokální cache. Samozřejmě, že invalidation message jsou zpracovávány korektně.

Proč ne redis-go?

redis-go bohužel oficiálně nepodporuje server assisted client side cache. Navíc při vytváření PubSub vytváří nové připojení a neexistuje API pro přímý přístup k tomuto připojení, takže nelze zjistit client id. Proto jsme se rozhodli redis-go vynechat, jelikož se zdálo, že implementace není možná.

Je to elegantní

Prostřednictvím struktury client side cache

  • Pokud se jedná o data, která lze předem připravit, tato struktura by mohla minimalizovat dotazy a provoz na Redisu a zároveň vždy poskytovat nejnovější data.
  • Tímto způsobem lze vytvořit jakousi strukturu CQRS, která exponenciálně zvýší výkon čtení.

cqrs

Jak moc je to elegantnější?

Protože se tato struktura skutečně používá v praxi, provedli jsme jednoduché měření latence pro dvě API. Prosím, omluvte, že mohu psát pouze velmi abstraktně.

  1. První API
    1. Při prvním vyhledání: průměrně 14.63ms
    2. Při následném vyhledání: průměrně 2.82ms
    3. Průměrný rozdíl: 10.98ms
  2. Druhé API
    1. Při prvním vyhledání: průměrně 14.05ms
    2. Při následném vyhledání: průměrně 1.60ms
    3. Průměrný rozdíl: 11.57ms

Došlo k dodatečnému zlepšení latence až o 82%!

Očekáváme, že k tomuto zlepšení došlo díky následujícím faktorům:

  • Eliminace síťové komunikace mezi klientem a Redisem a úspora provozu
  • Snížení počtu příkazů pro čtení, které musí Redis sám provádět
    • To má také za následek zlepšení výkonu zápisu.
  • Minimalizace parsování protokolu Redis
    • Parsování protokolu Redis není bez nákladů. Možnost snížit tyto náklady je velká příležitost.

Avšak vše je trade-off. Za toto jsme museli obětovat minimálně následující dvě věci:

  • Nutnost implementace, provozu a údržby správy prvků client side cache
  • Zvýšení využití CPU a paměti klienta v důsledku toho

Závěr

Osobně jsem byl s touto architektonickou komponentou spokojen a zatížení API serveru i latence byly velmi nízké. Domnívám se, že by bylo dobré konfigurovat architekturu tímto způsobem i v budoucnu, pokud to bude možné.