GoSuda

Redis İstemci Tarafı Önbelleği ile Duyarlılığı Artırmak

By snowmerak
views ...

Redis Nedir?

Redis'i bilmeyen kimsenin olmadığını düşünüyorum. Ancak yine de bazı özellikleriyle kısaca değinecek olursak şu şekilde özetleyebiliriz:

  • İşlemler tek bir iş parçacığında gerçekleştirilir, bu nedenle tüm işlemler atomiktir.
  • Veriler In-Memory'de depolanır ve işlenir, bu nedenle tüm işlemler hızlıdır.
  • Redis, seçeneğe bağlı olarak WAL'ı kaydedebilir, bu da en son durumun hızlı bir şekilde yedeklenmesini ve kurtarılmasını sağlar.
  • Set, Hash, Bit, List gibi çeşitli tipleri destekleyerek yüksek üretkenlik sağlar.
  • Büyük bir topluluğa sahip olması sayesinde çeşitli deneyimler, sorunlar ve çözümler paylaşılabilir.
  • Uzun süredir geliştirilmekte ve işletilmekte olması nedeniyle güvenilir bir istikrara sahiptir.

Esas Konuya Gelelim

Hayal Edin

Hizmetinizin önbelleği aşağıdaki iki koşula uyuyorsa ne olurdu?

  1. Sıkça sorgulanan verilerin kullanıcıya en güncel haliyle sunulması gerektiğinde, ancak güncellemelerin düzensiz olması nedeniyle önbelleğin sık sık yenilenmesi gerektiğinde.
  2. Güncelleme olmasa da, aynı önbellek verisine sık sık erişilmesi ve sorgulanması gerektiğinde.

İlk durum için alışveriş sitelerindeki gerçek zamanlı popülerlik sıralamaları düşünülebilir. Alışveriş sitelerinin gerçek zamanlı popülerlik sıralaması sorted set olarak saklandığında, Redis'ten kullanıcının ana sayfaya her eriştiğinde okunması verimsiz olacaktır. İkinci durum için döviz kuru verileri ele alınabilir; yaklaşık olarak 10 dakikada bir döviz kuru ilan edilse de, gerçek sorgulamalar çok sık meydana gelir. Özellikle Won-Dolar, Won-Yen, Won-Yuan için önbellek çok sık sorgulanır. Bu gibi durumlarda, API sunucusunun yerel olarak ayrı bir önbelleğe sahip olması ve veriler değiştiğinde Redis'i tekrar sorgulayarak güncellemesi daha verimli bir işlem olacaktır.

Peki, Veritabanı - Redis - API sunucusu yapısında bu işlemi nasıl uygulayabiliriz?

Redis PubSub ile Olmaz mı?

Önbellek kullanırken, güncelleme durumunu alabileceğimiz bir kanala abone olalım!

  • O zaman güncelleme anında mesaj gönderme mantığı oluşturulmalıdır.
  • PubSub'ın ek bir işlem getirmesi nedeniyle performansı etkiler.

pubsub-write

pubsub-read

Peki Redis Değişikliği Algılarsa?

Keyspace Notification kullanarak ilgili anahtara ilişkin komut bildirimleri alsak?

  • Güncelleme için kullanılan anahtarların ve komutların önceden depolanması ve paylaşılması gibi bir zahmet vardır.
  • Örneğin, bazı anahtarlar için basit bir Set güncelleme komutuyken, bazı anahtarlar için LPush veya RPush, SAdd ve SRem gibi komutlar güncelleme komutu olabilir, bu da karmaşıklığı artırır.
  • Bu durum, geliştirme sürecinde iletişim hatalarını ve kodlamada insan hatası olasılığını büyük ölçüde artırır.

Keyevent Notification kullanarak komut bazında bildirim alsak?

  • Güncelleme için kullanılan tüm komutlara abone olmak gerekir. Oradan gelen anahtarlar için uygun filtreleme gereklidir.
  • Örneğin, Del ile gelen tüm anahtarların bir kısmı için ilgili istemcinin yerel önbelleği olmayabilir.
  • Bu durum, gereksiz kaynak israfına yol açabilir.

Bu Yüzden Gerekli Olan Şey Invalidation Message!

Invalidation Message Nedir?

Invalidation Messages, Redis 6.0'dan itibaren eklenen Server Assisted Client-Side Cache'in bir parçası olarak sunulan bir kavramdır. Invalidation Message aşağıdaki akışla iletilir:

  1. ClientB'nin daha önce bir anahtarı okuduğunu varsayalım.
  2. ClientA, ilgili anahtarı yeni bir değerle ayarlar.
  3. Redis, değişikliği algılar ve ClientB'ye bir Invalidation Message yayınlayarak ClientB'ye önbelleği silmesini bildirir.
  4. ClientB, bu mesajı alır ve uygun eylemi gerçekleştirir.

invalidation-message

Nasıl Kullanılır?

Temel Çalışma Yapısı

Redis'e bağlı istemci, CLIENT TRACKING ON REDIRECT <client-id> komutunu çalıştırarak invalidation message almasını sağlar. Mesaj alması gereken istemci ise SUBSCRIBE __redis__:invalidate komutuyla invalidation message'ları almak için abone olur.

varsayılan takip

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

yayın takibi

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Uygulama! Uygulama! Uygulama!

Redigo + Ristretto

Bu şekilde açıklamak, kodu gerçekte nasıl kullanacağımız konusunda belirsizlik yaratabilir. Bu yüzden basitçe redigo ve ristretto ile bir yapı oluşturalım.

Öncelikle iki bağımlılığı kuralım.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // sıklığı takip edilecek anahtar sayısı (10M).
22		MaxCost:     1 << 30, // önbelleğin maksimum maliyeti (1GB).
23		BufferItems: 64,      // Get tamponu başına anahtar sayısı.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("önbellek oluşturulamadı: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("redis'e bağlanılamadı: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("redis bağlantısı kapatılamadı: %w", err)
45	}
46
47	return nil
48}

Öncelikle ristretto ve redigo'yu içeren basit bir RedisClient oluştururuz.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("redis'e bağlanılamadı: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("istemci kimliği alınamadı: %w", err)
10	}
11	slog.Info("istemci kimliği", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("izleme etkinleştirilemedi: %w", err)
16	}
17	slog.Info("abonelik sonucu", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("abone olunamadı: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("mesaj alınamadı: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("mesaj alındı", "kanal", msg.Channel, "veri", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("abonelik", "tür", msg.Kind, "kanal", msg.Channel, "sayı", msg.Count)
37		case error:
38			return fmt.Errorf("hata: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("beklenmedik mesaj", "mesaj", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("geçersiz kılma mesajı alındı", "anahtarlar", keys)
52		default:
53			slog.Warn("beklenmedik mesaj", "tür", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Kod biraz karmaşık.

  • Takip için bir bağlantı daha kurarız. Bu, PubSub'ın diğer işlemlerin engellenmesini göz önünde bulunduran bir önlemdir.
  • Eklenen bağlantının kimliğini sorgulayarak, veri sorgulayacak bağlantının takibi bu bağlantıya yönlendirmesini sağlarız.
  • Ve invalidation message'a abone oluruz.
  • Aboneliği işleyen kod biraz karmaşıktır. redigo'nun geçersiz kılma mesajlarını ayrıştıramaması nedeniyle, ayrıştırmadan önce yanıtı alıp işlememiz gerekir.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("önbellek isabeti", "anahtar", key)
 7			return v, nil
 8		default:
 9			slog.Warn("beklenmedik tür", "tür", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("önbellek isabet etmedi", "anahtar", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("anahtar alınamadı: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Get mesajı, ristretto'yu öncelikle sorgular ve bulunamazsa Redis'ten getirir.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("geçersiz kılma mesajı izlenemedi", "hata", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("kapatılıyor")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("anahtar alınamadı", "hata", err)
40				return
41			}
42			slog.Info("anahtar alındı", "değer", v)
43		}
44	}
45}

Test kodu yukarıdaki gibidir. Bir kez test ederseniz, Redis'teki veriler her güncellendiğinde değerin de yenilendiğini göreceksiniz.

Ancak bu çok karmaşık. Özellikle bir küme için genişleme sağlamak amacıyla tüm ana veya replikalarda Tracking'i etkinleştirmek zorunluluktur.

Rueidis

Go dilini kullandığımız sürece, elimizde en modern ve gelişmiş rueidis bulunmaktadır. rueidis kullanarak Redis küme ortamında server assisted client side cache kullanan bir kod yazalım.

Öncelikle, bağımlılıkları kuralım.

  • github.com/redis/rueidis

Ve Redis'ten veri sorgulayan kodu yazalım.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("kapatılıyor")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("anahtar alınamadı", "hata", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("yanıt int64'e dönüştürülemedi", "hata", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("önbellek isabeti", "anahtar", key)
48			case false:
49				slog.Info("anahtar isabet etmedi", "anahtar", key)
50			}
51			slog.Info("anahtar alındı", "değer", i)
52		}
53	}
54}

Rueidis'te client side cache kullanmak için sadece DoCache yapmak yeterlidir. Bu durumda, yerel önbellekte ne kadar süreyle tutulacağı gibi bilgilerle yerel önbelleğe eklenir ve aynı DoCache çağrıldığında veriler yerel önbellekten sorgulanarak getirilir. Doğal olarak, geçersiz kılma mesajları da düzgün bir şekilde işlenir.

Neden redis-go değil?

redis-go ne yazık ki resmi API olarak server assisted client side cache'i desteklemiyor. Hatta PubSub oluştururken yeni bir bağlantı oluşturuyor ve bu bağlantıya doğrudan erişen bir API olmadığı için istemci kimliği de bilinemiyor. Bu nedenle redis-go'nun yapısı uygun olmadığına karar vererek pas geçtik.

Çekici

İstemci tarafı önbellek yapısı aracılığıyla

  • Önceden hazırlanabilecek veriler ise, bu yapı sayesinde Redis üzerindeki sorgu ve trafiği minimuma indirerek her zaman en güncel veriyi sağlayabiliriz.
  • Bu sayede bir nevi CQRS yapısı oluşturarak okuma performansını önemli ölçüde artırabiliriz.

cqrs

Ne kadar daha çekici hale geldi?

Gerçekten de, sahada bu yapıyı kullanan iki API'nin gecikme sürelerini basitçe inceledik. Çok soyut bir şekilde yazmak zorunda kaldığım için anlayışınızı rica ederim.

  1. Birinci API
    1. İlk sorgulama: Ortalama 14.63ms
    2. Sonraki sorgulamalar: Ortalama 2.82ms
    3. Ortalama fark: 10.98ms
  2. İkinci API
    1. İlk sorgulama: Ortalama 14.05ms
    2. Sonraki sorgulamalar: Ortalama 1.60ms
    3. Ortalama fark: 11.57ms

%82'ye varan ek bir gecikme iyileşmesi oldu!

Muhtemelen aşağıdaki iyileştirmeler bekleniyordu:

  • İstemci ve Redis arasındaki ağ iletişimi sürecinin atlanması ve trafik tasarrufu
  • Redis'in gerçekleştirmesi gereken okuma komut sayısının azalması
    • Bu, yazma performansını da artırıcı bir etkiye sahiptir.
  • Redis protokolünün ayrıştırılmasının minimize edilmesi
    • Redis protokolünü ayrıştırmanın maliyeti sıfır değildir. Bunu azaltabilmek büyük bir fırsattır.

Ancak her şey bir denge meselesidir. Bunun için en azından aşağıdaki iki şeyi feda ettik:

  • İstemci tarafı önbellek yönetimi bileşenlerinin uygulanması, işletilmesi ve bakımı gerekliliği
  • Bu durumdan kaynaklanan istemcinin CPU ve bellek kullanımında artış

Sonuç

Kişisel olarak tatmin edici bir mimari bileşendi ve gecikme süresi ile API sunucusu üzerindeki stres oldukça azdı. Gelecekte de mümkün olursa bu tür bir yapıyla mimariyi oluşturmayı düşünüyorum.