Gömülü NATS, Go uygulamasıyla nasıl iletişim kurar?
Başlarken
NATS Hakkında
Yazılım uygulamaları ve hizmetlerinin veri alışverişinde bulunması gerekmektedir. NATS, mesajlar şeklinde segmentlere ayrılmış böyle bir veri alışverişini sağlayan bir altyapıdır. Buna "mesaj odaklı ara katman yazılımı" adını veriyoruz.
NATS ile uygulama geliştiricileri şunları yapabilir:
- Dağıtık ve ölçeklenebilir istemci-sunucu uygulamalarını zahmetsizce oluşturabilirler.
- Verileri genel bir şekilde gerçek zamanlı olarak depolayabilir ve dağıtabilirler. Bu, çeşitli ortamlar, diller, bulut sağlayıcıları ve şirket içi sistemler arasında esnek bir şekilde gerçekleştirilebilir.
- NATS, Go ile oluşturulmuş bir mesaj broker'ıdır.
Embedded NATS
Uygulamanız Go dilinde ise ve kullanım durumunuza ve dağıtım senaryolarınıza uyuyorsa, uygulamanızın içine bir NATS sunucusu bile gömebilirsiniz.
- NATS'ın özel bir özelliği vardır: Go ile oluşturulmuş uygulamalar için embedded mode desteği sunar.
- Yani, mesaj broker'larının genel yöntemi olan ayrı bir broker sunucusu çalıştırıp ardından bu sunucu ile uygulamanın istemcisi aracılığıyla iletişim kurmak yerine, broker'ın kendisini Go ile yazılmış bir uygulamaya gömebilirsiniz (embed).
Embedded NATS'ın Faydaları ve Kullanım Durumları
- İyi açıklanmış bir Youtube videosu bulunduğundan, video bağlantısı ile yetinilmektedir.
- Ayrı bir mesaj broker sunucusu dağıtmaya gerek kalmadan modular monolith application oluşturarak "separate of concern" ilkesini yerine getirirken, NATS'ı embedded olarak yerleştirmenin avantajından yararlanmak mümkündür. Ayrıca, single binary deployment da mümkün hale gelir.
- Platform with no network (wasm) ortamlarında olduğu gibi, offline-first application'larda da faydalı bir şekilde kullanılabilir.
Resmi belgelerdeki örnek
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Seçeneklerle yeni sunucuyu başlat
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Sunucuyu goroutine aracılığıyla başlat
22 go ns.Start()
23
24 // Sunucunun bağlantılar için hazır olmasını bekle
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("bağlantı için hazır değil")
27 }
28
29 // Sunucuya bağlan
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Konuya abone ol
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Mesaj verilerini yazdır
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Sunucuyu kapat (isteğe bağlı)
45 ns.Shutdown()
46 })
47
48 // Konuya veri yayınla
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Sunucunun kapanmasını bekle
52 ns.WaitForShutdown()
53}
- NATS resmi belgelerinde yer alan Embedded NATS örneği bu şekildedir; ancak bu örnek koda göre ilerlendiğinde embedding mode ile iletişim kurulmamaktadır.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
watch 'netstat -an | grep 127.0.0.1'komutu aracılığıyla localhost (127.0.0.1) üzerindeki ağ trafiğini kontrol ederken,go run .komutuyla ilgili go dosyasını çalıştırdığınızda, NATS'ın varsayılan portu olan4222'den kaynaklanan yeni ağ isteklerinin eklendiğini gözlemleyebilirsiniz.
Embedded mode için doğru yapılandırmalar
Embedded mode ile beklenen şekilde iletişim kurabilmek için aşağıdaki iki seçeneğe ihtiyaç duyulmaktadır.
- Client:
InProcessServerseçeneği eklenmelidir. - Server:
Server.OptionsiçerisindeDontListenflag'itrueolarak açıkça belirtilmelidir.
- Client:
Bu hususlar resmi olarak belgelenmemiştir ve bu özelliğin başlangıcı ilgili PR aracılığıyla tespit edilebilir.
Bu PR üç şey ekler:
Server'a, TCP soketleri kullanmadan NATS sunucusuna bağlantı kurmak için birnet.PipeoluşturanInProcessConn()fonksiyonu- NATS sunucusuna alışılagelmiş TCP dinleyicisi üzerinde dinleme yapmamasını söyleyen
DontListenseçeneği AcceptLoopbaşlamadan hemen önce kapananstartupCompletekanalı vereadyForConnections'ın bunu bekleyeceği
Bunun ana motivasyonu, hem monolit (tek süreçli) modda hem de polilit (çok süreçli) modda çalışabilen bir uygulamamız olmasıdır. Basitlik açısından her iki mod için de NATS'ı kullanabilmek istiyoruz, ancak monolit modun, soket bağlantılarının ya anlamsız olduğu (mobil) ya da mümkün olmadığı (WASM) çeşitli platformlara hitap edebilmesi gerekiyor. Bu değişiklikler, NATS'ı tamamen süreç içi kullanmamıza olanak sağlayacaktır.
Eşlik eden bir PR olan nats-io/nats.go#774, istemci tarafı desteği eklemektedir.
Bu, bu projeye yaptığım ilk PR'dır, bu yüzden herhangi bir şeyi gözden kaçırdıysam şimdiden özür dilerim.
/cc @nats-io/core
Embedded mode için Çalışan Örnek
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // gömülü NATS sunucusunu yapılandırmak için
14 // DonListen'ı true olarak ayarla
15 DontListen: true,
16 }
17
18 // Seçeneklerle yeni sunucuyu başlat
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Sunucuyu goroutine aracılığıyla başlat
26 go ns.Start()
27
28 // Sunucunun bağlantılar için hazır olmasını bekle
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("bağlantı için hazır değil")
31 }
32
33 // Süreç içi bağlantı aracılığıyla sunucuya bağlan
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Konuya abone ol
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Mesaj verilerini yazdır
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Sunucuyu kapat (isteğe bağlı)
49 ns.Shutdown()
50 })
51
52 // Konuya veri yayınla
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Sunucunun kapanmasını bekle
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Artık beklenen şekilde ek bir network hop'ının meydana gelmediği gözlemlenebilir.
Arka planda
TL;DR
- Bu kod
main.godosyasında çalıştırıldığında, dahili olarak hangi fonksiyonların nasıl çalıştığını gösteren bir sequence diagram'ıdır ve temel noktaları aşağıda açıklanmıştır.DontListen: truearacılığıyla sunucu,AcceptLoopadlı istemci dinleme aşamasını atlar.- İstemcinin Connect seçeneğindeki
InProcessServeretkinleştirilirse, bir in-memory bağlantı oluşturulur venet.Pipearacılığıyla bir pipe oluşturulduktan sonra pipe'ın sonu istemciyenet.Conntüründe döndürülür. - İstemci ve sunucu, bu bağlantı aracılığıyla in-process iletişimi gerçekleştirir.
Sunucu
AcceptLoop
1// nats-server/server/server.go
2
3// İstemcileri bekle.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Öncelikle,
DontListentrue olduğunda,AcceptLoopadlı istemci dinleme aşaması atlanır.
1// nats-server/server/server.go
2
3// AcceptLoop, daha kolay test için dışa aktarılmıştır.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Dinleyici düzgün bir şekilde kurulmadan çıkacak olsaydık,
6 // kanalın kapatıldığından emin ol.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Sunucu seçeneklerinin anlık görüntüsünü al.
18 opts := s.getOpts()
19
20 // Kapatmayı etkinleştirebilecek durumu ayarla
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Port üzerinde dinleme hatası: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("İstemci bağlantıları için dinleniyor: %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // TLS etkinse bildir.
34 if opts.TLSConfig != nil {
35 s.Noticef("İstemci bağlantıları için TLS gerekli")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("\"TLS Handshake First\" seçeneğini kullanmayan istemciler bağlanamayacaktır")
38 }
39 }
40
41 // Sunucu RANDOM_PORT (-1) ile başlatıldıysa, opts.Port bu fonksiyonun
42 // başında 0'a eşit olacaktır. Bu nedenle gerçek portu almamız gerekir.
43 if opts.Port == 0 {
44 // Çözümlenen portu seçeneklere geri yaz.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Port ayarlandıktan sonra (RANDOM olarak ayarlanmışsa),
49 // sunucunun bilgi Host/Port'unu Options veya
50 // ClientAdvertise değerleriyle ayarla.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Sunucu INFO'yu ClientAdvertise değeri %s ile ayarlarken hata oluştu, hata=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // İstemci bağlantı URL'lerini takip et. Bunlara daha sonra ihtiyacımız olabilir.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Yeni istemcileri kabul etmediğimizi bildir
65 s.ldmCh <- true
66 // Şimdi Kapanmayı bekle...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Çağırana hazır olduğumuzu bildir
75 close(clr)
76 clr = nil
77}
- Bu arada,
AcceptLoopfonksiyonu aşağıdaki süreçleri yürütür.TLSveyahostPortgibi ağ iletişimiyle ilgili kısımlar, in-process communication yapıldığında gereksiz hale geldiği için atlanabilir.
İstemci
InProcessServer
1
2// nats-go/nats.go
3
4// Connect, NATS sistemine bağlanmaya çalışacaktır.
5// URL, kullanıcı adı/şifre semantiği içerebilir. örn. nats://derek:pass@localhost:4222
6// Virgülle ayrılmış diziler de desteklenir, örn. urlA, urlB.
7// Seçenekler varsayılanlarla başlar ancak geçersiz kılınabilir.
8// Bir NATS Sunucusunun websocket portuna bağlanmak için `ws` veya `wss` şemasını kullanın, örneğin
9// `ws://localhost:8080`. Websocket şemalarının diğerleriyle (nats/tls) karıştırılamayacağını unutmayın.
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
4type Options struct {
5 // Url, istemcinin bağlanacağı tek bir NATS sunucu URL'sini temsil eder.
6 // Eğer Servers seçeneği de ayarlanmışsa,
7 // bu durumda Servers dizisindeki ilk sunucu olur.
8 Url string
9
10 // InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
11 // Bu ayarlanırsa, harici TCP bağlantıları kullanmak yerine
12 // doğrudan sunucuya bağlanmaya çalışacağız.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- NATS sunucusu ile NATS istemcisinin bağlantısını gerçekleştiren
Connectfonksiyonu, istemci URL'si ve bağlantı seçenekleri ayarlayabilmekte olup, bu seçenekleri barındıran Options struct'ındaInProcessConnProviderarayüz türündenInProcessServeradında bir alan bulunmaktadır.
1// main.go of example code
2
3// Seçeneklerle yeni sunucuyu başlat
4ns, err := server.NewServer(opts)
5
6//...
7
8// Süreç içi bağlantı aracılığıyla sunucuya bağlan
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- NATS istemcisinde Connect işlemi gerçekleştirilirken,
InProcessServeralanınanats.InProcessServer(ns)değeri aktarıldığında,
1// nats-go/nats.go
2
3// InProcessServer, TCP aracılığıyla bağlantı kurmak yerine,
4// süreç içinde çalışan bir NATS sunucusuna doğrudan bağlantı kurmayı deneyecek bir Seçenektir.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- seçeneğin InProcessServer'ı embedded NATS sunucusu ile değiştirilir ve
1// nats-go/nats.go
2
3// createConn, sunucuya bağlanacak ve uygun bufio yapılarını saracaktır.
4// Mevcut bir bağlantı olduğunda doğru şeyi yapacaktır.
5func (nc *Conn) createConn() (err error) {
6 if nc.Opts.Timeout < 0 {
7 return ErrBadTimeout
8 }
9 if _, cur := nc.currentServer(); cur == nil {
10 return ErrNoServers
11 }
12
13 // Eğer süreç içi bir sunucuya referansımız varsa,
14 // bu sunucuyu kullanarak bir bağlantı kur.
15 if nc.Opts.InProcessServer != nil {
16 conn, err := nc.Opts.InProcessServer.InProcessConn()
17 if err != nil {
18 return fmt.Errorf("süreç içi bağlantı alınamadı: %w", err)
19 }
20 nc.conn = conn
21 nc.bindToNewConn()
22 return nil
23 }
24
25 //...
26}
- ilgili arayüz, bağlantı oluşturan
createConnfonksiyonundaInProcessServerseçeneği nil değilse (geçerliyse) InProcessServer'ınInProcessConn'unu çalıştırarak
1// nats-server/server/server.go
2
3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
6func (s *Server) InProcessConn() (net.Conn, error) {
7 pl, pr := net.Pipe()
8 if !s.startGoRoutine(func() {
9 s.createClientInProcess(pl)
10 s.grWG.Done()
11 }) {
12 pl.Close()
13 pr.Close()
14 return nil, fmt.Errorf("bağlantı oluşturulamadı")
15 }
16 return pr, nil
17}
- sunucuda uygulanan
InProcessConnfonksiyonunu çağırır ve çalıştırır. - Bu fonksiyon, NATS'ın Go istemcisi olan
nats.go'da, nc (NATS bağlantısı)InProcessServer'ı nil değilse çağrılır, bir bağlantı (net.Conn) oluşturur ve bunu sunucunun bağlantısına bağlar.
Go'nun tüketici odaklı arayüzü
Bir tür, yöntemlerini uygulayarak bir arayüzü uygular. Açık bir niyet beyanı, bir "implements" anahtar kelimesi yoktur. Örtük arayüzler, bir arayüzün tanımını uygulamasından ayırır, bu da önceden düzenleme olmaksızın herhangi bir pakette görünebilir.
Arayüzler örtük olarak uygulanır, A Tour of Go
Bir tür yalnızca bir arayüzü uygulamak için var ise ve o arayüzün ötesinde hiçbir zaman dışa aktarılmış yöntemleri olmayacaksa, türün kendisini dışa aktarmaya gerek yoktur.
- Bu arayüz tasarımı, Go'da sıkça bahsedilen consumer defined interface ve structural typing (duck typing) kavramlarını iyi yansıttığı için bu konuyu da tanıtmak istedim.
1// nats-go/nats.go
2
3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
4type Options struct {
5 // Url, istemcinin bağlanacağı tek bir NATS sunucu URL'sini temsil eder.
6 // Eğer Servers seçeneği de ayarlanmışsa,
7 // bu durumda Servers dizisindeki ilk sunucu olur.
8 Url string
9
10 // InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
11 // Bu ayarlanırsa, harici TCP bağlantıları kullanmak yerine
12 // doğrudan sunucuya bağlanmaya çalışacağız.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Koda geri dönelim.
nats.goistemcisindekiInProcessServerseçeneği struct alanı, yalnızcaInProcessConn'u gerçekleştirenInProcessConnProviderarayüzü olarak tanımlanmıştır.
1// nats-server/server/server.go
2
3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
6func (s *Server) InProcessConn() (net.Conn, error) {
7 pl, pr := net.Pipe()
8 if !s.startGoRoutine(func() {
9 s.createClientInProcess(pl)
10 s.grWG.Done()
11 }) {
12 pl.Close()
13 pr.Close()
14 return nil, fmt.Errorf("bağlantı oluşturulamadı")
15 }
16 return pr, nil
17}
- Ancak, içine giren tür, NATS-server'ın
Server'ıdır veInProcessConn'un yanı sıra çeşitli işlevleri de yerine getirmektedir. - Zira bu durumda istemcinin ilgilendiği tek şey
InProcessConnarayüzünün sağlanıp sağlanmadığıdır; diğer hususlar büyük ölçüde önemli değildir. - Bu nedenle,
nats.goistemcisi, yalnızcaInProcessConn() (net.Conn, error)işlevini tanımlayanInProcessConnProvideradlı bir consumer defined interface oluşturarak bunu kullanmaktadır.
Sonuç
- NATS'ın embedded mode'u, çalışma prensibi ve NATS kodunda gözlemlenebilen Go'nun consumer defined interface kavramı hakkında kısa bir inceleme sunulmuştur.
- Bu bilgilerin, NATS'ı yukarıdaki amaçlarla kullanan kişilere faydalı olmasını temenni ederek bu yazıyı sonlandırmak isterim.