GoSuda

Zlepšení odezvy pomocí Redis client-side cache

By snowmerak
views ...

Co je Redis?

Myslím, že není mnoho lidí, kteří neznají Redis. Nicméně, abychom se krátce zmínili o několika charakteristikách a posunuli se dál, lze to shrnout následovně.

  • Operace jsou prováděny v single thread, takže všechny operace mají atomicity.
  • Data jsou ukládána a zpracovávána In-Memory, takže všechny operace jsou rychlé.
  • Redis může ukládat WAL v závislosti na možnosti, což umožňuje rychlé backup a recovery nejnovějšího stavu.
  • Podporuje various types jako Set, Hash, Bit, List, což nabízí vysokou productivity.
  • large community, což umožňuje sharing různých zkušeností, problémů a řešení.
  • Je dlouho vyvíjen a provozován, což poskytuje spolehlivou stability.

A teď k věci

Představte si?

Co kdyby cache vaší služby splňovala následující dvě podmínky?

  1. Když je třeba často zobrazovaná data poskytovat uživateli v nejnovějším stavu, ale aktualizace jsou nepravidelné a vyžadují časté aktualizace cache.
  2. Když aktualizace nejsou potřeba, ale ke stejným datům v cache je třeba přistupovat a dotazovat se na ně často.

V prvním případě lze zvážit hodnocení popularity v reálném čase v online obchodě. Když je hodnocení popularity v reálném čase v online obchodě uloženo jako sorted set, je neefektivní číst ho z Redis pokaždé, když uživatel přistoupí na hlavní stránku. V druhém případě, co se týče dat o směnném kurzu, i když jsou data o směnném kurzu vyhlašována zhruba každých 10 minut, skutečné dotazy se vyskytují velmi často. Navíc, pro KRW-USD, KRW-JPY a KRW-CNY se cache dotazuje velmi často. V takových případech by bylo efektivnější, aby API server měl lokálně samostatnou cache a při změně dat se znovu dotazoval na Redis, aby ji aktualizoval.

Jak tedy lze toto chování implementovat ve struktuře Database - Redis - API Server??

Nestačilo by Redis PubSub?

Při použití cache se přihlásíme k odběru kanálu, který může přijímat oznámení o aktualizacích!

  • Pak je třeba vytvořit logiku pro odesílání zpráv při aktualizaci.
  • Zahrnutí dodatečných operací kvůli PubSub ovlivňuje výkon.

pubsub-write

pubsub-read

Co když Redis detekuje změnu?

Co když se pomocí Keyspace Notification přijímají oznámení o příkazech pro daný klíč?

  • Existuje nepříjemnost nutnosti předem uložit a sdílet klíče a příkazy používané pro aktualizace.
  • Například, pro některé klíče je jednoduchý Set příkazem pro aktualizaci, zatímco pro jiné se příkazy LPush, RPush nebo SAdd a SRem stávají příkazy pro aktualizaci, což je složité.
  • To výrazně zvyšuje možnost komunikačních chyb a lidských chyb v kódování během procesu vývoje.

Co když se pomocí Keyevent Notification přijímají oznámení podle jednotky příkazu?

  • Je vyžadováno přihlášení k odběru všech příkazů používaných pro aktualizace. Je nutné provést vhodné filtrování příchozích klíčů odtud.
  • Například, pro některé ze všech klíčů přicházejících přes Del je vysoce pravděpodobné, že daný klient nemá lokální cache.
  • To může vést k zbytečnému plýtvání zdroji.

Proto je potřeba Invalidation Message!

Co je Invalidation Message?

Invalidation Messages je koncept poskytovaný jako součást Server Assisted Client-Side Cache, přidaný od Redis 6.0. Invalidation Message je doručována v následujícím toku.

  1. Předpokládejme, že ClientB již jednou přečetl key.
  2. ClientA nastaví corresponding key znovu.
  3. Redis detekuje změnu a publikuje Invalidation Message pro ClientB, čímž ClientB oznámí, aby cache vymazal.
  4. ClientB přijme zprávu a provede příslušnou akci.

invalidation-message

Jak se to používá

Základní struktura operace

Klient připojený k Redis přijímá invalidation messages spuštěním CLIENT TRACKING ON REDIRECT <client-id>. A klient, který potřebuje přijímat zprávy, se přihlásí k odběru invalidation messages pomocí SUBSCRIBE __redis__:invalidate.

default tracking

1# klient 1
2> SET a 100
1# klient 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# klient 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # sledování
1# klient 1
2> SET a 200
1# klient 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# klient 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# klient 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# klient 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# klient 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementace! Implementace! Implementace!

Redigo + Ristretto

Vysvětlení pouze tímto způsobem činí nejasným, jak to použít v reálném kódu. Proto to nejprve jednoduše nakonfigurujeme s redigo a ristretto.

Nejprve nainstalujte dvě závislosti.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // počet klíčů pro sledování frekvence (10M).
22		MaxCost:     1 << 30, // maximální cena cache (1GB).
23		BufferItems: 64,      // počet klíčů na Get buffer.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err) // selhalo generování cache
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err) // selhalo připojení k redis
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err) // selhalo uzavření připojení k redis
45	}
46
47	return nil
48}

Nejprve jednoduše vytvořte RedisClient včetně ristretto a redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err) // selhalo připojení k redis
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err) // selhalo získání client id
10	}
11	slog.Info("client id", "id", clientId) // client id
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err) // selhalo povolení tracking
16	}
17	slog.Info("subscription result", "result", subscriptionResult) // výsledek subscription
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err) // selhalo subscribe
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err) // selhalo přijetí zprávy
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data) // přijatá zpráva
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count) // subscription
37		case error:
38			return fmt.Errorf("error: %w", msg) // chyba
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg) // neočekávaná zpráva
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys) // přijatá invalidation zpráva
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg)) // neočekávaná zpráva
54		}
55	}
56}

Kód je trochu složitý.

  • Pro Tracking se navazuje další spojení. Toto je opatření s ohledem na to, že PubSub by mohl rušit jiné operace.
  • ID přidaného spojení se dotazuje a Tracking se přesměrovává na toto spojení ze spojení, které dotazuje data.
  • A přihlašuje se k odběru invalidation message.
  • Kód pro zpracování odběru je trochu složitý. Protože redigo neparsuje invalidation messages, je třeba přijmout odpověď před parsováním a zpracovat ji.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key) // cache hit
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v)) // neočekávaný typ
10		}
11	}
12	slog.Info("cache miss", "key", key) // cache miss
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err) // selhalo získání klíče
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Zpráva Get nejprve dotazuje ristretto takto, a pokud není nalezena, načte ji z Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err) // selhalo sledování invalidation zprávy
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down") // vypínání
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err) // selhalo získání klíče
40				return
41			}
42			slog.Info("got key", "value", v) // získán klíč
43		}
44	}
45}

Kód pro testování je uveden výše. Pokud ho otestujete, budete moci potvrdit, že hodnota je znovu aktualizována, kdykoli se data v Redis aktualizují.

Nicméně, toto je příliš složité. Především, pro škálování clusteru je nevyhnutelně nutné povolit Tracking pro všechny mastery nebo repliky.

Rueidis

Pokud používáte jazyk Go, máme k dispozici nejmodernější a nejpokročilejší rueidis. Napíšeme kód využívající server assisted client side cache v prostředí Redis cluster s použitím rueidis.

Nejprve nainstalujte závislost.

  • github.com/redis/rueidis

A napište kód pro dotazování dat z Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down") // vypínání
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error()) // selhalo získání klíče
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err) // selhalo převést odpověď na int64
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key) // cache hit
48			case false:
49				slog.Info("missed key", "key", key) // chybějící klíč
50			}
51			slog.Info("got key", "value", i) // získán klíč
52		}
53	}
54}

S rueidis stačí použít DoCache pro použití client side cache. Poté se přidá do lokální cache, včetně specifikace, jak dlouho ji uchovávat, a opětovné volání DoCache načte data z lokální cache. Přirozeně, invalidation messages jsou také správně zpracovány.

Proč ne redis-go?

redis-go bohužel nepodporuje server assisted client side cache prostřednictvím svého oficiálního API. Navíc, při vytváření PubSub neexistuje API pro přímý přístup k nově vytvořenému spojení, takže client ID nelze zjistit. Proto bylo usouzeno, že samotná konfigurace je s redis-go nemožná, a bylo to přeskočeno.

Sexy

Prostřednictvím struktury client side cache

  • Pokud data lze připravit předem, tato struktura umožňuje minimalizovat dotazy a provoz na Redis a zároveň vždy poskytovat nejnovější data.
  • Díky tomu lze vytvořit jakousi CQRS strukturu a dramaticky zlepšit výkon čtení.

cqrs

O kolik se to stalo sexy?

Vzhledem k tomu, že se tato struktura v praxi aktuálně používá, podíval jsem se na jednoduchou latency pro dvě API. Prosím, pochopte, že mohu psát pouze velmi abstraktně.

  1. První API
    1. Při prvním dotazu: Průměrně 14.63ms
    2. Při následném dotazu: Průměrně 2.82ms
    3. Průměrný rozdíl: 10.98ms
  2. Druhé API
    1. Při prvním dotazu: Průměrně 14.05ms
    2. Při následném dotazu: Průměrně 1.60ms
    3. Průměrný rozdíl: 11.57ms

Došlo k dodatečnému zlepšení latency až o přibližně 82 %!

Očekávám, že došlo k následujícím zlepšením.

  • Vynechání procesu síťové komunikace mezi klientem a Redis a úspora provozu.
  • Snížení počtu příkazů pro čtení, které musí Redis sám provést.
  • To má také efekt zlepšení výkonu zápisu.
  • Minimalizace parsování protokolu Redis.
  • Parsování protokolu Redis není bez nákladů. Snížení toho je velkou příležitostí.

Nicméně, vše je trade-off. Kvůli tomu jsme obětovali minimálně následující dvě věci.

  • Nutnost implementace, provozu a údržby prvků správy client-side cache.
  • Zvýšené využití CPU a memory na straně klienta v důsledku toho.

Závěr

Osobně to byla uspokojivá architektonická komponenta a zatížení na latency a API server bylo velmi malé. Věřím, že by bylo prospěšné konfigurovat architekturu s takovou strukturou i v budoucnu, pokud to bude možné.