Bagaimana NATS yang di-embed berkomunikasi dengan aplikasi Go?
Memulai
Tentang NATS
Aplikasi dan layanan perangkat lunak perlu bertukar data. NATS adalah infrastruktur yang memungkinkan pertukaran data semacam itu, yang tersegmentasi dalam bentuk pesan. Kami menyebutnya sebagai "middleware berorientasi pesan".
Dengan NATS, pengembang aplikasi dapat:
- Dengan mudah membangun aplikasi client-server yang terdistribusi dan skalabel.
- Menyimpan dan mendistribusikan data secara realtime secara umum. Hal ini dapat dicapai secara fleksibel di berbagai lingkungan, bahasa, penyedia cloud, dan sistem on-premises.
- NATS adalah message broker yang dibangun dengan Go.
NATS Tersemat (Embedded NATS)
Apabila aplikasi Anda dalam Go, dan jika sesuai dengan kasus penggunaan serta skenario deployment Anda, Anda bahkan dapat menyematkan server NATS di dalam aplikasi Anda.
- Selain itu, terdapat fitur unik NATS, yaitu dukungan mode tersemat untuk aplikasi yang dibangun dengan Go.
- Artinya, alih-alih menggunakan pendekatan message broker konvensional yang melibatkan pengoperasian server broker terpisah dan komunikasi melalui klien aplikasi ke server tersebut, broker itu sendiri dapat disematkan (embed) langsung ke dalam aplikasi yang dibuat dengan Go.
Manfaat dan kasus penggunaan NATS tersemat
- Terdapat video Youtube yang menjelaskan dengan baik, sehingga tautan video tersebut dapat digunakan sebagai referensi.
- Tanpa perlu men-deploy server message broker terpisah, kita dapat membangun modular monolith application untuk mencapai separation of concern sekaligus memanfaatkan keuntungan menyematkan NATS secara embedded. Lebih lanjut, single binary deployment juga menjadi mungkin.
- Hal ini dapat digunakan secara bermanfaat tidak hanya pada platform tanpa jaringan (WASM), tetapi juga pada aplikasi offline-first.
Contoh pada dokumen resmi
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inisialisasi server baru dengan opsi
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Mulai server melalui goroutine
22 go ns.Start()
23
24 // Tunggu hingga server siap untuk koneksi
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Sambungkan ke server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Berlangganan ke subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Cetak data pesan
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Matikan server (opsional)
45 ns.Shutdown()
46 })
47
48 // Publikasikan data ke subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Tunggu hingga server mati
52 ns.WaitForShutdown()
53}
- Contoh Embedded NATS yang disediakan dalam dokumentasi resmi NATS ini, apabila dijalankan sesuai kode contoh tersebut, tidak akan menghasilkan komunikasi dalam mode embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Apabila Anda menjalankan
go run .untuk mengeksekusi berkas Go tersebut sambil memantau lalu lintas jaringan yang melalui localhost (127.0.0.1) dengan perintahwatch 'netstat -an | grep 127.0.0.1', Anda akan melihat penambahan permintaan jaringan baru yang berasal dari port default NATS, yaitu4222.
Konfigurasi yang tepat untuk mode penyematan
Untuk berkomunikasi dalam mode tersemat sesuai yang diinginkan, diperlukan dua opsi berikut:
- Klien: Opsi
InProcessServerharus ditambahkan. - Server: Flag
DontListendalamServer.Optionsharus ditetapkan ketrue.
- Klien: Opsi
Bagian-bagian ini tidak didokumentasikan secara resmi, dan permulaan fitur ini dapat ditemukan melalui PR ini.
PR ini menambahkan tiga hal:
- Fungsi
InProcessConn()keServeryang membangunnet.Pipeuntuk mendapatkan koneksi ke server NATS tanpa menggunakan soket TCP - Opsi
DontListenyang memberitahu server NATS untuk tidak mendengarkan pada listener TCP biasa - Saluran
startupComplete, yang ditutup tepat sebelum kita memulaiAcceptLoop, danreadyForConnectionsakan menunggunya
Motivasi utama untuk ini adalah bahwa kami memiliki aplikasi yang dapat berjalan dalam mode monolit (proses tunggal) atau mode polilit (multi-proses). Kami ingin dapat menggunakan NATS untuk kedua mode demi kesederhanaan, tetapi mode monolit harus dapat mengakomodasi berbagai platform di mana pembukaan koneksi soket tidak masuk akal (seluler) atau tidak mungkin (WASM). Perubahan ini akan memungkinkan kami untuk menggunakan NATS sepenuhnya dalam proses.
PR yang menyertainya nats-io/nats.go#774 menambahkan dukungan di sisi klien.
Ini adalah PR pertama saya untuk proyek ini, jadi mohon maaf sebelumnya jika saya melewatkan sesuatu yang jelas di mana pun.
/cc @nats-io/core
- Fungsi
Contoh Kerja untuk mode tersemat
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // untuk mengkonfigurasi server NATS yang disematkan
14 // atur DonListen sebagai true
15 DontListen: true,
16 }
17
18 // Inisialisasi server baru dengan opsi
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Mulai server melalui goroutine
26 go ns.Start()
27
28 // Tunggu hingga server siap untuk koneksi
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Sambungkan ke server melalui koneksi in-process
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Berlangganan ke subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Cetak data pesan
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Matikan server (opsional)
49 ns.Shutdown()
50 })
51
52 // Publikasikan data ke subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Tunggu hingga server mati
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Kini dapat diamati bahwa tidak ada lagi hop jaringan tambahan yang terjadi sesuai dengan yang diharapkan.
Di balik layar
TL;DR
- Diagram urutan ini menggambarkan fungsi-fungsi internal yang beroperasi saat kode dieksekusi dari
main.go, dengan poin-poin utama sebagai berikut:- Melalui
DontListen: true, server melewati fase client listening yang disebutAcceptLoop. - Apabila opsi
InProcessServerdari klien diaktifkan, koneksi in-memory akan dibuat, sebuah pipe akan dibangun melaluinet.Pipe, dan ujung pipe akan dikembalikan kepada klien sebagai tipenet.Conn. - Klien dan server kemudian melakukan komunikasi in-process melalui koneksi tersebut.
- Melalui
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Tunggu klien.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Pertama, jika
DontListenbernilai true, fase client listening yang disebutAcceptLoopakan dilewati.
1// nats-server/server/server.go
2
3// AcceptLoop diekspor untuk pengujian yang lebih mudah.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Jika kita keluar sebelum listener diatur dengan benar,
6 // pastikan kita menutup saluran.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot opsi server.
18 opts := s.getOpts()
19
20 // Siapkan status yang dapat mengaktifkan shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Mendengarkan koneksi klien pada %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Peringatan TLS diaktifkan.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS diperlukan untuk koneksi klien")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klien yang tidak menggunakan opsi \"TLS Handshake First\" akan gagal terhubung")
38 }
39 }
40
41 // Jika server dimulai dengan RANDOM_PORT (-1), opts.Port akan sama dengan
42 // 0 pada awal fungsi ini. Jadi kita perlu mendapatkan port yang sebenarnya
43 if opts.Port == 0 {
44 // Tulis port yang telah diselesaikan kembali ke opsi.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Sekarang setelah port diatur (jika diatur ke RANDOM), atur
49 // info Host/Port server dengan nilai dari Opsi atau
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Catat URL koneksi klien. Kita mungkin membutuhkannya nanti.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Sinyalkan bahwa kami tidak menerima klien baru
65 s.ldmCh <- true
66 // Sekarang tunggu Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Beri tahu pemanggil bahwa kami siap
75 close(clr)
76 clr = nil
77}
- Sebagai informasi, fungsi AcceptLoop menjalankan proses-proses berikut. Bagian-bagian yang berkaitan dengan komunikasi jaringan, seperti
TLSatauhostPort, tidak diperlukan dalam komunikasi in-process sehingga dapat diabaikan.
Klien
InProcessServer
1
2// nats-go/nats.go
3
4// Connect akan mencoba terhubung ke sistem NATS.
5// URL dapat berisi semantik username/password. misal nats://derek:pass@localhost:4222
6// Array yang dipisahkan koma juga didukung, misal urlA, urlB.
7// Opsi dimulai dengan nilai default tetapi dapat ditimpa.
8// Untuk terhubung ke port websocket Server NATS, gunakan skema `ws` atau `wss`, seperti
9// `ws://localhost:8080`. Perhatikan bahwa skema websocket tidak dapat dicampur dengan yang lain (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
4type Options struct {
5 // Url merepresentasikan URL server NATS tunggal tempat klien
6 // akan terhubung. Jika opsi Servers juga diatur, maka
7 // itu menjadi server pertama dalam array Servers.
8 Url string
9
10 // InProcessServer merepresentasikan server NATS yang berjalan di dalam
11 // proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12 // ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Fungsi
Connectyang digunakan untuk membangun koneksi antara NATS server dan NATS client dapat mengkonfigurasi URL klien dan opsi koneksi. Dalam struktur Options yang berisi kumpulan opsi tersebut, terdapat fieldInProcessServerdengan tipe antarmukaInProcessConnProvider.
1// main.go dari kode contoh
2
3// Inisialisasi server baru dengan opsi
4ns, err := server.NewServer(opts)
5
6//...
7
8// Sambungkan ke server melalui koneksi in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Apabila klien NATS melakukan Connect, dan
nats.InProcessServer(ns)dilewatkan sebagai fieldInProcessServer, maka
1// nats-go/nats.go
2
3// createConn akan terhubung ke server dan membungkus struktur
4// bufio yang sesuai. Ini akan melakukan hal yang benar ketika
5// koneksi yang ada sudah ada.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Jika kita memiliki referensi ke server in-process, maka bangun
15 // koneksi menggunakan itu.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Antarmuka tersebut kemudian akan memanggil
InProcesConndariInProcessServeryang ada di opsi, jika opsiInProcessServerdalam fungsicreateConnyang membuat koneksi tidak nil (valid).
1// nats-server/server/server.go
2
3// InProcessConn mengembalikan koneksi in-process ke server,
4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
5// dalam proses yang sama. Ini dapat digunakan terlepas dari
6// status opsi DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Kemudian, fungsi tersebut memanggil
InProcessConnyang diimplementasikan pada server. - Fungsi ini akan dipanggil apabila
InProcessServerdarinc(koneksi NATS) dalam klien Go NATS,nats.go, tidak bernilai nil. Fungsi ini akan membuat koneksi (net.Conn) dan mengikatnya ke koneksi server.
Antarmuka yang didorong oleh konsumen dari Go
Sebuah tipe mengimplementasikan sebuah antarmuka dengan mengimplementasikan metode-metodenya. Tidak ada deklarasi niat eksplisit, tidak ada kata kunci "implements". Antarmuka implisit memisahkan definisi sebuah antarmuka dari implementasinya, yang kemudian dapat muncul dalam paket apa pun tanpa pengaturan sebelumnya.
Antarmuka diimplementasikan secara implisit, A Tour of Go
Jika sebuah tipe hanya ada untuk mengimplementasikan sebuah antarmuka dan tidak akan pernah memiliki metode yang diekspor di luar antarmuka tersebut, maka tidak perlu mengekspor tipe itu sendiri.
- Desain antarmuka ini mencakup dengan baik konsep consumer defined interface dan structural typing (duck typing) yang sering dibahas dalam Go, sehingga saya ingin memperkenalkan topik ini juga.
1// nats-go/nats.go
2
3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
4type Options struct {
5 // Url merepresentasikan URL server NATS tunggal tempat klien
6 // akan terhubung. Jika opsi Servers juga diatur, maka
7 // itu menjadi server pertama dalam array Servers.
8 Url string
9
10 // InProcessServer merepresentasikan server NATS yang berjalan di dalam
11 // proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12 // ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Mari kita kembali ke kode. Field struktur opsi
InProcessServerdalam klien nats.go didefinisikan sebagai antarmukaInProcessConnProvideryang hanya menjalankanInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn mengembalikan koneksi in-process ke server,
4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
5// dalam proses yang sama. Ini dapat digunakan terlepas dari
6// status opsi DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Namun, tipe yang dimasukkan ke dalamnya adalah
Serverdari nats-server, yang menjalankan berbagai fungsi di sampingInProcessConn. - Hal ini karena perhatian klien dalam situasi tersebut hanyalah apakah antarmuka
InProcessConntelah disediakan atau tidak, dan hal-hal lain tidak terlalu penting. - Oleh karena itu, klien nats.go hanya membuat dan menggunakan antarmuka yang didefinisikan oleh konsumen, yaitu
InProcessConnProvider, yang hanya mendefinisikan fungsiInProcessConn() (net.Conn, error).
Kesimpulan
- Saya telah secara singkat membahas mode tersemat NATS, cara kerjanya, serta antarmuka yang didefinisikan oleh konsumen (consumer defined interface) dalam Go yang dapat dilihat melalui kode NATS.
- Saya berharap informasi ini dapat bermanfaat bagi mereka yang menggunakan NATS untuk tujuan serupa, dan dengan ini saya mengakhiri tulisan ini.