GoSuda

Gömülü NATS, Go uygulamasıyla nasıl iletişim kurar?

By prravda
views ...

Başlarken

NATS Hakkında

Yazılım uygulamaları ve hizmetleri veri alışverişinde bulunmak durumundadır. NATS, mesajlar şeklinde segmentlere ayrılmış bu tür bir veri alışverişini sağlayan bir altyapıdır. Bunu "mesaj odaklı bir middleware" olarak adlandırırız.

NATS ile uygulama geliştiricileri şunları yapabilir:

  • Dağıtılmış ve ölçeklenebilir istemci-sunucu uygulamalarını zahmetsizce inşa edebilirler.
  • Verileri gerçek zamanlı olarak genel bir şekilde depolayabilir ve dağıtabilirler. Bu, çeşitli ortamlar, diller, bulut sağlayıcıları ve şirket içi sistemler arasında esnek bir şekilde gerçekleştirilebilir.

NATS Nedir, NATS dokümanları

  • NATS, Go ile oluşturulmuş bir mesaj brokeridir.

Embedded NATS

Uygulamanız Go dilinde ise ve kullanım senaryolarınız ile dağıtım durumlarınıza uygunsa, bir NATS sunucusunu uygulamanızın içine bile gömebilirsiniz.

NATS'ı Gömme, NATS dokümanları

  • Ayrıca NATS'ın ilginç bir özelliği var: Go ile oluşturulmuş uygulamalar için embedded mode desteği sunması.
  • Yani, mesaj brokerlerinin genel yöntemi olan ayrı bir broker sunucusu çalıştırıp bu sunucuyla uygulamanın istemcisi aracılığıyla iletişim kurmak yerine, broker'ın kendisini Go ile yazılmış bir uygulamaya gömebilme (embed) imkanı sunuyor.

Embedded NATS'ın Faydaları ve Kullanım Alanları

  • İyi açıklanmış bir Youtube videosu bulunduğundan, videonun bağlantısı yeterli olacaktır.
  • Ayrı bir mesaj broker sunucusu dağıtmak zorunda kalmadan, concerns ayrımını (separation of concern) sağlayarak modular monolith application oluşturup NATS'ı embedded olarak yerleştirmenin avantajından faydalanmak mümkündür. Ayrıca, single binary deployment de mümkün hale gelir.
  • Yalnızca no network (wasm) platformlarında değil, offline-first application'larda da kullanışlı bir şekilde kullanılabilir.

Resmi Dokümanlardaki Örnek

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Seçeneklerle yeni sunucuyu başlat
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Goroutine aracılığıyla sunucuyu başlat
22    go ns.Start()
23
24    // Sunucunun bağlantılar için hazır olmasını bekle
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("bağlantı için hazır değil")
27    }
28
29    // Sunucuya bağlan
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Konuya abone ol
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Mesaj verisini yazdır
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Sunucuyu kapat (isteğe bağlı)
45        ns.Shutdown()
46    })
47
48    // Konuya veri yayınla
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Sunucunun kapanmasını bekle
52    ns.WaitForShutdown()
53}
  • NATS resmi dokümanlarının işaret ettiği Embedded NATS örneği budur, ancak bu örnek kodla ilerlendiğinde embedding mode ile iletişim kurulamaz.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • watch 'netstat -an | grep 127.0.0.1' komutu aracılığıyla localhost (127.0.0.1) üzerinden gerçekleşen ağ trafiğini kontrol ederken, ilgili go dosyasını go run . ile çalıştırdığınızda, NATS'ın varsayılan portu olan 4222'den kaynaklanan yeni ağ isteklerinin eklendiği görülebilir.

Embedded Mode İçin Doğru Konfigürasyonlar

  • Embedded mode ile beklenen şekilde iletişim kurmak için aşağıdaki iki seçeneğe ihtiyaç duyulmaktadır:

    • Client: InProcessServer seçeneği eklenmelidir.
    • Server: Server.Options içinde DontListen flag'i true olarak belirtilmelidir.
  • Bu kısımlar resmi olarak belgelenmemiş olup, bu özelliğin başlangıcı ilgili PR aracılığıyla anlaşılabilir.

    Bu PR üç şey ekler:

    1. TCP soketleri kullanmadan NATS sunucusuna bağlantı kurmak için bir net.Pipe oluşturan Server'a InProcessConn() fonksiyonu
    2. NATS sunucusuna normal TCP dinleyicisinde dinlememesini söyleyen DontListen seçeneği
    3. AcceptLoop başlamadan hemen önce kapanan ve readyForConnections'ın bekleyeceği startupComplete kanalı

    Bunun temel motivasyonu, bir monolith (tek süreçli) modda veya bir polylith (çok süreçli) modda çalışabilen bir uygulamamızın olmasıdır. Basitlik adına her iki mod için de NATS'ı kullanabilmek isteriz, ancak monolith modunun, soket bağlantılarının açılmasının ya mantıklı olmadığı (mobil) ya da mümkün olmadığı (WASM) çeşitli platformlara hitap edebilmesi gerekir. Bu değişiklikler, NATS'ı tamamen süreç içi kullanmamıza izin verecektir.

    Eşlik eden bir PR [nats-io/nats.go#774], istemci tarafına destek ekler.

    Bu projeye ilk PR'ım, bu yüzden herhangi bir şeyi gözden kaçırdıysam şimdiden özür dilerim.

    /cc @nats-io/core

Embedded Mode İçin Çalışan Örnek

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// gömülü NATS sunucusunu yapılandırmak için
14		// DonListen'i true olarak ayarla
15		DontListen: true,
16	}
17
18	// Seçeneklerle yeni sunucuyu başlat
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Goroutine aracılığıyla sunucuyu başlat
26	go ns.Start()
27
28	// Sunucunun bağlantılar için hazır olmasını bekle
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("bağlantı için hazır değil")
31	}
32
33	// Süreç içi bağlantı aracılığıyla sunucuya bağlan
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Konuya abone ol
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Mesaj verisini yazdır
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Sunucuyu kapat (isteğe bağlı)
49		ns.Shutdown()
50	})
51
52	// Konuya veri yayınla
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Sunucunun kapanmasını bekle
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Artık amaçlandığı gibi ek bir network hop oluşmadığı görülebilir.

Arka Plan

TL;DR

diagram1

  • Bu kod main.go içinde çalıştırıldığında, dahili olarak hangi fonksiyonların nasıl çalıştığını gösteren bir sequence diagramıdır ve ana hatlarıyla şunları açıklar:
    • DontListen: true sayesinde sunucu, client listening phase olan AcceptLoop'u atlar.
    • Client'ın Connect option'larından InProcessServer aktif hale gelirse, in-memory connection oluşturur ve net.Pipe aracılığıyla bir pipe oluşturduktan sonra pipe'ın ucunu client'a net.Conn tipi olarak geri döndürür.
    • Client ve server, bu bağlantı aracılığıyla in-process communication gerçekleştirir.

Sunucu

AcceptLoop

1// nats-server/server/server.go
2
3// İstemcileri bekle.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Öncelikle, DontListen true ise, AcceptLoop adındaki istemci dinleme aşaması atlanır.
 1// nats-server/server/server.go
 2
 3// AcceptLoop, daha kolay test için dışa aktarılmıştır.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Dinleyici düzgün bir şekilde kurulmadan önce çıkış yapmamız gerekirse,
 6	// kanalın kapatıldığından emin ol.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Sunucu seçeneklerini anlık görüntüle.
18	opts := s.getOpts()
19
20	// Kapanmayı etkinleştirebilecek durumu ayarla
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Portta dinlenirken hata oluştu: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("İstemci bağlantıları için %s üzerinde dinleniyor",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// TLS etkinse uyar.
34	if opts.TLSConfig != nil {
35		s.Noticef("İstemci bağlantıları için TLS gerekli")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("\"TLS Handshake First\" seçeneğini kullanmayan istemciler bağlanamayacak")
38		}
39	}
40
41	// Sunucu RANDOM_PORT (-1) ile başlatıldıysa, opts.Port bu fonksiyonun
42	// başında 0'a eşit olacaktır. Bu yüzden gerçek portu almamız gerekiyor
43	if opts.Port == 0 {
44		// Çözümlenmiş portu seçeneklere geri yaz.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Port ayarlandıktan sonra (RANDOM'a ayarlandıysa),
49	// sunucunun bilgi Host/Port'unu Options veya ClientAdvertise değerleri
50	// ile ayarla.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("ClientAdvertise değeri %s ile sunucu INFO ayarlanırken hata oluştu, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// İstemci bağlantı URL'lerini takip et. Daha sonra ihtiyacımız olabilir.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Yeni istemcileri kabul etmediğimizi bildir
65				s.ldmCh <- true
66				// Şimdi Kapanmayı bekle...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Çağıranı hazır olduğumuzu bildir
75	close(clr)
76	clr = nil
77}
  • Ayrıca, AcceptLoop fonksiyonu aşağıdaki süreçleri gerçekleştirir: TLS veya hostPort gibi ağ iletişimi ile ilgili kısımlar, in-process communication yapıldığında gereksiz olan bölümler olduğundan atlanabilir.

İstemci

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect, NATS sistemine bağlanmayı deneyecektir.
 5// URL, kullanıcı adı/şifre semantiği içerebilir. Örneğin: nats://derek:pass@localhost:4222
 6// Virgülle ayrılmış diziler de desteklenir, örneğin urlA, urlB.
 7// Seçenekler varsayılanlarla başlar ancak geçersiz kılınabilir.
 8// Bir NATS Sunucusunun websocket portuna bağlanmak için `ws` veya `wss` şemasını kullanın, örneğin
 9// `ws://localhost:8080`. Websocket şemalarının diğerleriyle (nats/tls) karıştırılamayacağını unutmayın.
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
 4type Options struct {
 5	// Url, istemcinin bağlanacağı tek bir NATS sunucusu URL'sini temsil eder.
 6	// Servers seçeneği de ayarlanmışsa, o zaman Servers dizisindeki ilk sunucu olur.
 7	Url string
 8
 9	// InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
10	// Bu ayarlanmışsa, harici TCP bağlantıları kullanmak yerine sunucuya doğrudan
11	// bağlanmaya çalışacağız.
12	InProcessServer InProcessConnProvider
13	
14	//...
15}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • nats server ve nats client arasındaki bağlantıyı sağlayan Connect fonksiyonu, client URL ve connect Option'ı ayarlayabilir ve bu Option'ları bir araya getiren Options struct'ı içinde InProcessConnProvider interface tipinde InProcessServer adında bir alan bulunur.
1// örnek kodun main.go'su
2
3// Seçeneklerle yeni sunucuyu başlat
4ns, err := server.NewServer(opts)
5
6//...
7
8// Süreç içi bağlantı aracılığıyla sunucuya bağlan
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • nats client'ta Connect işlemi gerçekleştirilirken, InProcessServer alanına nats.InProcessServer(ns) değeri aktarıldığında
 1// nats-go/nats.go
 2
 3// InProcessServer, TCP aracılığıyla bağlantı kurmak yerine, süreç içinde çalışan bir NATS sunucusuna
 4// doğrudan bağlantı kurmayı deneyecek bir Option'dır.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • option'ın InProcessServer'ı embedded nats server ile değiştirilir ve
 1// nats-go/nats.go
 2
 3// createConn, sunucuya bağlanacak ve uygun bufio yapılarını saracaktır.
 4// Mevcut bir bağlantı olduğunda doğru şeyi yapacaktır.
 5func (nc *Conn) createConn() (err error) {
 6	if nc.Opts.Timeout < 0 {
 7		return ErrBadTimeout
 8	}
 9	if _, cur := nc.currentServer(); cur == nil {
10		return ErrNoServers
11	}
12
13	// Eğer süreç içi bir sunucuya referansımız varsa, o zaman onu kullanarak bir bağlantı kur.
14	if nc.Opts.InProcessServer != nil {
15		conn, err := nc.Opts.InProcessServer.InProcessConn()
16		if err != nil {
17			return fmt.Errorf("süreç içi bağlantı alınamadı: %w", err)
18		}
19		nc.conn = conn
20		nc.bindToNewConn()
21		return nil
22	}
23	
24	//...
25}
  • Bu interface, bağlantı oluşturan createConn fonksiyonunda InProcessServer seçeneği nil değilse (geçerliyse), seçenekteki InProcessServer'ın InProcessConn'unu çalıştırarak
 1// nats-server/server/server.go
 2
 3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
 4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
 5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
 6func (s *Server) InProcessConn() (net.Conn, error) {
 7	pl, pr := net.Pipe()
 8	if !s.startGoRoutine(func() {
 9		s.createClientInProcess(pl)
10		s.grWG.Done()
11	}) {
12		pl.Close()
13		pr.Close()
14		return nil, fmt.Errorf("bağlantı oluşturulamadı")
15	}
16	return pr, nil
17}
  • sunucuda uygulanan InProcessConn'u çağırarak çalıştırır.
  • Bu fonksiyon, NATS'ın Go client'ı olan nats.go'da nc (nats connection)'nin InProcessServer'ı nil değilse çağrılır ve bir bağlantı (net.Conn) oluşturur, ardından bunu sunucunun bağlantısına bağlar.

Go'nun Consumer Driven Interface'i

Bir tür, metodlarını uygulayarak bir interface'i implemente eder. Açık bir niyet beyanı, yani "implements" anahtar kelimesi yoktur. Implicit interface'ler, bir interface'in tanımını, herhangi bir ön düzenleme olmaksızın herhangi bir pakette görünebilecek olan implementasyonundan ayırır.

Interface'ler implicit olarak implemente edilir, A Tour of Go

Eğer bir tür yalnızca bir interface'i implemente etmek için var ise ve o interface dışındaki hiçbir exported metoda sahip olmayacaksa, türün kendisini export etmeye gerek yoktur.

Genellik, Effective Go

  • Bu interface tasarımı, Go'da sıkça bahsedilen consumer defined interface ve structural typing (duck typing) kavramlarını iyi yansıttığı için bu konuyu da tanıtmak istedim.
 1// nats-go/nats.go
 2
 3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
 4type Options struct {
 5	// Url, istemcinin bağlanacağı tek bir NATS sunucusu URL'sini temsil eder.
 6	// Servers seçeneği de ayarlanmışsa, o zaman Servers dizisindeki ilk sunucu olur.
 7	Url string
 8
 9	// InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
10	// Bu ayarlanmışsa, harici TCP bağlantıları kullanmak yerine sunucuya doğrudan
11	// bağlanmaya çalışacağız.
12	InProcessServer InProcessConnProvider
13	
14	//...
15}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Tekrar koda dönelim. nats.go client'ında InProcessServer option struct alanı, yalnızca InProcessConn'u gerçekleştiren InProcessConnProvider interface'i olarak tanımlanmıştır.
 1// nats-server/server/server.go
 2
 3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
 4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
 5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
 6func (s *Server) InProcessConn() (net.Conn, error) {
 7	pl, pr := net.Pipe()
 8	if !s.startGoRoutine(func() {
 9		s.createClientInProcess(pl)
10		s.grWG.Done()
11	}) {
12		pl.Close()
13		pr.Close()
14		return nil, fmt.Errorf("bağlantı oluşturulamadı")
15	}
16	return pr, nil
17}
  • Ancak, ona giren tip nats-server'ın Server'ıdır ve InProcessConn'un yanı sıra çeşitli işlevleri de yerine getirmektedir.
  • Çünkü bu durumda client'ın ilgi alanı, InProcessConn adında bir interface'in sağlanıp sağlanmadığıdır; diğer şeyler pek önemli değildir.
  • Bu nedenle, nats.go client'ı, yalnızca InProcessConn() (net.Conn, error) işlevini tanımlayan InProcessConnProvider adında bir consumer defined interface oluşturarak kullanmaktadır.

Sonuç

  • NATS'ın embedded mode'u ve çalışma şekli ile NATS kodunda görülebilecek Go'nun consumer defined interface'i kısaca ele alınmıştır.
  • Bu bilgilerin, NATS'ı yukarıdaki amaçlarla kullanan kişilere yardımcı olmasını umarak bu yazıyı bitirmek istiyorum.