GoSuda

Poprawa responsywności dzięki buforowaniu po stronie klienta w Redis

By snowmerak
views ...

Co to jest Redis?

Sądzę, że niewiele osób nie zna Redisa. Niemniej jednak, w skrócie, można go podsumować w kilku punktach:

  • Operacje są wykonywane w pojedynczym wątku, co sprawia, że wszystkie operacje są atomowe.
  • Dane są przechowywane i przetwarzane In-Memory, co sprawia, że wszystkie operacje są szybkie.
  • Redis, w zależności od opcji, może przechowywać WAL, co umożliwia szybkie tworzenie kopii zapasowych i odzyskiwanie najnowszego stanu.
  • Obsługuje różne typy danych, takie jak Set, Hash, Bit, List, co zapewnia wysoką produktywność.
  • Posiada dużą społeczność, dzięki czemu można dzielić się różnymi doświadczeniami, problemami i rozwiązaniami.
  • Jest rozwijany i eksploatowany od dawna, co zapewnia godną zaufania stabilność.

Przejdźmy do sedna

Wyobraź sobie?

Co by było, gdyby pamięć podręczna Twojej usługi spełniała następujące dwa warunki?

  1. Konieczność dostarczania użytkownikowi najnowszych danych często przeglądanych, lecz z nieregularnymi aktualizacjami, co wymaga częstego odświeżania pamięci podręcznej.
  2. Brak aktualizacji, lecz konieczność częstego dostępu do tych samych danych w pamięci podręcznej w celu ich odczytu.

Pierwszy przypadek może dotyczyć rankingów popularności w czasie rzeczywistym w sklepie internetowym. Gdy rankingi popularności w czasie rzeczywistym są przechowywane jako sorted set, odczytywanie ich z Redis za każdym razem, gdy użytkownik uzyskuje dostęp do strony głównej, jest nieefektywne. Drugi przypadek dotyczy danych kursów walut, gdzie dane kursów walut są ogłaszane w przybliżeniu co 10 minut, ale rzeczywiste zapytania występują bardzo często. Co więcej, w przypadku kursów won-dolar, won-jen i won-juan, pamięć podręczna jest odpytywana bardzo często. W takich przypadkach, serwer API posiadający oddzielną pamięć podręczną lokalnie i odświeżający ją poprzez ponowne odpytywanie Redis po zmianie danych, byłby bardziej efektywnym rozwiązaniem.

W jaki sposób można zatem zaimplementować to działanie w architekturze baza danych - Redis - serwer API?

Czy nie można użyć Redis PubSub?

Gdy używasz pamięci podręcznej, subskrybuj kanał, aby otrzymywać powiadomienia o aktualizacjach!

  • W takim przypadku należy utworzyć logikę wysyłania wiadomości podczas aktualizacji.
  • Dodatkowe operacje wynikające z PubSub wpływają na wydajność.

pubsub-write

pubsub-read

Co jeśli Redis wykryje zmiany?

Co, jeśli użyjemy Keyspace Notification do otrzymywania powiadomień o poleceniach dla danego klucza?

  • Istnieje uciążliwość związana z koniecznością wcześniejszego przechowywania i udostępniania kluczy i poleceń używanych do aktualizacji.
  • Na przykład, dla niektórych kluczy, prosty Set jest poleceniem aktualizacji, podczas gdy dla innych kluczy, LPush, RPush, SAdd lub SRem stają się poleceniami aktualizacji, co komplikuje sprawę.
  • Znacząco zwiększa to prawdopodobieństwo błędów komunikacyjnych i błędów ludzkich w kodowaniu podczas procesu rozwoju.

Co, jeśli użyjemy Keyevent Notification do otrzymywania powiadomień na poziomie poleceń?

  • Wymaga to subskrypcji wszystkich poleceń używanych do aktualizacji. Wymagane jest odpowiednie filtrowanie kluczy, które są odbierane.
  • Na przykład, dla niektórych kluczy przychodzących z Del, dany klient prawdopodobnie nie będzie miał lokalnej pamięci podręcznej.
  • Może to prowadzić do niepotrzebnego marnowania zasobów.

Dlatego potrzebne są Invalidation Message!

Czym są Invalidation Message?

Invalidation Messages to koncepcja wprowadzona w Redis 6.0 jako część Server Assisted Client-Side Cache. Invalidation Message są dostarczane w następującym schemacie:

  1. Zakłada się, że KlientB już raz odczytał klucz.
  2. KlientA ustawia ten klucz na nowo.
  3. Redis wykrywa zmianę i publikuje Invalidation Message do KlientaB, informując go o konieczności usunięcia pamięci podręcznej.
  4. KlientB odbiera tę wiadomość i podejmuje odpowiednie działania.

invalidation-message

Jak tego używać?

Podstawowa struktura działania

Klient podłączony do Redis odbiera wiadomości unieważniające, wykonując CLIENT TRACKING ON REDIRECT <client-id>. Klient, który ma odbierać wiadomości, subskrybuje odbieranie wiadomości unieważniających za pomocą SUBSCRIBE __redis__:invalidate.

domyślne śledzenie

1# client 1
2> SET a 100
1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2> CLIENT TRACKING ON REDIRECT 12
3> GET a # tracking
1# client 1
2> SET a 200
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "a"

śledzenie rozgłoszeniowe

1# client 3
2> CLIENT ID
312
4> SUBSCRIBE __redis__:invalidate
51) "subscribe"
62) "__redis__:invalidate"
73) (integer) 1
1# client 2
2CLIENT TRACKING ON BCAST PREFIX cache: REDIRECT 12
1# client 1
2> SET cache:name "Alice"
3> SET cache:age 26
1# client 3
21) "message"
32) "__redis__:invalidate"
43) 1) "cache:name"
51) "message"
62) "__redis__:invalidate"
73) 1) "cache:age"

Implementacja! Implementacja! Implementacja!

Redigo + Ristretto

Samo takie wyjaśnienie może być niejasne, jak używać tego w kodzie. Zatem, najpierw skonfigurujmy to za pomocą redigo i ristretto.

Najpierw zainstaluj dwie zależności.

  • github.com/gomodule/redigo
  • github.com/dgraph-io/ristretto
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"log/slog"
 7	"time"
 8
 9	"github.com/dgraph-io/ristretto"
10	"github.com/gomodule/redigo/redis"
11)
12
13type RedisClient struct {
14	conn  redis.Conn
15	cache *ristretto.Cache[string, any]
16	addr  string
17}
18
19func NewRedisClient(addr string) (*RedisClient, error) {
20	cache, err := ristretto.NewCache(&ristretto.Config[string, any]{
21		NumCounters: 1e7,     // liczba kluczy do śledzenia częstotliwości (10M).
22		MaxCost:     1 << 30, // maksymalny koszt pamięci podręcznej (1GB).
23		BufferItems: 64,      // liczba kluczy na bufor Get.
24	})
25	if err != nil {
26		return nil, fmt.Errorf("failed to generate cache: %w", err)
27	}
28
29	conn, err := redis.Dial("tcp", addr)
30	if err != nil {
31		return nil, fmt.Errorf("failed to connect to redis: %w", err)
32	}
33
34	return &RedisClient{
35		conn:  conn,
36		cache: cache,
37		addr:  addr,
38	}, nil
39}
40
41func (r *RedisClient) Close() error {
42	err := r.conn.Close()
43	if err != nil {
44		return fmt.Errorf("failed to close redis connection: %w", err)
45	}
46
47	return nil
48}

Najpierw tworzymy prosty RedisClient zawierający ristretto i redigo.

 1func (r *RedisClient) Tracking(ctx context.Context) error {
 2	psc, err := redis.Dial("tcp", r.addr)
 3	if err != nil {
 4		return fmt.Errorf("failed to connect to redis: %w", err)
 5	}
 6
 7	clientId, err := redis.Int64(psc.Do("CLIENT", "ID"))
 8	if err != nil {
 9		return fmt.Errorf("failed to get client id: %w", err)
10	}
11	slog.Info("client id", "id", clientId)
12
13	subscriptionResult, err := redis.String(r.conn.Do("CLIENT", "TRACKING", "ON", "REDIRECT", clientId))
14	if err != nil {
15		return fmt.Errorf("failed to enable tracking: %w", err)
16	}
17	slog.Info("subscription result", "result", subscriptionResult)
18
19	if err := psc.Send("SUBSCRIBE", "__redis__:invalidate"); err != nil {
20		return fmt.Errorf("failed to subscribe: %w", err)
21	}
22	psc.Flush()
23
24	for {
25		msg, err := psc.Receive()
26		if err != nil {
27			return fmt.Errorf("failed to receive message: %w", err)
28		}
29
30		switch msg := msg.(type) {
31		case redis.Message:
32			slog.Info("received message", "channel", msg.Channel, "data", msg.Data)
33			key := string(msg.Data)
34			r.cache.Del(key)
35		case redis.Subscription:
36			slog.Info("subscription", "kind", msg.Kind, "channel", msg.Channel, "count", msg.Count)
37		case error:
38			return fmt.Errorf("error: %w", msg)
39		case []interface{}:
40			if len(msg) != 3 || string(msg[0].([]byte)) != "message" || string(msg[1].([]byte)) != "__redis__:invalidate" {
41				slog.Warn("unexpected message", "message", msg)
42				continue
43			}
44
45			contents := msg[2].([]interface{})
46			keys := make([]string, len(contents))
47			for i, key := range contents {
48				keys[i] = string(key.([]byte))
49				r.cache.Del(keys[i])
50			}
51			slog.Info("received invalidation message", "keys", keys)
52		default:
53			slog.Warn("unexpected message", "type", fmt.Sprintf("%T", msg))
54		}
55	}
56}

Kod jest nieco skomplikowany.

  • Aby wykonać Tracking, nawiązujemy dodatkowe połączenie. Jest to działanie podjęte z uwagi na to, że PubSub może zakłócić inne operacje.
  • Po pobraniu ID dodatkowego połączenia, Tracking z połączenia zapytującego dane jest przekierowywany na to połączenie.
  • Następnie subskrybujemy wiadomości unieważniające.
  • Kod obsługujący subskrypcję jest nieco skomplikowany. Ponieważ redigo nie parsuje wiadomości unieważniających, należy przetworzyć odpowiedź przed jej sparsowaniem.
 1func (r *RedisClient) Get(key string) (any, error) {
 2	val, found := r.cache.Get(key)
 3	if found {
 4		switch v := val.(type) {
 5		case int64:
 6			slog.Info("cache hit", "key", key)
 7			return v, nil
 8		default:
 9			slog.Warn("unexpected type", "type", fmt.Sprintf("%T", v))
10		}
11	}
12	slog.Info("cache miss", "key", key)
13
14	val, err := redis.Int64(r.conn.Do("GET", key))
15	if err != nil {
16		return nil, fmt.Errorf("failed to get key: %w", err)
17	}
18
19	r.cache.SetWithTTL(key, val, 1, 10*time.Second)
20	return val, nil
21}

Wiadomość Get najpierw odpytuje ristretto, a jeśli nie znajdzie, pobiera dane z Redis.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9)
10
11func main() {
12	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
13	defer cancel()
14
15	client, err := NewRedisClient("localhost:6379")
16	if err != nil {
17		panic(err)
18	}
19	defer client.Close()
20
21	go func() {
22		if err := client.Tracking(ctx); err != nil {
23			slog.Error("failed to track invalidation message", "error", err)
24		}
25	}()
26
27	ticker := time.NewTicker(1 * time.Second)
28	defer ticker.Stop()
29	done := ctx.Done()
30
31	for {
32		select {
33		case <-done:
34			slog.Info("shutting down")
35			return
36		case <-ticker.C:
37			v, err := client.Get("key")
38			if err != nil {
39				slog.Error("failed to get key", "error", err)
40				return
41			}
42			slog.Info("got key", "value", v)
43		}
44	}
45}

Kod do testowania jest powyżej. Po przetestowaniu będziesz mógł zauważyć, że Redis odświeża wartości za każdym razem, gdy dane są aktualizowane.

Jednakże jest to zbyt złożone. Przede wszystkim, aby rozszerzyć się na klaster, nieuchronnie trzeba będzie włączyć Tracking dla wszystkich masterów lub replik.

Rueidis

Dla tych, którzy używają języka Go, mamy najbardziej nowoczesny i zaawansowany rueidis. Napiszmy kod, który używa rueidis do obsługi server assisted client side cache w środowisku klastra Redis.

Najpierw zainstaluj zależności.

  • github.com/redis/rueidis

Następnie napisz kod, który odpytuje Redis o dane.

 1package main
 2
 3import (
 4	"context"
 5	"log/slog"
 6	"os"
 7	"os/signal"
 8	"time"
 9
10	"github.com/redis/rueidis"
11)
12
13func main() {
14	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
15	defer cancel()
16
17	client, err := rueidis.NewClient(rueidis.ClientOption{
18		InitAddress: []string{"localhost:6379"},
19	})
20	if err != nil {
21		panic(err)
22	}
23
24	ticker := time.NewTicker(1 * time.Second)
25	defer ticker.Stop()
26	done := ctx.Done()
27
28	for {
29		select {
30		case <-done:
31			slog.Info("shutting down")
32			return
33		case <-ticker.C:
34			const key = "key"
35			resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), 10*time.Second)
36			if resp.Error() != nil {
37				slog.Error("failed to get key", "error", resp.Error())
38				continue
39			}
40			i, err := resp.AsInt64()
41			if err != nil {
42				slog.Error("failed to convert response to int64", "error", err)
43				continue
44			}
45			switch resp.IsCacheHit() {
46			case true:
47				slog.Info("cache hit", "key", key)
48			case false:
49				slog.Info("missed key", "key", key)
50			}
51			slog.Info("got key", "value", i)
52		}
53	}
54}

W przypadku rueidis, aby użyć client side cache, wystarczy wywołać DoCache. Wówczas dane zostaną dodane do lokalnej pamięci podręcznej z określonym czasem życia, a kolejne wywołania DoCache będą pobierać dane z lokalnej pamięci podręcznej. Oczywiście, wiadomości unieważniające są również prawidłowo obsługiwane.

Dlaczego nie redis-go?

redis-go, niestety, nie obsługuje oficjalnie Server Assisted Client-Side Cache. Co więcej, podczas tworzenia PubSub, nie ma API do bezpośredniego dostępu do nowego połączenia, co uniemożliwia poznanie client ID. Z tego powodu uznałem, że redis-go nie może być skonfigurowany i pominąłem go.

Seksowne!

Poprzez strukturę pamięci podręcznej po stronie klienta

  • Jeśli dane mogą być przygotowane z wyprzedzeniem, ta struktura umożliwi dostarczanie najnowszych danych przy minimalizacji zapytań i ruchu do Redis.
  • Dzięki temu można stworzyć rodzaj struktury CQRS, znacznie zwiększając wydajność odczytu.

cqrs

Jak bardzo to się poprawiło?

W rzeczywistości, ponieważ taka struktura jest już używana w praktyce, zbadałem krótko opóźnienia dla dwóch API. Proszę wybaczyć, że mogę to opisać tylko w bardzo abstrakcyjny sposób.

  1. Pierwsze API
    1. Pierwsze zapytanie: średnio 14.63ms
    2. Kolejne zapytania: średnio 2.82ms
    3. Średnia różnica: 10.98ms
  2. Drugie API
    1. Pierwsze zapytanie: średnio 14.05ms
    2. Kolejne zapytania: średnio 1.60ms
    3. Średnia różnica: 11.57ms

Nastąpiła dodatkowa poprawa opóźnień o nawet 82%!

Prawdopodobnie oczekiwano następujących ulepszeń:

  • Eliminacja komunikacji sieciowej między klientem a Redis oraz oszczędność ruchu.
  • Zmniejszenie liczby poleceń odczytu, które musi wykonać sam Redis.
    • Ma to również wpływ na zwiększenie wydajności zapisu.
  • Minimalizacja parsowania protokołu Redis.
    • Parsowanie protokołu Redis nie jest darmowe. Zmniejszenie tego jest dużą szansą.

Jednak wszystko jest kompromisem. W tym celu poświęciliśmy co najmniej dwie rzeczy:

  • Konieczność implementacji, obsługi i utrzymania elementów zarządzania pamięcią podręczną po stronie klienta.
  • Zwiększone zużycie procesora i pamięci po stronie klienta z tego powodu.

Wniosek

Osobiście byłem zadowolony z tego komponentu architektury, a opóźnienia i obciążenie serwera API były bardzo niskie. W przyszłości, jeśli to możliwe, chciałbym budować architekturę w ten sposób.