GoSuda

Bagaimana NATS yang di-embed berkomunikasi dengan aplikasi Go?

By prravda
views ...

Memulai

Tentang NATS

Aplikasi dan layanan perangkat lunak perlu bertukar data. NATS adalah infrastruktur yang memungkinkan pertukaran data semacam itu, yang tersegmentasi dalam bentuk pesan. Kami menyebutnya sebagai "middleware berorientasi pesan".

Dengan NATS, pengembang aplikasi dapat:

  • Dengan mudah membangun aplikasi client-server yang terdistribusi dan skalabel.
  • Menyimpan dan mendistribusikan data secara realtime secara umum. Hal ini dapat dicapai secara fleksibel di berbagai lingkungan, bahasa, penyedia cloud, dan sistem on-premises.

Apa itu NATS, NATS docs

  • NATS adalah message broker yang dibangun dengan Go.

NATS Tersemat (Embedded NATS)

Apabila aplikasi Anda dalam Go, dan jika sesuai dengan kasus penggunaan serta skenario deployment Anda, Anda bahkan dapat menyematkan server NATS di dalam aplikasi Anda.

Menyematkan NATS, NATS docs

  • Selain itu, terdapat fitur unik NATS, yaitu dukungan mode tersemat untuk aplikasi yang dibangun dengan Go.
  • Artinya, alih-alih menggunakan pendekatan message broker konvensional yang melibatkan pengoperasian server broker terpisah dan komunikasi melalui klien aplikasi ke server tersebut, broker itu sendiri dapat disematkan (embed) langsung ke dalam aplikasi yang dibuat dengan Go.

Manfaat dan kasus penggunaan NATS tersemat

  • Terdapat video Youtube yang menjelaskan dengan baik, sehingga tautan video tersebut dapat digunakan sebagai referensi.
  • Tanpa perlu men-deploy server message broker terpisah, kita dapat membangun modular monolith application untuk mencapai separation of concern sekaligus memanfaatkan keuntungan menyematkan NATS secara embedded. Lebih lanjut, single binary deployment juga menjadi mungkin.
  • Hal ini dapat digunakan secara bermanfaat tidak hanya pada platform tanpa jaringan (WASM), tetapi juga pada aplikasi offline-first.

Contoh pada dokumen resmi

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inisialisasi server baru dengan opsi
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Mulai server melalui goroutine
22    go ns.Start()
23
24    // Tunggu hingga server siap untuk koneksi
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Sambungkan ke server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Berlangganan ke subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Cetak data pesan
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Matikan server (opsional)
45        ns.Shutdown()
46    })
47
48    // Publikasikan data ke subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Tunggu hingga server mati
52    ns.WaitForShutdown()
53}
  • Contoh Embedded NATS yang disediakan dalam dokumentasi resmi NATS ini, apabila dijalankan sesuai kode contoh tersebut, tidak akan menghasilkan komunikasi dalam mode embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Apabila Anda menjalankan go run . untuk mengeksekusi berkas Go tersebut sambil memantau lalu lintas jaringan yang melalui localhost (127.0.0.1) dengan perintah watch 'netstat -an | grep 127.0.0.1', Anda akan melihat penambahan permintaan jaringan baru yang berasal dari port default NATS, yaitu 4222.

Konfigurasi yang tepat untuk mode penyematan

  • Untuk berkomunikasi dalam mode tersemat sesuai yang diinginkan, diperlukan dua opsi berikut:

    • Klien: Opsi InProcessServer harus ditambahkan.
    • Server: Flag DontListen dalam Server.Options harus ditetapkan ke true.
  • Bagian-bagian ini tidak didokumentasikan secara resmi, dan permulaan fitur ini dapat ditemukan melalui PR ini.

    PR ini menambahkan tiga hal:

    1. Fungsi InProcessConn() ke Server yang membangun net.Pipe untuk mendapatkan koneksi ke server NATS tanpa menggunakan soket TCP
    2. Opsi DontListen yang memberitahu server NATS untuk tidak mendengarkan pada listener TCP biasa
    3. Saluran startupComplete, yang ditutup tepat sebelum kita memulai AcceptLoop, dan readyForConnections akan menunggunya

    Motivasi utama untuk ini adalah bahwa kami memiliki aplikasi yang dapat berjalan dalam mode monolit (proses tunggal) atau mode polilit (multi-proses). Kami ingin dapat menggunakan NATS untuk kedua mode demi kesederhanaan, tetapi mode monolit harus dapat mengakomodasi berbagai platform di mana pembukaan koneksi soket tidak masuk akal (seluler) atau tidak mungkin (WASM). Perubahan ini akan memungkinkan kami untuk menggunakan NATS sepenuhnya dalam proses.

    PR yang menyertainya nats-io/nats.go#774 menambahkan dukungan di sisi klien.

    Ini adalah PR pertama saya untuk proyek ini, jadi mohon maaf sebelumnya jika saya melewatkan sesuatu yang jelas di mana pun.

    /cc @nats-io/core

Contoh Kerja untuk mode tersemat

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// untuk mengkonfigurasi server NATS yang disematkan
14		// atur DonListen sebagai true
15		DontListen: true,
16	}
17
18	// Inisialisasi server baru dengan opsi
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Mulai server melalui goroutine
26	go ns.Start()
27
28	// Tunggu hingga server siap untuk koneksi
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Sambungkan ke server melalui koneksi in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Berlangganan ke subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Cetak data pesan
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Matikan server (opsional)
49		ns.Shutdown()
50	})
51
52	// Publikasikan data ke subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Tunggu hingga server mati
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Kini dapat diamati bahwa tidak ada lagi hop jaringan tambahan yang terjadi sesuai dengan yang diharapkan.

Di balik layar

TL;DR

diagram1

  • Diagram urutan ini menggambarkan fungsi-fungsi internal yang beroperasi saat kode dieksekusi dari main.go, dengan poin-poin utama sebagai berikut:
    • Melalui DontListen: true, server melewati fase client listening yang disebut AcceptLoop.
    • Apabila opsi InProcessServer dari klien diaktifkan, koneksi in-memory akan dibuat, sebuah pipe akan dibangun melalui net.Pipe, dan ujung pipe akan dikembalikan kepada klien sebagai tipe net.Conn.
    • Klien dan server kemudian melakukan komunikasi in-process melalui koneksi tersebut.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Tunggu klien.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Pertama, jika DontListen bernilai true, fase client listening yang disebut AcceptLoop akan dilewati.
 1// nats-server/server/server.go
 2
 3// AcceptLoop diekspor untuk pengujian yang lebih mudah.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Jika kita keluar sebelum listener diatur dengan benar,
 6	// pastikan kita menutup saluran.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot opsi server.
18	opts := s.getOpts()
19
20	// Siapkan status yang dapat mengaktifkan shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Mendengarkan koneksi klien pada %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Peringatan TLS diaktifkan.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS diperlukan untuk koneksi klien")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klien yang tidak menggunakan opsi \"TLS Handshake First\" akan gagal terhubung")
38		}
39	}
40
41	// Jika server dimulai dengan RANDOM_PORT (-1), opts.Port akan sama dengan
42	// 0 pada awal fungsi ini. Jadi kita perlu mendapatkan port yang sebenarnya
43	if opts.Port == 0 {
44		// Tulis port yang telah diselesaikan kembali ke opsi.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Sekarang setelah port diatur (jika diatur ke RANDOM), atur
49	// info Host/Port server dengan nilai dari Opsi atau
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Catat URL koneksi klien. Kita mungkin membutuhkannya nanti.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Sinyalkan bahwa kami tidak menerima klien baru
65				s.ldmCh <- true
66				// Sekarang tunggu Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Beri tahu pemanggil bahwa kami siap
75	close(clr)
76	clr = nil
77}
  • Sebagai informasi, fungsi AcceptLoop menjalankan proses-proses berikut. Bagian-bagian yang berkaitan dengan komunikasi jaringan, seperti TLS atau hostPort, tidak diperlukan dalam komunikasi in-process sehingga dapat diabaikan.

Klien

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect akan mencoba terhubung ke sistem NATS.
 5// URL dapat berisi semantik username/password. misal nats://derek:pass@localhost:4222
 6// Array yang dipisahkan koma juga didukung, misal urlA, urlB.
 7// Opsi dimulai dengan nilai default tetapi dapat ditimpa.
 8// Untuk terhubung ke port websocket Server NATS, gunakan skema `ws` atau `wss`, seperti
 9// `ws://localhost:8080`. Perhatikan bahwa skema websocket tidak dapat dicampur dengan yang lain (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
 4type Options struct {
 5	// Url merepresentasikan URL server NATS tunggal tempat klien
 6	// akan terhubung. Jika opsi Servers juga diatur, maka
 7	// itu menjadi server pertama dalam array Servers.
 8	Url string
 9
10	// InProcessServer merepresentasikan server NATS yang berjalan di dalam
11	// proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12	// ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Fungsi Connect yang digunakan untuk membangun koneksi antara NATS server dan NATS client dapat mengkonfigurasi URL klien dan opsi koneksi. Dalam struktur Options yang berisi kumpulan opsi tersebut, terdapat field InProcessServer dengan tipe antarmuka InProcessConnProvider.
1// main.go dari kode contoh
2
3// Inisialisasi server baru dengan opsi
4ns, err := server.NewServer(opts)
5
6//...
7
8// Sambungkan ke server melalui koneksi in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Apabila klien NATS melakukan Connect, dan nats.InProcessServer(ns) dilewatkan sebagai field InProcessServer, maka
 1// nats-go/nats.go
 2
 3// createConn akan terhubung ke server dan membungkus struktur
 4// bufio yang sesuai. Ini akan melakukan hal yang benar ketika
 5// koneksi yang ada sudah ada.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Jika kita memiliki referensi ke server in-process, maka bangun
15	// koneksi menggunakan itu.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Antarmuka tersebut kemudian akan memanggil InProcesConn dari InProcessServer yang ada di opsi, jika opsi InProcessServer dalam fungsi createConn yang membuat koneksi tidak nil (valid).
 1// nats-server/server/server.go
 2
 3// InProcessConn mengembalikan koneksi in-process ke server,
 4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
 5// dalam proses yang sama. Ini dapat digunakan terlepas dari
 6// status opsi DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Kemudian, fungsi tersebut memanggil InProcessConn yang diimplementasikan pada server.
  • Fungsi ini akan dipanggil apabila InProcessServer dari nc (koneksi NATS) dalam klien Go NATS, nats.go, tidak bernilai nil. Fungsi ini akan membuat koneksi (net.Conn) dan mengikatnya ke koneksi server.

Antarmuka yang didorong oleh konsumen dari Go

Sebuah tipe mengimplementasikan sebuah antarmuka dengan mengimplementasikan metode-metodenya. Tidak ada deklarasi niat eksplisit, tidak ada kata kunci "implements". Antarmuka implisit memisahkan definisi sebuah antarmuka dari implementasinya, yang kemudian dapat muncul dalam paket apa pun tanpa pengaturan sebelumnya.

Antarmuka diimplementasikan secara implisit, A Tour of Go

Jika sebuah tipe hanya ada untuk mengimplementasikan sebuah antarmuka dan tidak akan pernah memiliki metode yang diekspor di luar antarmuka tersebut, maka tidak perlu mengekspor tipe itu sendiri.

Generalitas, Effective Go

  • Desain antarmuka ini mencakup dengan baik konsep consumer defined interface dan structural typing (duck typing) yang sering dibahas dalam Go, sehingga saya ingin memperkenalkan topik ini juga.
 1// nats-go/nats.go
 2
 3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
 4type Options struct {
 5	// Url merepresentasikan URL server NATS tunggal tempat klien
 6	// akan terhubung. Jika opsi Servers juga diatur, maka
 7	// itu menjadi server pertama dalam array Servers.
 8	Url string
 9
10	// InProcessServer merepresentasikan server NATS yang berjalan di dalam
11	// proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12	// ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Mari kita kembali ke kode. Field struktur opsi InProcessServer dalam klien nats.go didefinisikan sebagai antarmuka InProcessConnProvider yang hanya menjalankan InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn mengembalikan koneksi in-process ke server,
 4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
 5// dalam proses yang sama. Ini dapat digunakan terlepas dari
 6// status opsi DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Namun, tipe yang dimasukkan ke dalamnya adalah Server dari nats-server, yang menjalankan berbagai fungsi di samping InProcessConn.
  • Hal ini karena perhatian klien dalam situasi tersebut hanyalah apakah antarmuka InProcessConn telah disediakan atau tidak, dan hal-hal lain tidak terlalu penting.
  • Oleh karena itu, klien nats.go hanya membuat dan menggunakan antarmuka yang didefinisikan oleh konsumen, yaitu InProcessConnProvider, yang hanya mendefinisikan fungsi InProcessConn() (net.Conn, error).

Kesimpulan

  • Saya telah secara singkat membahas mode tersemat NATS, cara kerjanya, serta antarmuka yang didefinisikan oleh konsumen (consumer defined interface) dalam Go yang dapat dilihat melalui kode NATS.
  • Saya berharap informasi ini dapat bermanfaat bagi mereka yang menggunakan NATS untuk tujuan serupa, dan dengan ini saya mengakhiri tulisan ini.