Meningkatkan Responsivitas dengan Redis Client-Side Caching
Apa itu Redis?
Saya rasa tidak banyak orang yang tidak mengenal Redis. Namun, jika saya harus meringkasnya secara singkat dengan beberapa karakteristik, saya dapat menyimpulkannya sebagai berikut:
- Operasi dilakukan dalam satu utas, sehingga semua operasi memiliki atomisitas.
- Data disimpan dan dioperasikan dalam memori, sehingga semua operasi cepat.
- Redis dapat menyimpan WAL tergantung pada opsi, sehingga status terbaru dapat dicadangkan dan dipulihkan dengan cepat.
- Mendukung berbagai tipe seperti Set, Hash, Bit, List, sehingga memiliki produktivitas yang tinggi.
- Memiliki komunitas besar, sehingga berbagai pengalaman, isu, dan solusi dapat dibagikan.
- Telah dikembangkan dan dioperasikan untuk waktu yang lama, sehingga memiliki stabilitas yang dapat diandalkan.
Jadi, ke pokok bahasan
Bayangkan?
Bagaimana jika cache layanan Anda memenuhi dua kondisi berikut?
- Anda perlu menyediakan data yang sering diakses kepada pengguna dalam kondisi terbaru, tetapi pembaruannya tidak teratur sehingga Anda harus sering memperbarui cache.
- Pembaruan tidak terjadi, tetapi Anda sering mengakses data cache yang sama untuk kueri.
Untuk kasus pertama, kita bisa mempertimbangkan peringkat popularitas real-time di pusat perbelanjaan. Jika peringkat popularitas real-time pusat perbelanjaan disimpan sebagai sorted set, tidak efisien jika Redis membacanya setiap kali pengguna mengakses halaman utama. Untuk kasus kedua, mengenai data nilai tukar, meskipun data nilai tukar diumumkan setiap sekitar 10 menit, kueri aktual terjadi sangat sering. Terlebih lagi, untuk Won-Dolar, Won-Yen, dan Won-Yuan, cache akan diakses sangat sering. Dalam kasus-kasus ini, akan lebih efisien jika server API memiliki cache terpisah secara lokal, dan memperbarui cache dengan melakukan kueri ulang ke Redis ketika data berubah.
Lalu, bagaimana kita bisa mengimplementasikan perilaku ini dalam struktur database - Redis - server API?
Tidak bisakah dengan Redis PubSub?
Saat menggunakan cache, berlanggananlah ke saluran yang dapat menerima status pembaruan!
- Maka kita perlu membuat logika untuk mengirim pesan saat pembaruan terjadi.
- Karena adanya operasi tambahan dari PubSub, hal ini akan memengaruhi kinerja.


Lalu bagaimana jika Redis mendeteksi perubahan?
Bagaimana jika menggunakan Keyspace Notification untuk menerima notifikasi perintah untuk kunci tertentu?
- Ada kerumitan dalam menyimpan dan membagikan kunci dan perintah yang digunakan untuk pembaruan sebelumnya.
- Misalnya, untuk beberapa kunci, Set sederhana adalah perintah pembaruan, dan untuk kunci lain, LPush, atau RPush, atau SAdd dan SRem menjadi perintah pembaruan, yang membuatnya rumit.
- Hal ini secara signifikan meningkatkan kemungkinan miskomunikasi dalam proses pengembangan dan kesalahan manusia dalam pengkodean.
Bagaimana jika menggunakan Keyevent Notification untuk menerima notifikasi berdasarkan perintah?
- Diperlukan langganan untuk semua perintah yang digunakan untuk pembaruan. Filter yang sesuai diperlukan untuk kunci yang masuk.
- Misalnya, kemungkinan besar klien tidak memiliki cache lokal untuk beberapa kunci dari semua kunci yang masuk dengan Del.
- Hal ini dapat menyebabkan pemborosan sumber daya yang tidak perlu.
Oleh karena itu, yang dibutuhkan adalah Invalidation Message!
Apa itu Invalidation Message?
Invalidation Messages adalah konsep yang diperkenalkan sebagai bagian dari Server Assisted Client-Side Cache yang ditambahkan sejak Redis 6.0. Invalidation Message dikirimkan dengan alur sebagai berikut:
- Asumsikan ClientB telah membaca kunci sekali.
- ClientA mengatur kunci tersebut dengan nilai baru.
- Redis mendeteksi perubahan dan menerbitkan Invalidation Message ke ClientB untuk memberitahu ClientB agar menghapus cache.
- ClientB menerima pesan tersebut dan mengambil tindakan yang sesuai.

Bagaimana cara menggunakannya?
Struktur Operasi Dasar
Klien yang terhubung ke Redis menerima invalidation message dengan menjalankan CLIENT TRACKING ON REDIRECT <client-id>. Dan klien yang harus menerima pesan berlangganan untuk menerima invalidation message dengan SUBSCRIBE __redis__:invalidate.
Pelacakan default
1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"
Pelacakan broadcasting
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"
Implementasi! Implementasi! Implementasi!
Redigo + Ristretto
Jika hanya dijelaskan seperti itu, mungkin agak ambigu bagaimana cara menggunakannya dalam kode. Jadi, mari kita buat konfigurasi sederhana terlebih dahulu dengan redigo dan ristretto.
Pertama, instal dua dependensi.
github.com/gomodule/redigogithub.com/dgraph-io/ristretto
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "github.com/dgraph-io/ristretto"
10 "github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14 conn redis.Conn
15 cache *ristretto.Cache[string, any]
16 addr string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20 cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21 NumCounters: 1e7, // jumlah kunci yang akan dilacak frekuensinya (10 juta).
22 MaxCost: 1 << 30, // biaya maksimum cache (1GB).
23 BufferItems: 64, // jumlah kunci per buffer Get.
24 })
25 if err != nil {
26 return nil, fmt.Errorf("failed to generate cache: %w", err)
27 }
28
29 conn, err := redis.Dial("tcp", addr)
30 if err != nil {
31 return nil, fmt.Errorf("failed to connect to redis: %w", err)
32 }
33
34 return &RedisClient{
35 conn: conn,
36 cache: cache,
37 addr: addr,
38 }, nil
39}
40
41func (r *RedisClient) Close() error {
42 err := r.conn.Close()
43 if err != nil {
44 return fmt.Errorf("failed to close redis connection: %w", err)
45 }
46
47 return nil
48}
Pertama, buat RedisClient yang secara sederhana mencakup ristretto dan redigo.
1func (r *RedisClient) Tracking(ctx context.Context) error {
2 psc, err := redis.Dial("tcp", r.addr)
3 if err != nil {
4 return fmt.Errorf("failed to connect to redis: %w", err)
5 }
6
7 clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
8 if err != nil {
9 return fmt.Errorf("failed to get client id: %w", err)
10 }
11 slog.Info("client id", "id", clientId)
12
13 subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14 if err != nil {
15 return fmt.Errorf("failed to enable tracking: %w", err)
16 }
17 slog.Info("subscription result", "result", subscriptionResult)
18
19 if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20 return fmt.Errorf("failed to subscribe: %w", err)
21 }
22 psc.Flush()
23
24 for {
25 msg, err := psc.Receive()
26 if err != nil {
27 return fmt.Errorf("failed to receive message: %w", err)
28 }
29
30 switch msg := msg.(type) {
31 case redis.Message:
32 slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33 key := string(msg.Data)
34 r.cache.Del(key)
35 case redis.Subscription:
36 slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37 case error:
38 return fmt.Errorf("error: %w", msg)
39 case []interface{}:
40 if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41 slog.Warn("unexpected message", "message", msg)
42 continue
43 }
44
45 contents := msg[2].([]interface{})
46 keys := make([]string, len(contents))
47 for i, key := range contents {
48 keys[i] = string(key.([]byte))
49 r.cache.Del(keys[i])
50 }
51 slog.Info("received invalidation message", "keys", keys)
52 default:
53 slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54 }
55 }
56}
Kodenya sedikit rumit.
- Untuk melakukan Tracking, satu koneksi tambahan dibuat. Ini adalah tindakan yang diambil dengan mempertimbangkan bahwa PubSub dapat mengganggu operasi lain.
- ID koneksi yang ditambahkan dicari, dan Tracking dialihkan ke koneksi tersebut dari koneksi yang akan melakukan kueri data.
- Dan kemudian berlangganan invalidation message.
- Kode untuk memproses langganan agak rumit. Karena redigo tidak dapat mengurai pesan invalidasi, perlu untuk menerima respons sebelum penguraian dan memprosesnya.
1func (r *RedisClient) Get(key string) (any, error) {
2 val, found := r.cache.Get(key)
3 if found {
4 switch v := val.(type) {
5 case int64:
6 slog.Info("cache hit", "key", key)
7 return v, nil
8 default:
9 slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10 }
11 }
12 slog.Info("cache miss", "key", key)
13
14 val, err := redis.Int64(r.conn.Do("GET", key))
15 if err != nil {
16 return nil, fmt.Errorf("failed to get key: %w", err)
17 }
18
19 r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20 return val, nil
21}
Pesan Get terlebih dahulu mengkueri Ristretto seperti ini, dan jika tidak ditemukan, mengambilnya dari Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9)
10
11func main() {
12 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13 defer cancel()
14
15 client, err := NewRedisClient("localhost:6379")
16 if err != nil {
17 panic(err)
18 }
19 defer client.Close()
20
21 go func() {
22 if err := client.Tracking(ctx); err != nil {
23 slog.Error("failed to track invalidation message", "error", err)
24 }
25 }()
26
27 ticker := time.NewTicker(1 * time.Second)
28 defer ticker.Stop()
29 done := ctx.Done()
30
31 for {
32 select {
33 case <-done:
34 slog.Info("shutting down")
35 return
36 case <-ticker.C:
37 v, err := client.Get("key")
38 if err != nil {
39 slog.Error("failed to get key", "error", err)
40 return
41 }
42 slog.Info("got key", "value", v)
43 }
44 }
45}
Kode untuk pengujian adalah seperti di atas. Jika Anda mencobanya, Anda akan dapat mengonfirmasi bahwa nilai diperbarui setiap kali data diperbarui di Redis.
Namun, ini terlalu rumit. Yang terpenting, untuk memperluas ke kluster, pasti ada kebutuhan untuk mengaktifkan Tracking pada semua master, atau replika.
Rueidis
Bagi kita yang menggunakan bahasa Go, kita memiliki rueidis yang paling modern dan berkembang. Mari kita tulis kode yang menggunakan cache sisi klien yang dibantu server di lingkungan kluster Redis menggunakan rueidis.
Pertama, instal dependensinya.
github.com/redis/rueidis
Kemudian, tulis kode untuk mengambil data dari Redis.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "os"
7 "os/signal"
8 "time"
9
10 "github.com/redis/rueidis"
11)
12
13func main() {
14 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15 defer cancel()
16
17 client, err := rueidis.NewClient(rueidis.ClientOption{
18 InitAddress: []string{"localhost:6379"},
19 })
20 if err != nil {
21 panic(err)
22 }
23
24 ticker := time.NewTicker(1 * time.Second)
25 defer ticker.Stop()
26 done := ctx.Done()
27
28 for {
29 select {
30 case <-done:
31 slog.Info("shutting down")
32 return
33 case <-ticker.C:
34 const key = "key"
35 resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36 if resp.Error() != nil {
37 slog.Error("failed to get key", "error", resp.Error())
38 continue
39 }
40 i, err := resp.AsInt64()
41 if err != nil {
42 slog.Error("failed to convert response to int64", "error", err)
43 continue
44 }
45 switch resp.IsCacheHit() {
46 case true:
47 slog.Info("cache hit", "key", key)
48 case false:
49 slog.Info("missed key", "key", key)
50 }
51 slog.Info("got key", "value", i)
52 }
53 }
54}
Di rueidis, untuk menggunakan cache sisi klien, kita hanya perlu melakukan DoCache. Kemudian, data ditambahkan ke cache lokal dengan berapa lama akan disimpan, dan jika DoCache dipanggil lagi, data akan diambil dari dalam cache lokal. Tentu saja, pesan invalidasi juga ditangani dengan benar.
Mengapa tidak redis-go?
redis-go sayangnya tidak mendukung cache sisi klien yang dibantu server sebagai API resmi. Bahkan, saat membuat PubSub, tidak ada API untuk mengakses koneksi baru secara langsung, sehingga client id tidak dapat diketahui. Oleh karena itu, kami memutuskan untuk melewati redis-go karena dianggap tidak mungkin untuk dikonfigurasi.
Sangat menarik
Melalui struktur cache sisi klien
- Jika data dapat disiapkan sebelumnya, struktur ini dapat meminimalkan kueri dan lalu lintas ke Redis, sambil selalu menyediakan data terbaru.
- Melalui ini, semacam struktur CQRS dapat dibuat untuk secara signifikan meningkatkan kinerja baca.

Seberapa menariknya sekarang?
Sebenarnya, karena struktur ini sudah digunakan di lapangan, saya mencari latensi sederhana untuk dua API. Mohon maaf karena saya hanya bisa menuliskannya secara sangat abstrak.
- API pertama
- Kueri pertama: rata-rata 14.63ms
- Kueri berikutnya: rata-rata 2.82ms
- Perbedaan rata-rata: 10.98ms
- API kedua
- Kueri pertama: rata-rata 14.05ms
- Kueri berikutnya: rata-rata 1.60ms
- Perbedaan rata-rata: 11.57ms
Ada peningkatan latensi tambahan hingga sekitar 82%!
Saya berharap ada peningkatan sebagai berikut:
- Penghilangan proses komunikasi jaringan antara klien dan Redis serta penghematan lalu lintas
- Pengurangan jumlah perintah baca yang harus dilakukan oleh Redis itu sendiri
- Ini juga memiliki efek meningkatkan kinerja tulis.
- Minimalisasi penguraian protokol Redis
- Mengurai protokol Redis juga memiliki biaya. Menguranginya adalah peluang besar.
Namun, semuanya adalah trade-off. Untuk ini, kami mengorbankan setidaknya dua hal di bawah ini:
- Kebutuhan untuk mengimplementasikan, mengoperasikan, dan memelihara elemen manajemen cache sisi klien
- Peningkatan penggunaan CPU dan memori klien akibat hal ini
Kesimpulan
Secara pribadi, ini adalah komponen arsitektur yang memuaskan, dan latensi serta stres pada server API sangat rendah. Saya berpikir bahwa akan baik jika dapat mengkonfigurasi arsitektur dengan struktur seperti ini di masa mendatang.