A válaszkészség javítása Redis kliensoldali gyorsítótárral
Mi az a Redis?
Gondolom, kevesen vannak, akik nem ismerik a Redis-t. Azonban, ha röviden szeretném összefoglalni néhány jellemzőjét, az alábbiak szerint tehetném:
- Egyetlen szálon futnak a műveletek, így minden művelet atomos tulajdonságokkal rendelkezik.
- Az adatok In-Memory tárolódnak és dolgozódnak fel, így minden művelet gyors.
- A Redis, opciótól függően, képes WAL (Write-Ahead Log) tárolására, ami lehetővé teszi az aktuális állapot gyors mentését és helyreállítását.
- Számos különböző típusú adatstruktúrát támogat, mint például Set, Hash, Bit, List, ezáltal magas hatékonyságot biztosítva.
- Nagy közösséggel rendelkezik, így széles körű tapasztalatokat, problémákat és megoldásokat oszthatunk meg.
- Hosszú ideje fejlesztik és üzemeltetik, így megbízható stabilitással bír.
És akkor a lényegre térve
Képzelje el!
Mi lenne, ha a szolgáltatásai gyorsítótára a következő két feltételnek felelne meg?
- Gyakran lekérdezett adatokat kell a felhasználóknak a legfrissebb állapotban biztosítani, de a frissítés rendszertelen, ezért gyakran kell frissíteni a gyorsítótárat.
- Nincs frissítés, de ugyanazt a gyorsítótárazott adatot gyakran kell lekérdezni.
Az első esetre példa lehet egy bevásárlóközpont valós idejű népszerűségi rangsora. Ha a bevásárlóközpont valós idejű népszerűségi rangsorát sorted set-ként tároljuk, akkor ineffektív lenne a Redis-ből olvasni minden alkalommal, amikor a felhasználó a főoldalra lép. A második esetre példa az árfolyamadatok, ahol az árfolyamadatokat körülbelül 10 percenként teszik közzé, de a tényleges lekérdezések nagyon gyakran történnek. Ráadásul a dél-koreai won-dollár, dél-koreai won-japán jen, dél-koreai won-kínai jüan árfolyamokat rendkívül gyakran lekérdezik a gyorsítótárból. Ezekben az esetekben hatékonyabb lenne, ha az API szerver külön gyorsítótárral rendelkezne helyben, és az adatok megváltozásakor újra lekérdezné a Redis-t a frissítéshez.
Hogyan lehetne akkor megvalósítani ezt a viselkedést egy adatbázis - Redis - API szerver struktúrában?
Nem működne a Redis PubSub?
A gyorsítótár használatakor iratkozzunk fel egy csatornára, amelyen keresztül értesítést kapunk a frissítésről!
- Akkor létre kell hozni egy logikát, amely üzenetet küld frissítés esetén.
- A PubSub miatti további műveletek befolyásolják a teljesítményt.


Akkor mi van, ha a Redis érzékeli a változásokat?
Mi van, ha a Keyspace Notificationt használjuk az adott kulcshoz tartozó parancsértesítések fogadására?
- Kényelmetlenséget jelent, hogy a frissítéshez használt kulcsokat és parancsokat előre el kell tárolni és megosztani.
- Például, egyes kulcsok esetén egy egyszerű Set a frissítési parancs, míg más kulcsoknál az LPush, RPush, SAdd vagy SRem a frissítési parancs, ami bonyolulttá teszi a rendszert.
- Ez jelentősen növeli a kommunikációs hibák és az emberi hibák valószínűségét a fejlesztési folyamat során.
Mi van, ha a Keyevent Notificationt használjuk parancsonkénti értesítések fogadására?
- Az összes frissítéshez használt parancsra fel kell iratkozni. Ehhez megfelelő szűrésre van szükség a beérkező kulcsok tekintetében.
- Például, a Del paranccsal érkező kulcsok közül néhányra valószínűleg nem rendelkezik lokális gyorsítótárral az adott kliens.
- Ez felesleges erőforrás-pazarláshoz vezethet.
Ezért van szükség Invalidation Message-re!
Mi az az Invalidation Message?
Az Invalidation Messages a Redis 6.0-tól kezdve bevezetett Server Assisted Client-Side Cache részeként elérhető koncepció. Az Invalidation Message a következő folyamat szerint kerül átadásra:
- Feltételezzük, hogy a ClientB már egyszer elolvasta a kulcsot.
- A ClientA új értéket állít be az adott kulcshoz.
- A Redis érzékeli a változást, és Invalidation Message-et küld a ClientB-nek, értesítve arról, hogy törölje a gyorsítótárat.
- A ClientB fogadja az üzenetet, és megteszi a megfelelő intézkedéseket.

Hogyan kell használni?
Alapvető működési struktúra
A Redis-hez csatlakozó kliens a CLIENT TRACKING ON REDIRECT <client-id> parancs végrehajtásával kap invalidation message-et. A message-t fogadni kívánó kliens pedig a SUBSCRIBE __redis__:invalidate paranccsal iratkozik fel az invalidation message-ek fogadására.
alapértelmezett követés
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
sugárzott követés
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Megvalósítás! Megvalósítás! Megvalósítás!
Redigo + Ristretto
Ha csak így magyarázom el, akkor a kódban való tényleges használata bizonytalan lehet. Ezért először egyszerűen a redigo és a ristretto segítségével fogom felépíteni.
Először telepítse a két függőséget.
github.com/gomodule/redigogithub.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // number of keys to track frequency of (10M).
22 MaxCost: 1 << 30, // maximum cost of cache (1GB).
23 BufferItems: 64, // number of keys per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Először egyszerűen létrehozunk egy RedisClient-et, amely tartalmazza a ristretto-t és a redigo-t.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
A kód kissé bonyolult.
- A Tracking-hez egy további kapcsolatot létesít. Ez azért van, hogy a PubSub ne zavarja más műveleteket.
- Lekérdezi a hozzáadott kapcsolat azonosítóját, majd a Tracking-et átirányítja az adott kapcsolatra, ahol az adatokat lekérdezik.
- Ezt követően feliratkozik az invalidation message-ekre.
- Az előfizetést kezelő kód kissé bonyolult. Mivel a redigo nem tudja értelmezni az érvénytelenítési üzeneteket, a válaszokat az értelmezés előtt kell feldolgozni.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
A Get üzenet a következőképpen először a ristretto-t kérdezi le, és ha nem találja, akkor a Redis-ből kéri le.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
A teszteléshez szükséges kód a fenti. Ha kipróbálja, láthatja, hogy a Redis-ben az adatok frissülésekor az értékek is frissülnek.
Ez azonban túl bonyolult. Ráadásul a fürt kiterjesztéséhez elkerülhetetlenül minden master vagy replika számára engedélyezni kell a Tracking-et.
Rueidis
A Go nyelv használata esetén rendelkezésünkre áll a legmodernebb és legfejlettebb rueidis. Írjunk egy kódot, amely a rueidis-t használja a szerver által támogatott kliensoldali gyorsítótárazáshoz Redis klaszter környezetben.
Először is, telepítse a függőséget.
github.com/redis/rueidis
Ezután írjon egy kódot a Redis adatok lekérdezéséhez.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
A rueidis-ben a kliensoldali gyorsítótár használatához csak a DoCache függvényt kell meghívni. Ezután hozzáadja a helyi gyorsítótárhoz, beleértve azt is, hogy mennyi ideig kell azt megtartani a helyi gyorsítótárban, és ha ugyanazt a DoCache függvényt hívja meg, akkor a helyi gyorsítótárból kérdezi le az adatokat. Természetesen az érvénytelenítési üzeneteket is megfelelően kezeli.
Miért nem redis-go?
A redis-go sajnos nem támogatja a szerver által támogatott kliensoldali gyorsítótárazást hivatalos API-n keresztül. Sőt, amikor PubSub-ot hoz létre, és új kapcsolatot létesít, nincs olyan API, amely közvetlenül hozzáférne ahhoz a kapcsolathoz, így a kliens azonosítóját sem lehet megkapni. Ezért úgy döntöttünk, hogy a redis-go konfigurációja eleve lehetetlen, és kihagytuk.
Milyen szexi!
A kliensoldali gyorsítótár struktúrán keresztül
- Ha az adatok előre előkészíthetők, ez a struktúra minimalizálhatja a Redis-re irányuló lekérdezéseket és forgalmat, miközben mindig a legfrissebb adatokat biztosítja.
- Ezen keresztül egyfajta CQRS struktúra hozható létre, ami drasztikusan növelheti az olvasási teljesítményt.

Mennyivel lett szexibb?
Valójában a helyszínen már használják ezt a struktúrát, ezért megvizsgáltam a két API egyszerű késleltetését. Kérem, bocsássa meg, hogy csak nagyon absztrakt módon tudom megfogalmazni.
- Első API
- Első lekérdezéskor: átlagosan 14,63 ms
- Későbbi lekérdezéskor: átlagosan 2,82 ms
- Átlagos különbség: 10,98 ms
- Második API
- Első lekérdezéskor: átlagosan 14,05 ms
- Későbbi lekérdezéskor: átlagosan 1,60 ms
- Átlagos különbség: 11,57 ms
Akár 82%-os további késleltetés-javulás is tapasztalható volt!
Várhatóan a következő fejlesztések történtek:
- A kliens és a Redis közötti hálózati kommunikáció kihagyása és a forgalom megtakarítása.
- A Redis által végrehajtandó olvasási parancsok számának csökkenése.
- Ez a írási teljesítményt is javítja.
- A Redis protokoll elemzésének minimalizálása.
- A Redis protokoll elemzése sem nulla költségű. Ennek csökkentése nagy lehetőség.
Azonban minden kompromisszum. Ehhez legalább a következő két dolgot áldoztuk fel:
- A kliensoldali gyorsítótár-kezelő elem implementálása, üzemeltetése és karbantartása.
- Emiatt megnövekedett a kliens CPU- és memóriahasználata.
Konklúzió
Személy szerint elégedett voltam az architektúra felépítésével, és a késleltetés, valamint az API szerverre nehezedő stressz is nagyon alacsony volt. A jövőben is, ha lehetséges, ilyen struktúrával szeretném felépíteni az architektúrát.