GoSuda

Redis İstemci Tarafı Önbelleği ile Duyarlılığı Artırma

By snowmerak
views ...

Redis Nedir?

Redis'i bilmeyen pek kimse olmadığını düşünüyorum. Ancak yine de birkaç özelliğini kısaca belirterek geçecek olursak, şu şekilde özetlenebilir:

  • İşlemler tek iş parçacığında (single thread) gerçekleştirildiği için tüm işlemler atomiktir.
  • Veriler In-Memory'de depolanıp işlendiği için tüm işlemler hızlıdır.
  • Redis, seçeneğe bağlı olarak WAL'ı depolayabilir, bu sayede en güncel durumu hızlıca yedekleyebilir ve kurtarabilir.
  • Set, Hash, Bit, List gibi çeşitli tipleri destekleyerek yüksek üretkenlik sağlar.
  • Büyük bir topluluğa sahip olduğu için çeşitli deneyimler, sorunlar ve çözüm yolları paylaşılabilir.
  • Uzun süredir geliştirilip işletildiği için güvenilir bir istikrara sahiptir.

Esas Konuya Gelecek Olursak

Hayal Edin

Hizmetinizin önbelleği aşağıdaki iki koşulu karşılıyorsa ne olurdu?

  1. Sıkça sorgulanan verileri kullanıcılara en güncel haliyle sunmak gerektiğinde, ancak güncellemeler düzensiz olduğu için önbelleğin sık sık yenilenmesi gerektiğinde.
  2. Güncelleme olmasa da aynı önbellek verilerine sıkça erişilmesi ve sorgulanması gerektiğinde.

İlk durum için alışveriş sitelerinin gerçek zamanlı popülerlik sıralamaları düşünülebilir. Alışveriş sitesinin gerçek zamanlı popülerlik sıralamasını sorted set olarak kaydettiğimizde, Redis'ten kullanıcının ana sayfaya her eriştiğinde okuması verimsiz olacaktır. İkinci durum için döviz kuru verileri ele alınabilir; yaklaşık 10 dakikada bir döviz kuru açıklansa bile, gerçek sorgulamalar çok sık gerçekleşir. Özellikle Türk Lirası-Dolar, Türk Lirası-Yen ve Türk Lirası-Yuan için önbellek çok sık sorgulanır. Bu gibi durumlarda, API sunucusunun yerel olarak ayrı bir önbelleğe sahip olması ve veri değiştiğinde Redis'i tekrar sorgulayarak güncellemeyi tercih etmesi daha verimli bir çalışma olacaktır.

Peki, veri tabanı - Redis - API sunucusu yapısında bu tür bir işlemi nasıl uygulayabiliriz?

Redis PubSub ile Olmaz mı?

Önbelleği kullanırken, güncelleme durumunu alabileceğimiz bir kanala abone olalım!

  • Bu durumda, güncelleme anında mesaj gönderecek bir mantık oluşturmak gerekir.
  • PubSub'dan kaynaklanan ek işlemler performansı etkiler.

pubsub-write

pubsub-read

Peki ya Redis Değişikliği Algılarsa?

Keyspace Notification kullanarak ilgili anahtar için komut bildirimleri alınırsa?

  • Güncelleme için kullanılan anahtarları ve komutları önceden kaydetme ve paylaşma zahmeti mevcuttur.
  • Örneğin, bazı anahtarlar için basit bir Set güncelleme komutu iken, bazıları için LPush, RPush veya SAdd ve SRem gibi komutlar güncelleme komutu haline gelebilir, bu da karmaşıklığı artırır.
  • Bu durum, geliştirme sürecinde iletişim hataları ve kodlamada insan hatası olasılığını önemli ölçüde artırır.

Keyevent Notification kullanarak komut bazında bildirimler alınırsa?

  • Güncelleme için kullanılan tüm komutlara abone olunması gerekir. Gelen anahtarlar için uygun filtreleme yapılması gereklidir.
  • Örneğin, Del ile gelen tüm anahtarların bir kısmı için ilgili istemcinin yerel önbelleği olmayabilir.
  • Bu durum, gereksiz kaynak israfına yol açabilir.

Bu Nedenle Invalidation Message'a İhtiyaç Duyulmaktadır!

Invalidation Message Nedir?

Invalidation Messages, Redis 6.0'dan itibaren Server Assisted Client-Side Cache'in bir parçası olarak sunulan bir kavramdır. Invalidation Message aşağıdaki akışla iletilir:

  1. ClientB'nin daha önce bir anahtarı okuduğunu varsayalım.
  2. ClientA, ilgili anahtarı yeniden ayarlar.
  3. Redis değişikliği algılar ve ClientB'ye bir Invalidation Message yayınlayarak ClientB'ye önbelleği temizlemesini bildirir.
  4. ClientB, bu mesajı alarak uygun önlemi alır.

invalidation-message

Nasıl Kullanılır?

Temel Çalışma Yapısı

Redis'e bağlı istemci, CLIENT TRACKING ON REDIRECT <client-id> komutunu çalıştırarak invalidation message'ları almasını sağlar. Mesajları alması gereken istemci ise SUBSCRIBE __redis__:invalidate ile invalidation message'ları almak üzere abone olur.

varsayılan takip

1# istemci 1
2> SET a 100
1# istemci 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# istemci 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # takip
1# istemci 1
2> SET a 200
1# istemci 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

yayın takibi

1# istemci 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# istemci 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# istemci 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# istemci 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Uygulama! Uygulama! Uygulama!

Redigo + Ristretto

Bu şekilde açıklamak, kod üzerinde pratikte nasıl kullanılacağı konusunda belirsizlik yaratabilir. Bu nedenle, redigo ve ristretto ile basit bir yapılandırma oluşturalım.

Önce iki bağımlılığı kurun.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // frekansı takip edilecek anahtar sayısı (10M).
22		MaxCost:     1 << 30, // önbelleğin maksimum maliyeti (1GB).
23		BufferItems: 64,      // Get arabelleği başına anahtar sayısı.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("önbellek oluşturulamadı: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("redis'e bağlanılamadı: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("redis bağlantısı kapatılamadı: %w", err)
45	}
46
47	return nil
48}

Öncelikle, ristretto ve redigo'yu içeren basit bir RedisClient oluşturulur.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("redis'e bağlanılamadı: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("istemci kimliği alınamadı: %w", err)
10	}
11	slog.Info("istemci kimliği", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("izleme etkinleştirilemedi: %w", err)
16	}
17	slog.Info("abonelik sonucu", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("abone olunamadı: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("mesaj alınamadı: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("mesaj alındı", "kanal", msg.Channel, "veri", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("abonelik", "tür", msg.Kind, "kanal", msg.Channel, "sayı", msg.Count)
37		case error:
38			return fmt.Errorf("hata: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("beklenmedik mesaj", "mesaj", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("geçersizleştirme mesajı alındı", "anahtarlar", keys)
52		default:
53			slog.Warn("beklenmedik mesaj", "tür", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Kod biraz karmaşık.

  • Tracking yapmak için bir bağlantı daha kurulur. Bu, PubSub'ın diğer işlemleri engellemesini önlemek için alınan bir önlemdir.
  • Eklenen bağlantının kimliği sorgulanır ve veri sorgulayacak bağlantıda Tracking, bu bağlantıya Redirect edilir.
  • Ardından invalidation message'a abone olunur.
  • Abonelik işlemini yapan kod biraz karmaşıktır. Redigo, geçersizleştirme mesajlarının ayrıştırılmasını desteklemediği için, ayrıştırma öncesi yanıtı alıp işlemek gerekir.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("önbellek isabeti", "anahtar", key)
 7			return v, nil
 8		default:
 9			slog.Warn("beklenmedik tür", "tür", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("önbellek kaçırma", "anahtar", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("anahtar alınamadı: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get mesajı, önce ristretto'yu sorgular; bulunamazsa Redis'ten alır.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("geçersizleştirme mesajı izlenemedi", "hata", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("kapatılıyor")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("anahtar alınamadı", "hata", err)
40				return
41			}
42			slog.Info("anahtar alındı", "değer", v)
43		}
44	}
45}

Test etmek için kullanılan kod yukarıdaki gibidir. Bir kez test ederseniz, Redis'teki veriler her güncellendiğinde değerin de yenilendiğini görebilirsiniz.

Ancak bu çok karmaşık. Her şeyden önce, bir kümeye genişletmek için tüm master veya replikalarda Tracking'i etkinleştirmek kaçınılmazdır.

Rueidis

Go dilini kullandığımız sürece, en modern ve gelişmiş rueidis elimizin altında. rueidis kullanarak Redis küme ortamında sunucu destekli istemci tarafı önbellek kullanan bir kod yazalım.

Öncelikle, bağımlılıkları kurun.

  • github.com/redis/rueidis

Ardından, Redis'ten veri sorgulayan kodu yazın.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("kapatılıyor")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("anahtar alınamadı", "hata", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("yanıt int64'e dönüştürülemedi", "hata", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("önbellek isabeti", "anahtar", key)
48			case false:
49				slog.Info("anahtar kaçırıldı", "anahtar", key)
50			}
51			slog.Info("anahtar alındı", "değer", i)
52		}
53	}
54}

rueidis'te istemci tarafı önbelleğini kullanmak için sadece DoCache çağırmak yeterlidir. Bu durumda, veriyi yerel önbellekte ne kadar süreyle tutulacağı gibi ayarlarla yerel önbelleğe ekler ve aynı DoCache çağrıldığında veriyi yerel önbellekten sorgulayarak alır. Doğal olarak, geçersizleştirme mesajlarını da sorunsuz bir şekilde işler.

Neden redis-go değil?

redis-go ne yazık ki resmi API olarak sunucu destekli istemci tarafı önbelleği desteklememektedir. Hatta PubSub oluşturulurken yeni bir bağlantı oluşturulduğunda, bu bağlantıya doğrudan erişim API'si olmadığından istemci kimliği de bilinememektedir. Bu nedenle, redis-go'nun yapılandırılamaz olduğu düşünülerek atlanmıştır.

Etkileyici

istemci tarafı önbellek yapısı aracılığıyla

  • Önceden hazırlanabilecek veriler söz konusu olduğunda, bu yapı Redis'e yönelik sorguları ve trafiği minimize ederken her zaman en güncel veriyi sağlayabilir.
  • Bu sayede bir tür CQRS yapısı oluşturularak okuma performansı önemli ölçüde artırılabilir.

cqrs

Ne Kadar Daha Etkileyici Oldu?

Gerçekten de sahada bu yapıyı kullanan iki API için basit gecikme sürelerine bakıldı. Oldukça soyut bir şekilde yazmak zorunda kaldığım için anlayışınızı rica ederim.

  1. İlk API
    1. İlk sorgulama: ortalama 14.63ms
    2. Sonraki sorgulamalar: ortalama 2.82ms
    3. Ortalama fark: 10.98ms
  2. İkinci API
    1. İlk sorgulama: ortalama 14.05ms
    2. Sonraki sorgulamalar: ortalama 1.60ms
    3. Ortalama fark: 11.57ms

Büyük ölçüde %82 civarında ek gecikme iyileşmesi kaydedildi!

Aşağıdaki iyileşme noktalarının gerçekleştiği beklenmektedir:

  • İstemci ile Redis arasındaki ağ iletişim sürecinin atlanması ve trafik tasarrufu
  • Redis'in gerçekleştirmesi gereken okuma komut sayısının azalması
    • Bu durum yazma performansını da artırıcı bir etki yapar.
  • Redis protokolünün ayrıştırılmasının minimize edilmesi
    • Redis protokolünü ayrıştırmanın maliyeti sıfır değildir. Bunu azaltabilmek büyük bir fırsattır.

Ancak her şey bir ödünleşmedir. Bunun için en azından aşağıdaki iki şeyi feda ettik:

  • İstemci tarafı önbellek yönetim öğelerinin uygulanması, işletilmesi ve bakımının gerekliliği
  • Bu durumdan kaynaklanan istemcinin CPU ve bellek kullanımının artması

Sonuç

Kişisel olarak tatmin edici bir mimari bileşeniydi ve gecikme süresi ile API sunucusu üzerindeki stres oldukça düşüktü. Gelecekte de mümkün olursa bu tür bir yapıyla mimariyi oluşturmayı düşünüyorum.