Gömülü NATS, Go uygulamasıyla nasıl iletişim kurar?
Başlarken
NATS Hakkında
Yazılım uygulamaları ve hizmetleri veri alışverişinde bulunmak durumundadır. NATS, mesajlar şeklinde segmentlere ayrılmış bu tür bir veri alışverişini sağlayan bir altyapıdır. Bunu "mesaj odaklı bir middleware" olarak adlandırırız.
NATS ile uygulama geliştiricileri şunları yapabilir:
- Dağıtılmış ve ölçeklenebilir istemci-sunucu uygulamalarını zahmetsizce inşa edebilirler.
- Verileri gerçek zamanlı olarak genel bir şekilde depolayabilir ve dağıtabilirler. Bu, çeşitli ortamlar, diller, bulut sağlayıcıları ve şirket içi sistemler arasında esnek bir şekilde gerçekleştirilebilir.
- NATS, Go ile oluşturulmuş bir mesaj brokeridir.
Embedded NATS
Uygulamanız Go dilinde ise ve kullanım senaryolarınız ile dağıtım durumlarınıza uygunsa, bir NATS sunucusunu uygulamanızın içine bile gömebilirsiniz.
NATS'ı Gömme, NATS dokümanları
- Ayrıca NATS'ın ilginç bir özelliği var: Go ile oluşturulmuş uygulamalar için embedded mode desteği sunması.
- Yani, mesaj brokerlerinin genel yöntemi olan ayrı bir broker sunucusu çalıştırıp bu sunucuyla uygulamanın istemcisi aracılığıyla iletişim kurmak yerine, broker'ın kendisini Go ile yazılmış bir uygulamaya gömebilme (embed) imkanı sunuyor.
Embedded NATS'ın Faydaları ve Kullanım Alanları
- İyi açıklanmış bir Youtube videosu bulunduğundan, videonun bağlantısı yeterli olacaktır.
- Ayrı bir mesaj broker sunucusu dağıtmak zorunda kalmadan, concerns ayrımını (separation of concern) sağlayarak modular monolith application oluşturup NATS'ı embedded olarak yerleştirmenin avantajından faydalanmak mümkündür. Ayrıca, single binary deployment de mümkün hale gelir.
- Yalnızca no network (wasm) platformlarında değil, offline-first application'larda da kullanışlı bir şekilde kullanılabilir.
Resmi Dokümanlardaki Örnek
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Seçeneklerle yeni sunucuyu başlat
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Goroutine aracılığıyla sunucuyu başlat
22 go ns.Start()
23
24 // Sunucunun bağlantılar için hazır olmasını bekle
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("bağlantı için hazır değil")
27 }
28
29 // Sunucuya bağlan
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Konuya abone ol
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Mesaj verisini yazdır
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Sunucuyu kapat (isteğe bağlı)
45 ns.Shutdown()
46 })
47
48 // Konuya veri yayınla
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Sunucunun kapanmasını bekle
52 ns.WaitForShutdown()
53}
- NATS resmi dokümanlarının işaret ettiği Embedded NATS örneği budur, ancak bu örnek kodla ilerlendiğinde embedding mode ile iletişim kurulamaz.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
watch 'netstat -an | grep 127.0.0.1'
komutu aracılığıyla localhost (127.0.0.1) üzerinden gerçekleşen ağ trafiğini kontrol ederken, ilgili go dosyasınıgo run .
ile çalıştırdığınızda, NATS'ın varsayılan portu olan4222
'den kaynaklanan yeni ağ isteklerinin eklendiği görülebilir.
Embedded Mode İçin Doğru Konfigürasyonlar
Embedded mode ile beklenen şekilde iletişim kurmak için aşağıdaki iki seçeneğe ihtiyaç duyulmaktadır:
- Client:
InProcessServer
seçeneği eklenmelidir. - Server:
Server.Options
içindeDontListen
flag'itrue
olarak belirtilmelidir.
- Client:
Bu kısımlar resmi olarak belgelenmemiş olup, bu özelliğin başlangıcı ilgili PR aracılığıyla anlaşılabilir.
Bu PR üç şey ekler:
- TCP soketleri kullanmadan NATS sunucusuna bağlantı kurmak için bir
net.Pipe
oluşturanServer
'aInProcessConn()
fonksiyonu - NATS sunucusuna normal TCP dinleyicisinde dinlememesini söyleyen
DontListen
seçeneği AcceptLoop
başlamadan hemen önce kapanan vereadyForConnections
'ın bekleyeceğistartupComplete
kanalı
Bunun temel motivasyonu, bir monolith (tek süreçli) modda veya bir polylith (çok süreçli) modda çalışabilen bir uygulamamızın olmasıdır. Basitlik adına her iki mod için de NATS'ı kullanabilmek isteriz, ancak monolith modunun, soket bağlantılarının açılmasının ya mantıklı olmadığı (mobil) ya da mümkün olmadığı (WASM) çeşitli platformlara hitap edebilmesi gerekir. Bu değişiklikler, NATS'ı tamamen süreç içi kullanmamıza izin verecektir.
Eşlik eden bir PR [nats-io/nats.go#774], istemci tarafına destek ekler.
Bu projeye ilk PR'ım, bu yüzden herhangi bir şeyi gözden kaçırdıysam şimdiden özür dilerim.
/cc @nats-io/core
- TCP soketleri kullanmadan NATS sunucusuna bağlantı kurmak için bir
Embedded Mode İçin Çalışan Örnek
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // gömülü NATS sunucusunu yapılandırmak için
14 // DonListen'i true olarak ayarla
15 DontListen: true,
16 }
17
18 // Seçeneklerle yeni sunucuyu başlat
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Goroutine aracılığıyla sunucuyu başlat
26 go ns.Start()
27
28 // Sunucunun bağlantılar için hazır olmasını bekle
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("bağlantı için hazır değil")
31 }
32
33 // Süreç içi bağlantı aracılığıyla sunucuya bağlan
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Konuya abone ol
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Mesaj verisini yazdır
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Sunucuyu kapat (isteğe bağlı)
49 ns.Shutdown()
50 })
51
52 // Konuya veri yayınla
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Sunucunun kapanmasını bekle
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Artık amaçlandığı gibi ek bir network hop oluşmadığı görülebilir.
Arka Plan
TL;DR
- Bu kod
main.go
içinde çalıştırıldığında, dahili olarak hangi fonksiyonların nasıl çalıştığını gösteren bir sequence diagramıdır ve ana hatlarıyla şunları açıklar:DontListen: true
sayesinde sunucu, client listening phase olanAcceptLoop
'u atlar.- Client'ın Connect option'larından
InProcessServer
aktif hale gelirse, in-memory connection oluşturur venet.Pipe
aracılığıyla bir pipe oluşturduktan sonra pipe'ın ucunu client'anet.Conn
tipi olarak geri döndürür. - Client ve server, bu bağlantı aracılığıyla in-process communication gerçekleştirir.
Sunucu
AcceptLoop
1// nats-server/server/server.go
2
3// İstemcileri bekle.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Öncelikle,
DontListen
true ise,AcceptLoop
adındaki istemci dinleme aşaması atlanır.
1// nats-server/server/server.go
2
3// AcceptLoop, daha kolay test için dışa aktarılmıştır.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Dinleyici düzgün bir şekilde kurulmadan önce çıkış yapmamız gerekirse,
6 // kanalın kapatıldığından emin ol.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Sunucu seçeneklerini anlık görüntüle.
18 opts := s.getOpts()
19
20 // Kapanmayı etkinleştirebilecek durumu ayarla
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Portta dinlenirken hata oluştu: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("İstemci bağlantıları için %s üzerinde dinleniyor",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // TLS etkinse uyar.
34 if opts.TLSConfig != nil {
35 s.Noticef("İstemci bağlantıları için TLS gerekli")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("\"TLS Handshake First\" seçeneğini kullanmayan istemciler bağlanamayacak")
38 }
39 }
40
41 // Sunucu RANDOM_PORT (-1) ile başlatıldıysa, opts.Port bu fonksiyonun
42 // başında 0'a eşit olacaktır. Bu yüzden gerçek portu almamız gerekiyor
43 if opts.Port == 0 {
44 // Çözümlenmiş portu seçeneklere geri yaz.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Port ayarlandıktan sonra (RANDOM'a ayarlandıysa),
49 // sunucunun bilgi Host/Port'unu Options veya ClientAdvertise değerleri
50 // ile ayarla.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("ClientAdvertise değeri %s ile sunucu INFO ayarlanırken hata oluştu, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // İstemci bağlantı URL'lerini takip et. Daha sonra ihtiyacımız olabilir.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Yeni istemcileri kabul etmediğimizi bildir
65 s.ldmCh <- true
66 // Şimdi Kapanmayı bekle...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Çağıranı hazır olduğumuzu bildir
75 close(clr)
76 clr = nil
77}
- Ayrıca, AcceptLoop fonksiyonu aşağıdaki süreçleri gerçekleştirir:
TLS
veyahostPort
gibi ağ iletişimi ile ilgili kısımlar, in-process communication yapıldığında gereksiz olan bölümler olduğundan atlanabilir.
İstemci
InProcessServer
1
2// nats-go/nats.go
3
4// Connect, NATS sistemine bağlanmayı deneyecektir.
5// URL, kullanıcı adı/şifre semantiği içerebilir. Örneğin: nats://derek:pass@localhost:4222
6// Virgülle ayrılmış diziler de desteklenir, örneğin urlA, urlB.
7// Seçenekler varsayılanlarla başlar ancak geçersiz kılınabilir.
8// Bir NATS Sunucusunun websocket portuna bağlanmak için `ws` veya `wss` şemasını kullanın, örneğin
9// `ws://localhost:8080`. Websocket şemalarının diğerleriyle (nats/tls) karıştırılamayacağını unutmayın.
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
4type Options struct {
5 // Url, istemcinin bağlanacağı tek bir NATS sunucusu URL'sini temsil eder.
6 // Servers seçeneği de ayarlanmışsa, o zaman Servers dizisindeki ilk sunucu olur.
7 Url string
8
9 // InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
10 // Bu ayarlanmışsa, harici TCP bağlantıları kullanmak yerine sunucuya doğrudan
11 // bağlanmaya çalışacağız.
12 InProcessServer InProcessConnProvider
13
14 //...
15}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- nats server ve nats client arasındaki bağlantıyı sağlayan
Connect
fonksiyonu, client URL ve connect Option'ı ayarlayabilir ve bu Option'ları bir araya getiren Options struct'ı içindeInProcessConnProvider
interface tipindeInProcessServer
adında bir alan bulunur.
1// örnek kodun main.go'su
2
3// Seçeneklerle yeni sunucuyu başlat
4ns, err := server.NewServer(opts)
5
6//...
7
8// Süreç içi bağlantı aracılığıyla sunucuya bağlan
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- nats client'ta Connect işlemi gerçekleştirilirken,
InProcessServer
alanınanats.InProcessServer(ns)
değeri aktarıldığında
1// nats-go/nats.go
2
3// InProcessServer, TCP aracılığıyla bağlantı kurmak yerine, süreç içinde çalışan bir NATS sunucusuna
4// doğrudan bağlantı kurmayı deneyecek bir Option'dır.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- option'ın InProcessServer'ı embedded nats server ile değiştirilir ve
1// nats-go/nats.go
2
3// createConn, sunucuya bağlanacak ve uygun bufio yapılarını saracaktır.
4// Mevcut bir bağlantı olduğunda doğru şeyi yapacaktır.
5func (nc *Conn) createConn() (err error) {
6 if nc.Opts.Timeout < 0 {
7 return ErrBadTimeout
8 }
9 if _, cur := nc.currentServer(); cur == nil {
10 return ErrNoServers
11 }
12
13 // Eğer süreç içi bir sunucuya referansımız varsa, o zaman onu kullanarak bir bağlantı kur.
14 if nc.Opts.InProcessServer != nil {
15 conn, err := nc.Opts.InProcessServer.InProcessConn()
16 if err != nil {
17 return fmt.Errorf("süreç içi bağlantı alınamadı: %w", err)
18 }
19 nc.conn = conn
20 nc.bindToNewConn()
21 return nil
22 }
23
24 //...
25}
- Bu interface, bağlantı oluşturan
createConn
fonksiyonundaInProcessServer
seçeneği nil değilse (geçerliyse), seçenekteki InProcessServer'ınInProcessConn
'unu çalıştırarak
1// nats-server/server/server.go
2
3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
6func (s *Server) InProcessConn() (net.Conn, error) {
7 pl, pr := net.Pipe()
8 if !s.startGoRoutine(func() {
9 s.createClientInProcess(pl)
10 s.grWG.Done()
11 }) {
12 pl.Close()
13 pr.Close()
14 return nil, fmt.Errorf("bağlantı oluşturulamadı")
15 }
16 return pr, nil
17}
- sunucuda uygulanan
InProcessConn
'u çağırarak çalıştırır. - Bu fonksiyon, NATS'ın Go client'ı olan
nats.go
'da nc (nats connection)'ninInProcessServer
'ı nil değilse çağrılır ve bir bağlantı (net.Conn
) oluşturur, ardından bunu sunucunun bağlantısına bağlar.
Go'nun Consumer Driven Interface'i
Bir tür, metodlarını uygulayarak bir interface'i implemente eder. Açık bir niyet beyanı, yani "implements" anahtar kelimesi yoktur. Implicit interface'ler, bir interface'in tanımını, herhangi bir ön düzenleme olmaksızın herhangi bir pakette görünebilecek olan implementasyonundan ayırır.
Interface'ler implicit olarak implemente edilir, A Tour of Go
Eğer bir tür yalnızca bir interface'i implemente etmek için var ise ve o interface dışındaki hiçbir exported metoda sahip olmayacaksa, türün kendisini export etmeye gerek yoktur.
- Bu interface tasarımı, Go'da sıkça bahsedilen consumer defined interface ve structural typing (duck typing) kavramlarını iyi yansıttığı için bu konuyu da tanıtmak istedim.
1// nats-go/nats.go
2
3// Options, özelleştirilmiş bir bağlantı oluşturmak için kullanılabilir.
4type Options struct {
5 // Url, istemcinin bağlanacağı tek bir NATS sunucusu URL'sini temsil eder.
6 // Servers seçeneği de ayarlanmışsa, o zaman Servers dizisindeki ilk sunucu olur.
7 Url string
8
9 // InProcessServer, aynı süreç içinde çalışan bir NATS sunucusunu temsil eder.
10 // Bu ayarlanmışsa, harici TCP bağlantıları kullanmak yerine sunucuya doğrudan
11 // bağlanmaya çalışacağız.
12 InProcessServer InProcessConnProvider
13
14 //...
15}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Tekrar koda dönelim. nats.go client'ında
InProcessServer
option struct alanı, yalnızcaInProcessConn
'u gerçekleştirenInProcessConnProvider
interface'i olarak tanımlanmıştır.
1// nats-server/server/server.go
2
3// InProcessConn, sunucuya süreç içi bir bağlantı döndürür,
4// aynı süreç içindeki yerel bağlantı için bir TCP dinleyicisi kullanma ihtiyacını ortadan kaldırır.
5// Bu, DontListen seçeneğinin durumundan bağımsız olarak kullanılabilir.
6func (s *Server) InProcessConn() (net.Conn, error) {
7 pl, pr := net.Pipe()
8 if !s.startGoRoutine(func() {
9 s.createClientInProcess(pl)
10 s.grWG.Done()
11 }) {
12 pl.Close()
13 pr.Close()
14 return nil, fmt.Errorf("bağlantı oluşturulamadı")
15 }
16 return pr, nil
17}
- Ancak, ona giren tip nats-server'ın
Server
'ıdır veInProcessConn
'un yanı sıra çeşitli işlevleri de yerine getirmektedir. - Çünkü bu durumda client'ın ilgi alanı,
InProcessConn
adında bir interface'in sağlanıp sağlanmadığıdır; diğer şeyler pek önemli değildir. - Bu nedenle, nats.go client'ı, yalnızca
InProcessConn() (net.Conn, error)
işlevini tanımlayanInProcessConnProvider
adında bir consumer defined interface oluşturarak kullanmaktadır.
Sonuç
- NATS'ın embedded mode'u ve çalışma şekli ile NATS kodunda görülebilecek Go'nun consumer defined interface'i kısaca ele alınmıştır.
- Bu bilgilerin, NATS'ı yukarıdaki amaçlarla kullanan kişilere yardımcı olmasını umarak bu yazıyı bitirmek istiyorum.