Meningkatkan Responsivitas dengan Redis Client-Side Caching
Apa itu Redis?
Saya rasa tidak banyak orang yang tidak mengenal Redis. Namun, jika saya harus meringkasnya secara singkat dengan beberapa karakteristik, maka dapat diuraikan sebagai berikut:
- Operasi dilakukan dalam satu thread, sehingga semua operasi memiliki atomisitas.
- Data disimpan dan dioperasikan dalam In-Memory, sehingga semua operasi cepat.
- Redis dapat menyimpan WAL tergantung pada opsi, sehingga dapat dengan cepat mencadangkan dan memulihkan status terbaru.
- Mendukung berbagai tipe seperti Set, Hash, Bit, List, sehingga memiliki produktivitas yang tinggi.
- Memiliki komunitas besar, sehingga berbagai pengalaman, isu, dan solusi dapat dibagikan.
- Telah dikembangkan dan dioperasikan untuk waktu yang lama, sehingga memiliki stabilitas yang dapat diandalkan.
Jadi, ke intinya
Bayangkan?
Bagaimana jika cache layanan Anda memenuhi dua kondisi berikut?
- Data yang sering diakses harus disediakan kepada pengguna dalam kondisi terbaru, tetapi pembaharuannya tidak teratur sehingga cache harus sering diperbarui.
- Pembaharuan tidak terjadi, tetapi data cache yang sama sering diakses dan dicari.
Kasus pertama dapat mempertimbangkan peringkat popularitas real-time di pusat perbelanjaan. Ketika peringkat popularitas real-time pusat perbelanjaan disimpan sebagai sorted set, tidak efisien jika Redis membacanya setiap kali pengguna mengakses halaman utama. Kasus kedua adalah data nilai tukar, meskipun data nilai tukar diumumkan kira-kira setiap 10 menit, pencarian aktual terjadi sangat sering. Terutama untuk Won-Dolar, Won-Yen, dan Won-Yuan, cache akan sering dicari. Dalam kasus-kasus seperti ini, akan lebih efisien jika server API memiliki cache terpisah secara lokal, dan ketika data berubah, ia mencari Redis lagi untuk memperbarui.
Lalu, bagaimana kita bisa mengimplementasikan operasi semacam ini dalam struktur database - Redis - server API?
Apakah tidak bisa dengan Redis PubSub?
Saat menggunakan cache, berlanggananlah saluran yang dapat menerima status pembaruan!
- Maka Anda harus membuat logika untuk mengirim pesan saat pembaruan.
- Ada operasi tambahan karena PubSub, yang memengaruhi kinerja.
Lalu bagaimana jika Redis mendeteksi perubahan?
Bagaimana jika kita menggunakan Keyspace Notification untuk menerima notifikasi perintah untuk kunci tersebut?
- Ada kerumitan dalam menyimpan dan membagikan kunci dan perintah yang digunakan untuk pembaruan terlebih dahulu.
- Misalnya, untuk beberapa kunci, Set sederhana adalah perintah pembaruan, sementara untuk kunci lain, LPush, RPush, SAdd, atau SRem adalah perintah pembaruan, yang membuatnya kompleks.
- Ini secara signifikan meningkatkan kemungkinan terjadinya kesalahan komunikasi dan kesalahan manusia dalam pengkodean selama proses pengembangan.
Bagaimana jika kita menggunakan Keyevent Notification untuk menerima notifikasi per perintah?
- Diperlukan langganan untuk semua perintah yang digunakan dalam pembaruan. Diperlukan penyaringan yang tepat untuk kunci yang masuk dari sana.
- Misalnya, dari semua kunci yang masuk dengan Del, ada kemungkinan besar bahwa klien tertentu tidak memiliki cache lokal.
- Ini dapat menyebabkan pemborosan sumber daya yang tidak perlu.
Maka yang dibutuhkan adalah Invalidation Message!
Apa itu Invalidation Message?
Invalidation Messages adalah konsep yang disediakan sebagai bagian dari Server Assisted Client-Side Cache yang ditambahkan sejak Redis 6.0. Invalidation Message disampaikan dalam alur berikut.
- Asumsikan ClientB sudah membaca kunci sekali.
- ClientA mengatur kunci tersebut dengan nilai baru.
- Redis mendeteksi perubahan dan menerbitkan Invalidation Message ke ClientB untuk memberi tahu ClientB agar menghapus cache.
- ClientB menerima pesan tersebut dan mengambil tindakan yang sesuai.
Cara menggunakannya
Struktur Operasi Dasar
Klien yang terhubung ke Redis menerima pesan invalidation dengan menjalankan CLIENT TRACKING ON REDIRECT <client-id>
. Dan klien yang harus menerima pesan berlangganan SUBSCRIBE __redis__:invalidate
untuk menerima pesan invalidation.
default tracking
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
broadcasting tracking
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementasi! Implementasi! Implementasi!
Redigo + Ristretto
Penjelasan seperti itu mungkin agak ambigu tentang bagaimana cara menggunakannya dalam kode sebenarnya. Jadi, mari kita konfigurasikan terlebih dahulu dengan redigo
dan ristretto
secara sederhana.
Pertama, instal dua dependensi.
github.com/gomodule/redigo
github.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // jumlah kunci yang akan dilacak frekuensinya (10M).
22 MaxCost: 1 << 30, // biaya maksimum cache (1GB).
23 BufferItems: 64, // jumlah kunci per Get buffer.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Pertama, buat RedisClient
yang secara sederhana menyertakan ristretto dan redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Kodenya sedikit rumit.
- Untuk melakukan Tracking, satu koneksi lagi dibuat. Ini adalah tindakan yang diambil dengan mempertimbangkan bahwa PubSub akan mengganggu operasi lain.
- ID koneksi yang ditambahkan dicari, dan koneksi yang akan mencari data diarahkan untuk melacak ke koneksi tersebut.
- Dan berlangganan pesan invalidation.
- Kode untuk menangani langganan sedikit rumit. Karena redigo tidak dapat mengurai pesan invalidation, respons harus diterima dan diproses sebelum penguraian.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Pesan Get
pertama-tama mencari ristretto, dan jika tidak ditemukan, mengambilnya dari Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Kode untuk pengujian adalah seperti di atas. Jika Anda mengujinya, Anda akan dapat melihat bahwa nilai diperbarui setiap kali data diperbarui di Redis.
Namun, ini terlalu rumit. Yang terpenting, untuk memperluas ke klaster, mau tidak mau perlu mengaktifkan Tracking untuk setiap master, atau replika.
Rueidis
Selama kita menggunakan bahasa Go, kita memiliki rueidis
yang paling modern dan maju. Mari kita tulis kode yang menggunakan cache sisi klien yang dibantu server di lingkungan klaster Redis menggunakan rueidis.
Pertama, instal dependensinya.
github.com/redis/rueidis
Kemudian, tulis kode untuk mencari data di Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Untuk menggunakan cache sisi klien di rueidis, Anda hanya perlu DoCache
. Kemudian, ia akan menambahkan ke cache lokal, termasuk berapa lama akan disimpan di cache lokal, dan jika DoCache
dipanggil lagi, ia akan mengambil data dari cache lokal. Tentu saja, pesan invalidation juga ditangani dengan benar.
Mengapa bukan redis-go?
redis-go
sayangnya tidak mendukung cache sisi klien yang dibantu server sebagai API resmi. Bahkan, tidak ada API untuk langsung mengakses koneksi baru yang dibuat saat membuat PubSub, sehingga ID klien tidak dapat diketahui. Oleh karena itu, redis-go
dianggap tidak mungkin untuk dikonfigurasi dan dilewati.
Sungguh Menarik
Melalui struktur cache sisi klien
- Jika data dapat disiapkan terlebih dahulu, struktur ini dapat meminimalkan kueri dan lalu lintas ke Redis sambil selalu menyediakan data terbaru.
- Melalui ini, semacam struktur CQRS dapat dibuat untuk meningkatkan kinerja pembacaan secara drastis.
Seberapa menariknya sekarang?
Sebenarnya, karena struktur semacam ini sedang digunakan di lapangan, saya mencari latensi sederhana untuk kedua API tersebut. Mohon maaf karena saya hanya bisa menulisnya secara abstrak.
- API pertama
- Saat pencarian awal: rata-rata 14.63ms
- Saat pencarian berikutnya: rata-rata 2.82ms
- Perbedaan rata-rata: 10.98ms
- API kedua
- Saat pencarian awal: rata-rata 14.05ms
- Saat pencarian berikutnya: rata-rata 1.60ms
- Perbedaan rata-rata: 11.57ms
Ada peningkatan latensi tambahan hingga sekitar 82%!
Diharapkan ada peningkatan berikut:
- Penghapusan proses komunikasi jaringan antara klien dan Redis serta penghematan lalu lintas
- Pengurangan jumlah perintah baca yang harus dilakukan oleh Redis itu sendiri
- Ini juga memiliki efek meningkatkan kinerja penulisan.
- Minimalisasi parsing protokol Redis
- Parsing protokol Redis juga memiliki biaya. Menguranginya adalah peluang besar.
Namun, semuanya adalah trade-off. Untuk ini, kita mengorbankan setidaknya dua hal berikut:
- Kebutuhan untuk mengimplementasikan, mengoperasikan, dan memelihara elemen manajemen cache sisi klien
- Peningkatan penggunaan CPU dan memori klien akibat hal ini
Kesimpulan
Secara pribadi, ini adalah komponen arsitektur yang memuaskan, dan latensi serta tekanan pada server API sangat rendah. Saya berpikir bahwa jika memungkinkan, akan baik untuk mengkonfigurasi arsitektur dengan struktur seperti ini di masa mendatang.