GoSuda

Gömülü NATS, Go uygulamasıyla nasıl iletişim kurar?

By prravda
views ...

Başlarken

NATS Hakkında

Yazılım uygulamaları ve hizmetlerinin veri alışverişinde bulunması gerekmektedir. NATS, mesajlar şeklinde segmentlere ayrılmış böyle bir veri alışverişini sağlayan bir altyapıdır. Buna "mesaj odaklı ara katman yazılımı" adını veriyoruz.

NATS ile uygulama geliştiricileri şunları yapabilir:

  • Dağıtık ve ölçeklenebilir istemci-sunucu uygulamalarını zahmetsizce oluşturabilirler.
  • Verileri genel bir şekilde gerçek zamanlı olarak depolayabilir ve dağıtabilirler. Bu, çeşitli ortamlar, diller, bulut sağlayıcıları ve şirket içi sistemler arasında esnek bir şekilde gerçekleştirilebilir.

NATS Nedir, NATS belgeleri

  • NATS, Go ile oluşturulmuş bir mesaj broker'ıdır.

Embedded NATS

Uygulamanız Go dilinde ise ve kullanım durumunuza ve dağıtım senaryolarınıza uyuyorsa, uygulamanızın içine bir NATS sunucusu bile gömebilirsiniz.

NATS'ı Gömmek, NATS belgeleri

  • NATS'ın özel bir özelliği vardır: Go ile oluşturulmuş uygulamalar için embedded mode desteği sunar.
  • Yani, mesaj broker'larının genel yöntemi olan ayrı bir broker sunucusu çalıştırıp ardından bu sunucu ile uygulamanın istemcisi aracılığıyla iletişim kurmak yerine, broker'ın kendisini Go ile yazılmış bir uygulamaya gömebilirsiniz (embed).

Embedded NATS'ın Faydaları ve Kullanım Durumları

  • İyi açıklanmış bir Youtube videosu bulunduğundan, video bağlantısı ile yetinilmektedir.
  • Ayrı bir mesaj broker sunucusu dağıtmaya gerek kalmadan modular monolith application oluşturarak "separate of concern" ilkesini yerine getirirken, NATS'ı embedded olarak yerleştirmenin avantajından yararlanmak mümkündür. Ayrıca, single binary deployment da mümkün hale gelir.
  • Platform with no network (wasm) ortamlarında olduğu gibi, offline-first application'larda da faydalı bir şekilde kullanılabilir.

Resmi belgelerdeki örnek

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Seçeneklerle yeni sunucuyu başlat
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Sunucuyu goroutine aracılığıyla başlat
22    go ns.Start()
23
24    // Sunucunun bağlantılar için hazır olmasını bekle
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("bağlantı için hazır değil")
27    }
28
29    // Sunucuya bağlan
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Konuya abone ol
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Mesaj verilerini yazdır
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Sunucuyu kapat (isteğe bağlı)
45        ns.Shutdown()
46    })
47
48    // Konuya veri yayınla
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Sunucunun kapanmasını bekle
52    ns.WaitForShutdown()
53}
  • NATS resmi belgelerinde yer alan Embedded NATS örneği bu şekildedir; ancak bu örnek koda göre ilerlendiğinde embedding mode ile iletişim kurulmamaktadır.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • watch 'netstat -an | grep 127.0.0.1' komutu aracılığıyla localhost (127.0.0.1) üzerindeki ağ trafiğini kontrol ederken, go run . komutuyla ilgili go dosyasını çalıştırdığınızda, NATS'ın varsayılan portu olan 4222'den kaynaklanan yeni ağ isteklerinin eklendiğini gözlemleyebilirsiniz.

Embedded mode için doğru yapılandırmalar

  • Embedded mode ile beklenen şekilde iletişim kurabilmek için aşağıdaki iki seçeneğe ihtiyaç duyulmaktadır.

    • Client: InProcessServer seçeneği eklenmelidir.
    • Server: Server.Options içerisinde DontListen flag'i true olarak açıkça belirtilmelidir.
  • Bu hususlar resmi olarak belgelenmemiştir ve bu özelliğin başlangıcı ilgili PR aracılığıyla tespit edilebilir.

    Bu PR üç şey ekler:

    1. Server'a, TCP soketleri kullanmadan NATS sunucusuna bağlantı kurmak için bir net.Pipe oluşturan InProcessConn() fonksiyonu
    2. NATS sunucusuna alışılagelmiş TCP dinleyicisi üzerinde dinleme yapmamasını söyleyen DontListen seçeneği
    3. AcceptLoop başlamadan hemen önce kapanan startupComplete kanalı ve readyForConnections'ın bunu bekleyeceği

    Bunun ana motivasyonu, hem monolit (tek süreçli) modda hem de polilit (çok süreçli) modda çalışabilen bir uygulamamız olmasıdır. Basitlik açısından her iki mod için de NATS'ı kullanabilmek istiyoruz, ancak monolit modun, soket bağlantılarının ya anlamsız olduğu (mobil) ya da mümkün olmadığı (WASM) çeşitli platformlara hitap edebilmesi gerekiyor. Bu değişiklikler, NATS'ı tamamen süreç içi kullanmamıza olanak sağlayacaktır.

    Eşlik eden bir PR olan nats-io/nats.go#774, istemci tarafı desteği eklemektedir.

    Bu, bu projeye yaptığım ilk PR'dır, bu yüzden herhangi bir şeyi gözden kaçırdıysam şimdiden özür dilerim.

    /cc @nats-io/core

Embedded mode için Çalışan Örnek

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// gömülü NATS sunucusunu yapılandırmak için
14		// DonListen'ı true olarak ayarla
15		DontListen: true,
16	}
17
18	// Seçeneklerle yeni sunucuyu başlat
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Sunucuyu goroutine aracılığıyla başlat
26	go ns.Start()
27
28	// Sunucunun bağlantılar için hazır olmasını bekle
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("bağlantı için hazır değil")
31	}
32
33	// Süreç içi bağlantı aracılığıyla sunucuya bağlan
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Konuya abone ol
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Mesaj verilerini yazdır
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Sunucuyu kapat (isteğe bağlı)
49		ns.Shutdown()
50	})
51
52	// Konuya veri yayınla
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Sunucunun kapanmasını bekle
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Artık beklenen şekilde ek bir network hop'ının meydana gelmediği gözlemlenebilir.

Arka planda

TL;DR

diagram1

  • Bu kod main.go dosyasında çalıştırıldığında, dahili olarak hangi fonksiyonların nasıl çalıştığını gösteren bir sequence diagram'ıdır ve temel noktaları aşağıda açıklanmıştır.
    • DontListen: true aracılığıyla sunucu, AcceptLoop adlı istemci dinleme aşamasını atlar.
    • İstemcinin Connect seçeneğindeki InProcessServer etkinleştirilirse, bir in-memory bağlantı oluşturulur ve net.Pipe aracılığıyla bir pipe oluşturulduktan sonra pipe'ın sonu istemciye net.Conn türünde döndürülür.
    • İstemci ve sunucu, bu bağlantı aracılığıyla in-process iletişimi gerçekleştirir.

Sunucu

AcceptLoop

1// nats-server/server/server.go
2
3// İstemcileri bekle.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Öncelikle, DontListen true olduğunda, AcceptLoop adlı istemci dinleme aşaması atlanır.
 1// nats-server/server/server.go
 2
 3// AcceptLoop, daha kolay test için dışa aktarılmıştır.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Dinleyici düzgün bir şekilde kurulmadan çıkacak olsaydık,
 6	// kanalın kapatıldığından emin ol.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Sunucu seçeneklerinin anlık görüntüsünü al.
18	opts := s.getOpts()
19
20	// Kapatmayı etkinleştirebilecek durumu ayarla
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Port üzerinde dinleme hatası: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("İstemci bağlantıları için dinleniyor: %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// TLS etkinse bildir.
34	if opts.TLSConfig != nil {
35		s.Noticef("İstemci bağlantıları için TLS gerekli")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("\"TLS Handshake First\" seçeneğini kullanmayan istemciler bağlanamayacaktır")
38		}
39	}
40
41	// Sunucu RANDOM_PORT (-1) ile başlatıldıysa, opts.Port bu fonksiyonun
42	// başında 0'a eşit olacaktır. Bu nedenle gerçek portu almamız gerekir.
43	if opts.Port == 0 {
44		// Çözümlenen portu seçeneklere geri yaz.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Port ayarlandıktan sonra (RANDOM olarak ayarlanmışsa),
49	// sunucunun bilgi Host/Port'unu Options veya
50	// ClientAdvertise değerleriyle ayarla.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Sunucu INFO'yu ClientAdvertise değeri %s ile ayarlarken hata oluştu, hata=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// İstemci bağlantı URL'lerini takip et. Bunlara daha sonra ihtiyacımız olabilir.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Yeni istemcileri kabul etmediğimizi bildir
65				s.ldmCh <- true
66				// Şimdi Kapanmayı bekle...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Çağırana hazır olduğumuzu bildir
75	close(clr)
76	clr = nil
77}
  • Bu arada, AcceptLoop fonksiyonu aşağıdaki süreçleri yürütür. TLS veya hostPort gibi ağ iletişimiyle ilgili kısımlar, in-process communication yapıldığında gereksiz hale geldiği için atlanabilir.

İstemci

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect, NATS sistemine bağlanmaya çalışacaktır.
 5// URL, kullanıcı adı/şifre semantiği içerebilir. örn. nats://derek:pass@localhost:4222
 6// Virgülle ayrılmış diziler de desteklenir, örn. urlA, urlB.
 7// Seçenekler varsayılanlarla başlar ancak geçersiz kılınabilir.
 8// Bir NATS Sunucusunun websocket portuna bağlanmak için `ws` veya `wss` şemasını kullanın, örneğin
 9// `ws://localhost:8080`. Websocket şemalarının diğerleriyle (nats/tls) karıştırılamayacağını unutmayın.
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
 4type Options struct {
 5	// Url, istemcinin bağlanacağı tek bir NATS sunucu URL'sini temsil eder.
 6	// Eğer Servers seçeneği de ayarlanmışsa,
 7	// bu durumda Servers dizisindeki ilk sunucu olur.
 8	Url string
 9
10	// InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
11	// Bu ayarlanırsa, harici TCP bağlantıları kullanmak yerine
12	// doğrudan sunucuya bağlanmaya çalışacağız.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • NATS sunucusu ile NATS istemcisinin bağlantısını gerçekleştiren Connect fonksiyonu, istemci URL'si ve bağlantı seçenekleri ayarlayabilmekte olup, bu seçenekleri barındıran Options struct'ında InProcessConnProvider arayüz türünden InProcessServer adında bir alan bulunmaktadır.
1// main.go of example code
2
3// Seçeneklerle yeni sunucuyu başlat
4ns, err := server.NewServer(opts)
5
6//...
7
8// Süreç içi bağlantı aracılığıyla sunucuya bağlan
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • NATS istemcisinde Connect işlemi gerçekleştirilirken, InProcessServer alanına nats.InProcessServer(ns) değeri aktarıldığında,
 1// nats-go/nats.go
 2
 3// InProcessServer, TCP aracılığıyla bağlantı kurmak yerine,
 4// süreç içinde çalışan bir NATS sunucusuna doğrudan bağlantı kurmayı deneyecek bir Seçenektir.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • seçeneğin InProcessServer'ı embedded NATS sunucusu ile değiştirilir ve
 1// nats-go/nats.go
 2
 3// createConn, sunucuya bağlanacak ve uygun bufio yapılarını saracaktır.
 4// Mevcut bir bağlantı olduğunda doğru şeyi yapacaktır.
 5func (nc *Conn) createConn() (err error) {
 6	if nc.Opts.Timeout < 0 {
 7		return ErrBadTimeout
 8	}
 9	if _, cur := nc.currentServer(); cur == nil {
10		return ErrNoServers
11	}
12
13	// Eğer süreç içi bir sunucuya referansımız varsa,
14	// bu sunucuyu kullanarak bir bağlantı kur.
15	if nc.Opts.InProcessServer != nil {
16		conn, err := nc.Opts.InProcessServer.InProcessConn()
17		if err != nil {
18			return fmt.Errorf("süreç içi bağlantı alınamadı: %w", err)
19		}
20		nc.conn = conn
21		nc.bindToNewConn()
22		return nil
23	}
24	
25	//...
26}
  • ilgili arayüz, bağlantı oluşturan createConn fonksiyonunda InProcessServer seçeneği nil değilse (geçerliyse) InProcessServer'ın InProcessConn'unu çalıştırarak
 1// nats-server/server/server.go
 2
 3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
 4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
 5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
 6func (s *Server) InProcessConn() (net.Conn, error) {
 7	pl, pr := net.Pipe()
 8	if !s.startGoRoutine(func() {
 9		s.createClientInProcess(pl)
10		s.grWG.Done()
11	}) {
12		pl.Close()
13		pr.Close()
14		return nil, fmt.Errorf("bağlantı oluşturulamadı")
15	}
16	return pr, nil
17}
  • sunucuda uygulanan InProcessConn fonksiyonunu çağırır ve çalıştırır.
  • Bu fonksiyon, NATS'ın Go istemcisi olan nats.go'da, nc (NATS bağlantısı) InProcessServer'ı nil değilse çağrılır, bir bağlantı (net.Conn) oluşturur ve bunu sunucunun bağlantısına bağlar.

Go'nun tüketici odaklı arayüzü

Bir tür, yöntemlerini uygulayarak bir arayüzü uygular. Açık bir niyet beyanı, bir "implements" anahtar kelimesi yoktur. Örtük arayüzler, bir arayüzün tanımını uygulamasından ayırır, bu da önceden düzenleme olmaksızın herhangi bir pakette görünebilir.

Arayüzler örtük olarak uygulanır, A Tour of Go

Bir tür yalnızca bir arayüzü uygulamak için var ise ve o arayüzün ötesinde hiçbir zaman dışa aktarılmış yöntemleri olmayacaksa, türün kendisini dışa aktarmaya gerek yoktur.

Genellik, Effective Go

  • Bu arayüz tasarımı, Go'da sıkça bahsedilen consumer defined interface ve structural typing (duck typing) kavramlarını iyi yansıttığı için bu konuyu da tanıtmak istedim.
 1// nats-go/nats.go
 2
 3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
 4type Options struct {
 5	// Url, istemcinin bağlanacağı tek bir NATS sunucu URL'sini temsil eder.
 6	// Eğer Servers seçeneği de ayarlanmışsa,
 7	// bu durumda Servers dizisindeki ilk sunucu olur.
 8	Url string
 9
10	// InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
11	// Bu ayarlanırsa, harici TCP bağlantıları kullanmak yerine
12	// doğrudan sunucuya bağlanmaya çalışacağız.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Koda geri dönelim. nats.go istemcisindeki InProcessServer seçeneği struct alanı, yalnızca InProcessConn'u gerçekleştiren InProcessConnProvider arayüzü olarak tanımlanmıştır.
 1// nats-server/server/server.go
 2
 3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
 4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
 5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
 6func (s *Server) InProcessConn() (net.Conn, error) {
 7	pl, pr := net.Pipe()
 8	if !s.startGoRoutine(func() {
 9		s.createClientInProcess(pl)
10		s.grWG.Done()
11	}) {
12		pl.Close()
13		pr.Close()
14		return nil, fmt.Errorf("bağlantı oluşturulamadı")
15	}
16	return pr, nil
17}
  • Ancak, içine giren tür, NATS-server'ın Server'ıdır ve InProcessConn'un yanı sıra çeşitli işlevleri de yerine getirmektedir.
  • Zira bu durumda istemcinin ilgilendiği tek şey InProcessConn arayüzünün sağlanıp sağlanmadığıdır; diğer hususlar büyük ölçüde önemli değildir.
  • Bu nedenle, nats.go istemcisi, yalnızca InProcessConn() (net.Conn, error) işlevini tanımlayan InProcessConnProvider adlı bir consumer defined interface oluşturarak bunu kullanmaktadır.

Sonuç

  • NATS'ın embedded mode'u, çalışma prensibi ve NATS kodunda gözlemlenebilen Go'nun consumer defined interface kavramı hakkında kısa bir inceleme sunulmuştur.
  • Bu bilgilerin, NATS'ı yukarıdaki amaçlarla kullanan kişilere faydalı olmasını temenni ederek bu yazıyı sonlandırmak isterim.