Reaktiivisuuden parantaminen Redis-asiakaspuolen välimuistilla
Mitä Redis on?
Oletan, että harva ei tunne Redisiä. Kuitenkin, jos haluan lyhyesti mainita joitakin sen ominaisuuksia, ne voidaan tiivistää seuraavasti:
- Toiminnot suoritetaan yhdessä säikeessä, joten kaikki toiminnot ovat atomisia.
- Tiedot tallennetaan ja käsitellään In-Memory-muistiin, joten kaikki toiminnot ovat nopeita.
- Redis voi tallentaa WAL-tiedostoja asetuksista riippuen, mikä mahdollistaa uusimman tilan nopean varmuuskopioinnin ja palauttamisen.
- Se tukee monia eri tyyppejä, kuten Set, Hash, Bit, List, mikä takaa korkean tuottavuuden.
- Sillä on suuri yhteisö, jonka kautta voi jakautua erilaisia kokemuksia, ongelmia ja ratkaisuja.
- Sitä on kehitetty ja operoitu pitkään, mikä takaa luotettavan vakauden.
Sitten asiaan
Kuvitelkaa?
Mitä jos palvelunne välimuisti täyttäisi seuraavat kaksi ehtoa?
- Usein kysyttyjä tietoja on tarjottava käyttäjille tuoreimmassa tilassaan, mutta päivitykset ovat epäsäännöllisiä, mikä vaatii usein välimuistin päivitystä.
- Päivityksiä ei tapahdu, mutta samaa välimuistitietoa on haettava ja kysyttävä usein.
Ensimmäinen tapaus voisi olla verkkokaupan reaaliaikainen suosituimpien tuotteiden lista. Kun verkkokaupan reaaliaikainen suosituimpien tuotteiden lista tallennetaan sorted set -muotoon, on tehotonta, jos Redis lukee sen aina, kun käyttäjä avaa pääsivun. Toisessa tapauksessa, valuuttakurssitietojen osalta, vaikka valuuttakurssit julkaistaankin noin 10 minuutin välein, todellisia kyselyjä tapahtuu erittäin usein. Erityisesti won-dollari-, won-jeni- ja won-juan-kursseja kysytään välimuistista erittäin usein. Näissä tapauksissa olisi tehokkaampaa, jos API-palvelimella olisi erillinen paikallinen välimuisti, joka päivittyisi Redisistä, kun tieto muuttuu.
Miten tällainen toiminta voitaisiin toteuttaa tietokanta-Redis-API-palvelin -rakenteessa?
Eikö Redis PubSub riitä?
Kun käytät välimuistia, tilaa kanava, josta voit vastaanottaa päivityksiä!
- Silloin on luotava logiikka, joka lähettää viestejä päivityksen yhteydessä.
- PubSubin aiheuttama lisätoiminto vaikuttaa suorituskykyyn.
Entä jos Redis havaitsee muutokset?
Entä jos käytetään Keyspace Notificationia ja vastaanotetaan komentohälytyksiä kyseisestä avaimesta?
- On olemassa vaivaa, joka liittyy päivityksissä käytettävien avainten ja komentojen etukäteiseen tallentamiseen ja jakamiseen.
- Esimerkiksi, jollekin avaimelle yksinkertainen Set on päivityskomento, kun taas toiselle avaimelle LPush, RPush, SAdd tai SRem on päivityskomento, mikä tekee siitä monimutkaista.
- Tämä lisää merkittävästi mahdollisuutta viestintävirheisiin ja inhimillisiin virheisiin koodauksessa kehitysprosessin aikana.
Entä jos käytetään Keyevent Notificationia ja vastaanotetaan ilmoituksia komentoyksikköinä?
- Kaikki päivityksissä käytettävät komennot on tilattava. Saapuvista avaimista on suoritettava asianmukainen suodatus.
- Esimerkiksi, on todennäköistä, että joillakin asiakkailla ei ole paikallista välimuistia kaikista Del-komennolla saapuvista avaimista.
- Tämä voi johtaa tarpeettomaan resurssien tuhlaukseen.
Siksi Invalidation Message tarvitaan!
Mikä on Invalidation Message?
Invalidation Messages on Redis 6.0:sta alkaen tarjottu käsite, joka on osa Server Assisted Client-Side Cache -toimintoa. Invalidation Message välitetään seuraavalla tavalla:
- Oletetaan, että ClientB on jo lukenut avaimen kerran.
- ClientA asettaa kyseisen avaimen uudelleen.
- Redis havaitsee muutoksen ja julkaisee Invalidation Message -viestin ClientB:lle ilmoittaen ClientB:lle, että sen tulee tyhjentää välimuisti.
- ClientB vastaanottaa viestin ja ryhtyy asianmukaisiin toimiin.
Miten sitä käytetään?
Perustoimintarakenne
Redikseen yhdistetty asiakasohjelma suorittaa CLIENT TRACKING ON REDIRECT <client-id>
vastaanottaakseen invalidation message -viestejä. Viestejä vastaanottavan asiakasohjelman on tilattava SUBSCRIBE __redis__:invalidate
vastaanottaakseen invalidation message -viestejä.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Toteutus! Toteutus! Toteutus!
Redigo + Ristretto
Jos selitän sen vain näin, on epäselvää, miten sitä tulisi käyttää koodissa. Siksi aloitan yksinkertaisella redigon
ja ristretton
kokoonpanolla.
Asenna ensin kaksi riippuvuutta.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // avainten lukumäärä, joiden taajuutta seurataan (10M).
22 MaxCost: 1 << 30, // välimuistin enimmäiskustannus (1GB).
23 BufferItems: 64, // avainten lukumäärä per Get-puskuri.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Ensin luodaan yksinkertainen RedisClient
, joka sisältää ristretton ja redigon.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Koodi on hieman monimutkainen.
- Trackingin suorittamiseksi luodaan toinen yhteys. Tämä on toimenpide, jolla varmistetaan, ettei PubSub häiritse muita toimintoja.
- Lisätyn yhteyden ID haetaan, ja datan hakemiseen käytettävä yhteys ohjataan kyseiseen yhteyteen.
- Tämän jälkeen tilataan invalidation message -viestit.
- Tilausten käsittelykoodi on hieman monimutkainen. Koska redigo ei jäsenä invalidointiviestejä, on käsittely tehtävä ennen jäsennyksen jälkeistä vastausta.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Get
-viesti tarkistaa ensin ristretton, ja jos sitä ei löydy, se hakee sen Redisistä.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Yllä oleva koodi on tarkoitettu testaamiseen. Jos testaat sitä, huomaat, että tiedot päivittyvät Redisistä aina, kun ne päivittyvät.
Mutta tämä on liian monimutkaista. Mikä tärkeintä, klusterin laajentamiseksi on välttämätöntä aktivoida seuranta kaikille mastereille tai replikoille.
Rueidis
Go-kieltä käyttäville meillä on käytettävissämme nykyaikaisin ja kehittynein rueidis
. Kirjoitan koodin, joka käyttää rueidisia Redis-klusteriympäristössä server assisted client side cache -toiminnolla.
Asenna ensin riippuvuudet.
github.com/redis/rueidis
Ja kirjoita koodi, joka hakee tietoja Redisistä.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Rueidisissa client side cachea varten riittää DoCache
-kutsu. Silloin se lisätään paikalliseen välimuistiin sen säilytysajan kanssa, ja samalla DoCache
-kutsulla haetaan tiedot paikallisesta välimuistista. Luonnollisesti se käsittelee myös invalidointiviestit asianmukaisesti.
Miksi ei redis-go?
redis-go
ei valitettavasti tue virallista APIa server assisted client side cachelle. Se ei edes tarjoa APIa, jolla pääsisi suoraan käsiksi uuteen yhteyteen PubSubin luomisen yhteydessä, joten client ID:tä ei voi tietää. Siksi redis-go
päätettiin jättää väliin, koska sen konfigurointi katsottiin mahdottomaksi.
Seksikästä
client side cache -rakenteen kautta
- Jos tietoja voidaan valmistella etukäteen, tämä rakenne mahdollistaa uusimpien tietojen tarjoamisen minimoiden Rediksen kyselyt ja liikenteen.
- Tämä mahdollistaa eräänlaisen CQRS-rakenteen luomisen, joka parantaa merkittävästi lukusuorituskykyä.
Kuinka paljon seksikkäämmäksi se on muuttunut?
Olen todella käyttänyt tällaista rakennetta paikan päällä, joten tarkastelin lyhyesti kahden API:n latenssia. Pyydän ymmärrystä siitä, että en voi kirjoittaa siitä kovin abstraktisti.
- Ensimmäinen API
- Ensimmäinen kysely: keskimäärin 14.63ms
- Seuraavat kyselyt: keskimäärin 2.82ms
- Keskimääräinen ero: 10.98ms
- Toinen API
- Ensimmäinen kysely: keskimäärin 14.05ms
- Seuraavat kyselyt: keskimäärin 1.60ms
- Keskimääräinen ero: 11.57ms
Lisäviiveen parannus oli jopa 82%!
Odotan seuraavia parannuksia:
- Asiakkaan ja Rediksen välisen verkkoliikenteen ohittaminen ja liikenteen säästäminen
- Rediksen suoritettavien lukukomentojen määrän väheneminen
- Tämä parantaa myös kirjoitussuorituskykyä.
- Rediksen protokollan jäsennyksen minimointi
- Rediksen protokollan jäsennyksellä on myös kustannuksia, jotka eivät ole nolla. Tämän vähentäminen on suuri mahdollisuus.
Kuitenkin kaikki on kompromissi. Tämän vuoksi olemme joutuneet uhraamaan vähintään kaksi asiaa:
- Asiakaspuolen välimuistin hallintakomponentin toteutus, käyttö ja ylläpito ovat tarpeen.
- Tämä lisää asiakkaan CPU- ja muistinkäyttöä.
Johtopäätös
Henkilökohtaisesti olin tyytyväinen arkkitehtuurin komponenttiin, ja latenssi sekä API-palvelimen kuormitus olivat erittäin alhaiset. Toivon voivani jatkossakin rakentaa arkkitehtuuria tällä tavalla, jos mahdollista.