Bagaimana NATS yang di-embed berkomunikasi dengan aplikasi Go?
Memulai
Tentang NATS
Aplikasi dan layanan perangkat lunak perlu bertukar data. NATS adalah infrastruktur yang memungkinkan pertukaran data semacam itu, disegmentasi dalam bentuk pesan. Kami menyebutnya "middleware berorientasi pesan".
Dengan NATS, pengembang aplikasi dapat:
- Dengan mudah membangun aplikasi client-server yang terdistribusi dan terukur.
- Menyimpan dan mendistribusikan data secara realtime secara umum. Ini dapat dicapai secara fleksibel di berbagai lingkungan, bahasa, penyedia cloud, dan sistem on-premises.
- NATS adalah broker pesan yang terdiri dari Go.
NATS Tertanam
Jika aplikasi Anda dalam Go, dan jika itu sesuai dengan kasus penggunaan dan skenario deployment Anda, Anda bahkan dapat menanamkan server NATS di dalam aplikasi Anda.
- Dan ada kekhasan NATS, yaitu mendukung mode tertanam untuk aplikasi yang terdiri dari Go.
- Artinya, alih-alih metode umum broker pesan yang mengharuskan server broker terpisah untuk dijalankan dan kemudian berkomunikasi melalui klien aplikasi dengan server tersebut, broker itu sendiri dapat ditanamkan (embed) ke dalam aplikasi yang dibuat dengan Go.
Manfaat dan kasus penggunaan NATS tertanam
- Ada video Youtube yang menjelaskan dengan baik, jadi saya akan menggunakan tautan video tersebut.
- Bahkan tanpa deployment server broker pesan terpisah, kita dapat memperoleh keuntungan menanamkan NATS sebagai embedded, dengan membuat modular monolith application dan mencapai separation of concern. Selain itu, single binary deployment juga menjadi mungkin.
- Ini dapat digunakan secara berguna tidak hanya untuk platform tanpa jaringan (WASM), tetapi juga untuk aplikasi offline-first.
Contoh pada dokumen resmi
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inisialisasi server baru dengan opsi
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Mulai server melalui goroutine
22 go ns.Start()
23
24 // Tunggu server siap untuk koneksi
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("tidak siap untuk koneksi")
27 }
28
29 // Sambungkan ke server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Berlangganan ke subjek
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Cetak data pesan
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Matikan server (opsional)
45 ns.Shutdown()
46 })
47
48 // Publikasikan data ke subjek
49 nc.Publish(subject, []byte("Halo NATS tertanam!"))
50
51 // Tunggu server mati
52 ns.WaitForShutdown()
53}
- Ini adalah contoh NATS Tertanam yang diposting di dokumentasi resmi NATS, tetapi jika Anda mengikuti kode contoh ini, komunikasi tidak akan terjadi dalam mode penanaman.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Jika Anda menjalankan file go tersebut dengan
go run .
sambil memeriksa jaringan yang mengalir ke localhost(127.0.0.1) melalui perintahwatch 'netstat -an | grep 127.0.0.1'
, Anda akan melihat permintaan jaringan baru yang berasal dari port default NATS, yaitu4222
, ditambahkan.
Konfigurasi yang Tepat untuk Mode Penanaman
Untuk berkomunikasi dalam mode tertanam seperti yang diinginkan, diperlukan dua opsi berikut:
- Klien: Opsi
InProcessServer
harus disertakan. - Server: Bendera
DontListen
harus secara eksplisit dinyatakan sebagaitrue
dalamServer.Options
.
- Klien: Opsi
Bagian-bagian ini tidak didokumentasikan secara resmi, dan awal dari fitur ini dapat dipahami melalui PR ini.
PR ini menambahkan tiga hal:
- Fungsi
InProcessConn()
keServer
yang membangunnet.Pipe
untuk mendapatkan koneksi ke server NATS tanpa menggunakan soket TCP - Opsi
DontListen
yang memberitahu server NATS untuk tidak mendengarkan pada pendengar TCP biasa - Saluran
startupComplete
, yang ditutup tepat sebelum kita memulaiAcceptLoop
, danreadyForConnections
akan menunggunya
Motivasi utama untuk ini adalah bahwa kita memiliki aplikasi yang dapat berjalan dalam mode monolit (proses tunggal) atau mode polilit (multi-proses). Kami ingin dapat menggunakan NATS untuk kedua mode demi kesederhanaan, tetapi mode monolit harus dapat melayani berbagai platform di mana membuka koneksi soket tidak masuk akal (seluler) atau tidak mungkin (WASM). Perubahan ini akan memungkinkan kita untuk menggunakan NATS sepenuhnya dalam proses.
PR yang menyertainya nats-io/nats.go#774 menambahkan dukungan untuk sisi klien.
Ini adalah PR pertama saya untuk proyek ini, jadi mohon maaf sebelumnya jika saya melewatkan sesuatu yang jelas di mana pun.
/cc @nats-io/core
- Fungsi
Contoh Kerja untuk Mode Tertanam
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // untuk mengkonfigurasi server NATS yang tertanam
14 // atur DonListen menjadi true
15 DontListen: true,
16 }
17
18 // Inisialisasi server baru dengan opsi
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Mulai server melalui goroutine
26 go ns.Start()
27
28 // Tunggu server siap untuk koneksi
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("tidak siap untuk koneksi")
31 }
32
33 // Sambungkan ke server melalui koneksi in-process
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Berlangganan ke subjek
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Cetak data pesan
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Matikan server (opsional)
49 ns.Shutdown()
50 })
51
52 // Publikasikan data ke subjek
53 nc.Publish(subject, []byte("Halo NATS tertanam!"))
54
55 // Tunggu server mati
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Sekarang kita dapat melihat bahwa tidak ada hop jaringan tambahan yang terjadi seperti yang diinginkan.
Di Balik Layar
TL;DR
- Diagram urutan ini menunjukkan bagaimana fungsi-fungsi internal bekerja ketika kode ini dijalankan di
main.go
, dan intinya adalah sebagai berikut:- Dengan
DontListen: true
, server melewatkan fase mendengarkan klien yang disebutAcceptLoop
. - Jika
InProcessServer
di antara opsi koneksi klien diaktifkan, koneksi in-memory dibuat, dan pipa dibuat melaluinet.Pipe
, kemudian ujung pipa dikembalikan ke klien sebagai tipenet.Conn
. - Klien dan server berkomunikasi secara in-process melalui koneksi tersebut.
- Dengan
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Tunggu klien.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Pertama, jika
DontListen
bernilai true, fase mendengarkan klien yang disebutAcceptLoop
dilewati.
1// nats-server/server/server.go
2
3// AcceptLoop diekspor untuk pengujian yang lebih mudah.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Jika kita keluar sebelum listener diatur dengan benar,
6 // pastikan kita menutup channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot opsi server.
18 opts := s.getOpts()
19
20 // Siapkan status yang dapat mengaktifkan pematian
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Kesalahan mendengarkan pada port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Mendengarkan koneksi klien pada %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Peringatan TLS diaktifkan.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS diperlukan untuk koneksi klien")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klien yang tidak menggunakan opsi \"TLS Handshake First\" akan gagal terhubung")
38 }
39 }
40
41 // Jika server dimulai dengan RANDOM_PORT (-1), opts.Port akan sama
42 // dengan 0 pada awal fungsi ini. Jadi kita perlu mendapatkan port yang sebenarnya
43 if opts.Port == 0 {
44 // Tulis port yang diselesaikan kembali ke opsi.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Sekarang setelah port diatur (jika diatur ke RANDOM), atur
49 // Host/Port info server dengan nilai dari Opsi atau
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Kesalahan mengatur INFO server dengan nilai ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Catat URL koneksi klien. Kita mungkin membutuhkannya nanti.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Memberi sinyal bahwa kita tidak menerima klien baru
65 s.ldmCh <- true
66 // Sekarang tunggu Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Beri tahu pemanggil bahwa kita sudah siap
75 close(clr)
76 clr = nil
77}
- Sebagai referensi, fungsi AcceptLoop menjalankan proses-proses berikut. Ini adalah bagian-bagian yang terkait dengan komunikasi jaringan seperti
TLS
atauhostPort
, dan dapat diabaikan karena tidak diperlukan jika komunikasi dilakukan secara in-process.
Klien
InProcessServer
1
2// nats-go/nats.go
3
4// Connect akan mencoba menyambung ke sistem NATS.
5// URL dapat berisi semantik nama pengguna/kata sandi. misal nats://derek:pass@localhost:4222
6// Array yang dipisahkan koma juga didukung, misal urlA, urlB.
7// Opsi dimulai dengan nilai default tetapi dapat ditimpa.
8// Untuk menyambung ke port websocket NATS Server, gunakan skema `ws` atau `wss`, seperti
9// `ws://localhost:8080`. Perhatikan bahwa skema websocket tidak dapat dicampur dengan yang lain (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
4type Options struct {
5 // Url mewakili satu url server NATS tempat klien
6 // akan terhubung. Jika opsi Servers juga diatur, itu
7 // kemudian menjadi server pertama dalam array Servers.
8 Url string
9
10 // InProcessServer mewakili server NATS yang berjalan di dalam
11 // proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12 // ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Fungsi
Connect
yang melakukan koneksi antara server NATS dan klien NATS memungkinkan pengaturan URL klien dan opsi koneksi, dan struktur Opsi yang mengumpulkan opsi-opsi ini memiliki bidangInProcessServer
dengan tipe antarmukaInProcessConnProvider
.
1// main.go dari kode contoh
2
3// Inisialisasi server baru dengan opsi
4ns, err := server.NewServer(opts)
5
6//...
7
8// Sambungkan ke server melalui koneksi in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Ketika klien NATS melakukan Connect, jika
nats.InProcessServer(ns)
diteruskan ke bidangInProcessServer
, maka
1// nats-go/nats.go
2
3// InProcessServer adalah Opsi yang akan mencoba membangun arah ke server NATS
4// yang berjalan di dalam proses alih-alih melakukan dial melalui TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
InProcessServer
dari opsi diganti dengan server NATS tertanam dan
1// nats-go/nats.go
2
3// createConn akan terhubung ke server dan membungkus
4// struktur bufio yang sesuai. Ini akan melakukan hal yang benar ketika
5// koneksi yang ada sudah ada.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Jika kita memiliki referensi ke server in-process, maka buat
15 // koneksi menggunakan itu.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("gagal mendapatkan koneksi in-process: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- antarmuka ini menjalankan
InProcesConn
dari InProcessServer dalam opsi jika opsiInProcessServer
tidak nil (valid) dalam fungsicreateConn
yang membuat koneksi, dan
1// nats-server/server/server.go
2
3// InProcessConn mengembalikan koneksi in-process ke server,
4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
5// dalam proses yang sama. Ini dapat digunakan terlepas dari
6// status opsi DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("gagal membuat koneksi")
16 }
17 return pr, nil
18}
- ia memanggil dan menjalankan
InProcessConn
yang diimplementasikan di server. - Fungsi ini, ketika dipanggil oleh klien Go NATS (
nats.go
) jikaInProcessServer
dari nc (koneksi NATS) tidak nil, akan membuat koneksi (net.Conn
) dan mengikatnya ke koneksi server.
Antarmuka yang Digerakkan Konsumen dari Go
Sebuah tipe mengimplementasikan antarmuka dengan mengimplementasikan metodenya. Tidak ada deklarasi eksplisit niat, tidak ada kata kunci "implements". Antarmuka implisit memisahkan definisi antarmuka dari implementasinya, yang kemudian dapat muncul dalam paket apa pun tanpa pengaturan awal.
Antarmuka diimplementasikan secara implisit, A Tour of Go
Jika suatu tipe hanya ada untuk mengimplementasikan antarmuka dan tidak akan pernah memiliki metode yang diekspor di luar antarmuka tersebut, maka tidak perlu mengekspor tipe itu sendiri.
- Desain antarmuka ini dengan baik mewujudkan apa yang sering disebut di Go sebagai antarmuka yang ditentukan oleh konsumen dan structural typing (duck typing), jadi saya ingin memperkenalkan topik ini juga.
1// nats-go/nats.go
2
3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
4type Options struct {
5 // Url mewakili satu url server NATS tempat klien
6 // akan terhubung. Jika opsi Servers juga diatur, itu
7 // kemudian menjadi server pertama dalam array Servers.
8 Url string
9
10 // InProcessServer mewakili server NATS yang berjalan di dalam
11 // proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12 // ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Mari kita kembali ke kode. Bidang struktur opsi
InProcessServer
di kliennats.go
didefinisikan sebagai antarmukaInProcessConnProvider
yang hanya menjalankanInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn mengembalikan koneksi in-process ke server,
4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
5// dalam proses yang sama. Ini dapat digunakan terlepas dari
6// status opsi DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("gagal membuat koneksi")
16 }
17 return pr, nil
18}
- Namun, tipe yang dimasukkan adalah
Server
dari nats-server, yang melakukan berbagai fungsi selainInProcessConn
. - Ini karena dalam situasi ini, perhatian klien hanyalah apakah antarmuka
InProcessConn
telah disediakan atau tidak, dan hal-hal lain tidak terlalu penting. - Oleh karena itu, klien nats.go hanya membuat dan menggunakan antarmuka yang ditentukan oleh konsumen (
InProcessConnProvider
) yang hanya mendefinisikan fungsiInProcessConn() (net.Conn, error)
.
Kesimpulan
- Saya telah membahas secara singkat mode tertanam NATS dan cara kerjanya, serta antarmuka Go yang ditentukan oleh konsumen yang dapat dikonfirmasi melalui kode NATS.
- Saya berharap informasi ini bermanfaat bagi mereka yang menggunakan NATS untuk tujuan tersebut, dan dengan demikian saya mengakhiri artikel ini.