GoSuda

A reszponzivitás javítása Redis kliensoldali gyorsítótárral

By snowmerak
views ...

Mi az a Redis?

Gondolom, kevesen vannak, akik nem ismerik a Rediset. Azonban, ha röviden szeretném megemlíteni néhány tulajdonságát, akkor a következőképpen foglalhatók össze:

  • A műveleteket egy szálon hajtja végre, így minden művelet atomikus tulajdonságú.
  • Az adatok In-Memory módon tárolódnak és dolgozódnak fel, így minden művelet gyors.
  • A Redis opciótól függően WAL-t is tárolhat, ami lehetővé teszi a legfrissebb állapot gyors mentését és helyreállítását.
  • Támogatja a különböző típusokat, mint például a Set, Hash, Bit, List, ami magas termelékenységet biztosít.
  • Nagy közösséggel rendelkezik, így sokféle tapasztalat, probléma és megoldás megosztható.
  • Hosszú ideje fejlesztés és üzemeltetés alatt áll, így megbízható stabilitással rendelkezik.

Tehát a lényegre térve

Képzelje el!

Mi lenne, ha a szolgáltatásai gyorsítótára a következő két feltételnek megfelelne?

  1. Gyakran lekérdezett adatokat kell a felhasználóknak a legfrissebb állapotban szolgáltatni, de a frissítés rendszertelen, ezért gyakran kell a gyorsítótárat frissíteni.
  2. A frissítés nem történik meg, de ugyanazt a gyorsítótár-adatot gyakran kell lekérdezni.

Az első esetre példa lehet egy bevásárlóközpont valós idejű népszerűségi rangsora. Ha a bevásárlóközpont valós idejű népszerűségi rangsorát sorted set-ként tároljuk, akkor ineffektív lenne a Redisből minden alkalommal beolvasni, amikor a felhasználó a főoldalra lép. A második esetre példa az árfolyamadatok: bár az árfolyamadatokat nagyjából 10 percenként teszik közzé, a tényleges lekérdezések nagyon gyakran fordulnak elő. Ráadásul a won-dollár, won-jen és won-jüan árfolyamokat nagyon gyakran kell lekérdezni a gyorsítótárból. Ezekben az esetekben az lenne a hatékonyabb működés, ha az API szerver külön gyorsítótárat tartana lokálisan, és ha az adatok megváltoznak, újra lekérdezné és frissítené a Rediset.

Akkor hogyan valósítható meg ez a működés az adatbázis - Redis - API szerver struktúrában?

Nem működne a Redis PubSub?

Amikor gyorsítótárat használunk, iratkozzunk fel egy csatornára, amelyen keresztül megkaphatjuk a frissítési értesítéseket!

  • Akkor létre kell hozni egy logikát, amely üzenetet küld a frissítéskor.
  • A PubSub miatt további műveletek kerülnek be, ami befolyásolja a teljesítményt.

pubsub-write

pubsub-read

Akkor mi van, ha a Redis érzékeli a változást?

Mi van, ha Keyspace Notificationt használunk, hogy értesítést kapjunk az adott kulcshoz tartozó parancsokról?

  • Kényelmetlenséget jelent, hogy előre el kell tárolni és megosztani a frissítéshez használt kulcsokat és parancsokat.
  • Például bonyolulttá válik, ha egy kulcs esetében egyszerű Set a frissítési parancs, míg egy másik kulcs esetében LPush, RPush, SAdd vagy SRem a frissítési parancs.
  • Ez drasztikusan megnöveli a kommunikációs hibák és az emberi hibák valószínűségét a fejlesztési folyamat során.

Mi van, ha Keyevent Notificationt használunk, hogy parancsonként értesítést kapjunk?

  • Az összes frissítéshez használt parancsra feliratkozni kell. Ezenkívül megfelelő szűrésre van szükség a bejövő kulcsok esetében.
  • Például, a Del által érkező összes kulcs közül valószínűleg néhányhoz az adott kliensnek nincs lokális gyorsítótára.
  • Ez felesleges erőforrás-pazarláshoz vezethet.

Ezért van szükség az Invalidation Message-re!

Mi az az Invalidation Message?

Az Invalidation Messages egy koncepció, amelyet a Redis 6.0-tól kezdve a Server Assisted Client-Side Cache részeként vezettek be. Az Invalidation Message a következő folyamat szerint kerül továbbításra:

  1. Feltételezzük, hogy a ClientB már egyszer elolvasta a kulcsot.
  2. A ClientA új értéket állít be az adott kulcshoz.
  3. A Redis érzékeli a változást, és Invalidation Message-t küld a ClientB-nek, értesítve azt, hogy törölje a gyorsítótárat.
  4. A ClientB fogadja az üzenetet, és megteszi a megfelelő intézkedéseket.

invalidation-message

Hogyan kell használni?

Alapvető működési struktúra

A Redishez csatlakoztatott kliens a CLIENT TRACKING ON REDIRECT <client-id> parancs futtatásával fogad invalidation message-eket. Az üzeneteket fogadni kívánó kliens pedig az SUBSCRIBE __redis__:invalidate paranccsal iratkozik fel az invalidation message-ek fogadására.

default tracking

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

broadcasting tracking

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementáció! Implementáció! Implementáció!

Redigo + Ristretto

Az ilyen magyarázatok után bizonytalan lehet, hogyan is kellene használni a kódban. Ezért először egyszerűen konfiguráljuk redigo és ristretto segítségével.

Először telepítse a két függőséget.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // a frekvenciáját követendő kulcsok száma (10M).
22		MaxCost:     1 << 30, // a gyorsítótár maximális költsége (1GB).
23		BufferItems: 64,      // Get pufferenkénti kulcsok száma.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Először egyszerűen létrehozzuk a RedisClient-et, amely tartalmazza a ristrettót és a redigót.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

A kód kissé bonyolult.

  • A Tracking funkcióhoz egy további kapcsolatot létesít. Ez egy óvintézkedés, figyelembe véve, hogy a PubSub akadályozhatja más műveleteket.
  • Lekérdezi a hozzáadott kapcsolat azonosítóját, majd a Tracking funkciót átirányítja erre a kapcsolatra azon a kapcsolaton keresztül, amelyen az adatokat lekérdezik.
  • Ezután feliratkozik az invalidation message-ekre.
  • Az előfizetés kezelésének kódja kissé bonyolult. Mivel a redigo nem tudja elemezni az érvénytelenítési üzeneteket, az elemzés előtti választ kell feldolgozni.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

A Get üzenet először lekérdezi a ristrettót, és ha nem találja, akkor a Redisből szerzi be.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

A teszteléshez szükséges kód a fenti. Ha kipróbálja, láthatja, hogy a Redisben lévő adatok frissülésekor az értékek is frissülnek.

De ez túl bonyolult. Sőt, a fürt skálázásához elkerülhetetlenül engedélyezni kell a Trackinget minden masteren vagy replikán.

Rueidis

Mivel Go nyelvet használunk, a legmodernebb és legfejlettebb rueidis áll rendelkezésünkre. Írjunk kódot, amely a rueidis-t használja a Redis fürt környezetében a server assisted client side cache alkalmazásához.

Először is telepítse a függőségeket.

  • github.com/redis/rueidis

Ezután írjon kódot az adatok lekérdezéséhez a Redisből.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

A rueidis-ben a client side cache használatához csupán DoCache szükséges. Ezután hozzáadja a lokális gyorsítótárhoz, és az is szabályozható, hogy mennyi ideig maradjon a lokális gyorsítótárban, és ugyanaz a DoCache hívás lekéri az adatokat a lokális gyorsítótárból. Természetesen az invalidation message-eket is megfelelően kezeli.

Miért nem redis-go?

A redis-go sajnos nem támogatja a server assisted client side cache-t hivatalos API-val. Sőt, amikor PubSub-ot hoz létre, új kapcsolatot hoz létre, és nincs API, amely közvetlenül hozzáférne ehhez a kapcsolathoz, így a kliens azonosítója sem ismert. Ezért úgy ítéltem meg, hogy a redis-go konfigurációja eleve lehetetlen, és kihagytam.

Szexi

A client side cache struktúrán keresztül

  • Ha az adatok előre elkészíthetők, akkor ez a struktúra minimálisra csökkentheti a Redis lekérdezéseit és forgalmát, miközben mindig a legfrissebb adatokat szolgáltatja.
  • Ezáltal egyfajta CQRS struktúra hozható létre, amely drasztikusan növeli az olvasási teljesítményt.

cqrs

Mennyivel szexibb lett?

Valójában a helyszínen már ilyen struktúrát használunk, ezért megnéztem a két API egyszerű késleltetését. Kérem, értse meg, hogy csak nagyon absztrakt módon tudom leírni.

  1. Első API
    1. Első lekérdezéskor: átlagosan 14.63ms
    2. Későbbi lekérdezésekkor: átlagosan 2.82ms
    3. Átlagos különbség: 10.98ms
  2. Második API
    1. Első lekérdezéskor: átlagosan 14.05ms
    2. Későbbi lekérdezésekkor: átlagosan 1.60ms
    3. Átlagos különbség: 11.57ms

Akár 82%-os további késleltetés-javulás is történt!

Valószínűleg a következő fejlesztések történtek:

  • A kliens és a Redis közötti hálózati kommunikáció kihagyása és a forgalom megtakarítása
  • A Redis által végrehajtandó olvasási parancsok számának csökkenése
    • Ez a írási teljesítményt is növeli.
  • A Redis protokoll elemzésének minimalizálása
    • A Redis protokoll elemzése sem nulla költséggel jár. Ennek csökkentése nagy lehetőség.

Azonban minden kompromisszum. Ehhez legalább a következő két dolgot áldoztuk fel:

  • A kliens oldali gyorsítótár-kezelő elem implementálásának, üzemeltetésének és karbantartásának szükségessége
  • Emiatt megnövekedett a kliens CPU- és memóriahasználata

Konklúzió

Személy szerint elégedett voltam az architektúra összetevőjével, és a késleltetés, valamint az API szerverre nehezedő terhelés is nagyon alacsony volt. A jövőben is szeretném, ha lehetséges, ilyen struktúrával épülnének fel az architektúrák.