GoSuda

Bagaimana NATS yang di-embed berkomunikasi dengan aplikasi Go?

By prravda
views ...

Memulai

Tentang NATS

Aplikasi dan layanan perangkat lunak perlu bertukar data. NATS adalah infrastruktur yang memungkinkan pertukaran data semacam itu, disegmentasi dalam bentuk pesan. Kami menyebutnya "middleware berorientasi pesan".

Dengan NATS, pengembang aplikasi dapat:

  • Dengan mudah membangun aplikasi client-server yang terdistribusi dan terukur.
  • Menyimpan dan mendistribusikan data secara realtime secara umum. Ini dapat dicapai secara fleksibel di berbagai lingkungan, bahasa, penyedia cloud, dan sistem on-premises.

Apa itu NATS, dokumen NATS

  • NATS adalah broker pesan yang terdiri dari Go.

NATS Tertanam

Jika aplikasi Anda dalam Go, dan jika itu sesuai dengan kasus penggunaan dan skenario deployment Anda, Anda bahkan dapat menanamkan server NATS di dalam aplikasi Anda.

Menanamkan NATS, dokumen NATS

  • Dan ada kekhasan NATS, yaitu mendukung mode tertanam untuk aplikasi yang terdiri dari Go.
  • Artinya, alih-alih metode umum broker pesan yang mengharuskan server broker terpisah untuk dijalankan dan kemudian berkomunikasi melalui klien aplikasi dengan server tersebut, broker itu sendiri dapat ditanamkan (embed) ke dalam aplikasi yang dibuat dengan Go.

Manfaat dan kasus penggunaan NATS tertanam

  • Ada video Youtube yang menjelaskan dengan baik, jadi saya akan menggunakan tautan video tersebut.
  • Bahkan tanpa deployment server broker pesan terpisah, kita dapat memperoleh keuntungan menanamkan NATS sebagai embedded, dengan membuat modular monolith application dan mencapai separation of concern. Selain itu, single binary deployment juga menjadi mungkin.
  • Ini dapat digunakan secara berguna tidak hanya untuk platform tanpa jaringan (WASM), tetapi juga untuk aplikasi offline-first.

Contoh pada dokumen resmi

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inisialisasi server baru dengan opsi
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Mulai server melalui goroutine
22    go ns.Start()
23
24    // Tunggu server siap untuk koneksi
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("tidak siap untuk koneksi")
27    }
28
29    // Sambungkan ke server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Berlangganan ke subjek
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Cetak data pesan
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Matikan server (opsional)
45        ns.Shutdown()
46    })
47
48    // Publikasikan data ke subjek
49    nc.Publish(subject, []byte("Halo NATS tertanam!"))
50
51    // Tunggu server mati
52    ns.WaitForShutdown()
53}
  • Ini adalah contoh NATS Tertanam yang diposting di dokumentasi resmi NATS, tetapi jika Anda mengikuti kode contoh ini, komunikasi tidak akan terjadi dalam mode penanaman.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Jika Anda menjalankan file go tersebut dengan go run . sambil memeriksa jaringan yang mengalir ke localhost(127.0.0.1) melalui perintah watch 'netstat -an | grep 127.0.0.1', Anda akan melihat permintaan jaringan baru yang berasal dari port default NATS, yaitu 4222, ditambahkan.

Konfigurasi yang Tepat untuk Mode Penanaman

  • Untuk berkomunikasi dalam mode tertanam seperti yang diinginkan, diperlukan dua opsi berikut:

    • Klien: Opsi InProcessServer harus disertakan.
    • Server: Bendera DontListen harus secara eksplisit dinyatakan sebagai true dalam Server.Options.
  • Bagian-bagian ini tidak didokumentasikan secara resmi, dan awal dari fitur ini dapat dipahami melalui PR ini.

    PR ini menambahkan tiga hal:

    1. Fungsi InProcessConn() ke Server yang membangun net.Pipe untuk mendapatkan koneksi ke server NATS tanpa menggunakan soket TCP
    2. Opsi DontListen yang memberitahu server NATS untuk tidak mendengarkan pada pendengar TCP biasa
    3. Saluran startupComplete, yang ditutup tepat sebelum kita memulai AcceptLoop, dan readyForConnections akan menunggunya

    Motivasi utama untuk ini adalah bahwa kita memiliki aplikasi yang dapat berjalan dalam mode monolit (proses tunggal) atau mode polilit (multi-proses). Kami ingin dapat menggunakan NATS untuk kedua mode demi kesederhanaan, tetapi mode monolit harus dapat melayani berbagai platform di mana membuka koneksi soket tidak masuk akal (seluler) atau tidak mungkin (WASM). Perubahan ini akan memungkinkan kita untuk menggunakan NATS sepenuhnya dalam proses.

    PR yang menyertainya nats-io/nats.go#774 menambahkan dukungan untuk sisi klien.

    Ini adalah PR pertama saya untuk proyek ini, jadi mohon maaf sebelumnya jika saya melewatkan sesuatu yang jelas di mana pun.

    /cc @nats-io/core

Contoh Kerja untuk Mode Tertanam

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// untuk mengkonfigurasi server NATS yang tertanam
14		// atur DonListen menjadi true
15		DontListen: true,
16	}
17
18	// Inisialisasi server baru dengan opsi
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Mulai server melalui goroutine
26	go ns.Start()
27
28	// Tunggu server siap untuk koneksi
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("tidak siap untuk koneksi")
31	}
32
33	// Sambungkan ke server melalui koneksi in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Berlangganan ke subjek
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Cetak data pesan
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Matikan server (opsional)
49		ns.Shutdown()
50	})
51
52	// Publikasikan data ke subjek
53	nc.Publish(subject, []byte("Halo NATS tertanam!"))
54
55	// Tunggu server mati
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs
4
  • Sekarang kita dapat melihat bahwa tidak ada hop jaringan tambahan yang terjadi seperti yang diinginkan.

Di Balik Layar

TL;DR

diagram1

  • Diagram urutan ini menunjukkan bagaimana fungsi-fungsi internal bekerja ketika kode ini dijalankan di main.go, dan intinya adalah sebagai berikut:
    • Dengan DontListen: true, server melewatkan fase mendengarkan klien yang disebut AcceptLoop.
    • Jika InProcessServer di antara opsi koneksi klien diaktifkan, koneksi in-memory dibuat, dan pipa dibuat melalui net.Pipe, kemudian ujung pipa dikembalikan ke klien sebagai tipe net.Conn.
    • Klien dan server berkomunikasi secara in-process melalui koneksi tersebut.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Tunggu klien.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Pertama, jika DontListen bernilai true, fase mendengarkan klien yang disebut AcceptLoop dilewati.
 1// nats-server/server/server.go
 2
 3// AcceptLoop diekspor untuk pengujian yang lebih mudah.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Jika kita keluar sebelum listener diatur dengan benar,
 6	// pastikan kita menutup channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot opsi server.
18	opts := s.getOpts()
19
20	// Siapkan status yang dapat mengaktifkan pematian
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Kesalahan mendengarkan pada port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Mendengarkan koneksi klien pada %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Peringatan TLS diaktifkan.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS diperlukan untuk koneksi klien")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klien yang tidak menggunakan opsi \"TLS Handshake First\" akan gagal terhubung")
38		}
39	}
40
41	// Jika server dimulai dengan RANDOM_PORT (-1), opts.Port akan sama
42	// dengan 0 pada awal fungsi ini. Jadi kita perlu mendapatkan port yang sebenarnya
43	if opts.Port == 0 {
44		// Tulis port yang diselesaikan kembali ke opsi.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Sekarang setelah port diatur (jika diatur ke RANDOM), atur
49	// Host/Port info server dengan nilai dari Opsi atau
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Kesalahan mengatur INFO server dengan nilai ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Catat URL koneksi klien. Kita mungkin membutuhkannya nanti.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Memberi sinyal bahwa kita tidak menerima klien baru
65				s.ldmCh <- true
66				// Sekarang tunggu Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Beri tahu pemanggil bahwa kita sudah siap
75	close(clr)
76	clr = nil
77}
  • Sebagai referensi, fungsi AcceptLoop menjalankan proses-proses berikut. Ini adalah bagian-bagian yang terkait dengan komunikasi jaringan seperti TLS atau hostPort, dan dapat diabaikan karena tidak diperlukan jika komunikasi dilakukan secara in-process.

Klien

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect akan mencoba menyambung ke sistem NATS.
 5// URL dapat berisi semantik nama pengguna/kata sandi. misal nats://derek:pass@localhost:4222
 6// Array yang dipisahkan koma juga didukung, misal urlA, urlB.
 7// Opsi dimulai dengan nilai default tetapi dapat ditimpa.
 8// Untuk menyambung ke port websocket NATS Server, gunakan skema `ws` atau `wss`, seperti
 9// `ws://localhost:8080`. Perhatikan bahwa skema websocket tidak dapat dicampur dengan yang lain (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
 4type Options struct {
 5	// Url mewakili satu url server NATS tempat klien
 6	// akan terhubung. Jika opsi Servers juga diatur, itu
 7	// kemudian menjadi server pertama dalam array Servers.
 8	Url string
 9
10	// InProcessServer mewakili server NATS yang berjalan di dalam
11	// proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12	// ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Fungsi Connect yang melakukan koneksi antara server NATS dan klien NATS memungkinkan pengaturan URL klien dan opsi koneksi, dan struktur Opsi yang mengumpulkan opsi-opsi ini memiliki bidang InProcessServer dengan tipe antarmuka InProcessConnProvider.
1// main.go dari kode contoh
2
3// Inisialisasi server baru dengan opsi
4ns, err := server.NewServer(opts)
5
6//...
7
8// Sambungkan ke server melalui koneksi in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Ketika klien NATS melakukan Connect, jika nats.InProcessServer(ns) diteruskan ke bidang InProcessServer, maka
 1// nats-go/nats.go
 2
 3// InProcessServer adalah Opsi yang akan mencoba membangun arah ke server NATS
 4// yang berjalan di dalam proses alih-alih melakukan dial melalui TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • InProcessServer dari opsi diganti dengan server NATS tertanam dan
 1// nats-go/nats.go
 2
 3// createConn akan terhubung ke server dan membungkus
 4// struktur bufio yang sesuai. Ini akan melakukan hal yang benar ketika
 5// koneksi yang ada sudah ada.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Jika kita memiliki referensi ke server in-process, maka buat
15	// koneksi menggunakan itu.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("gagal mendapatkan koneksi in-process: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • antarmuka ini menjalankan InProcesConn dari InProcessServer dalam opsi jika opsi InProcessServer tidak nil (valid) dalam fungsi createConn yang membuat koneksi, dan
 1// nats-server/server/server.go
 2
 3// InProcessConn mengembalikan koneksi in-process ke server,
 4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
 5// dalam proses yang sama. Ini dapat digunakan terlepas dari
 6// status opsi DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("gagal membuat koneksi")
16	}
17	return pr, nil
18}
  • ia memanggil dan menjalankan InProcessConn yang diimplementasikan di server.
  • Fungsi ini, ketika dipanggil oleh klien Go NATS (nats.go) jika InProcessServer dari nc (koneksi NATS) tidak nil, akan membuat koneksi (net.Conn) dan mengikatnya ke koneksi server.

Antarmuka yang Digerakkan Konsumen dari Go

Sebuah tipe mengimplementasikan antarmuka dengan mengimplementasikan metodenya. Tidak ada deklarasi eksplisit niat, tidak ada kata kunci "implements". Antarmuka implisit memisahkan definisi antarmuka dari implementasinya, yang kemudian dapat muncul dalam paket apa pun tanpa pengaturan awal.

Antarmuka diimplementasikan secara implisit, A Tour of Go

Jika suatu tipe hanya ada untuk mengimplementasikan antarmuka dan tidak akan pernah memiliki metode yang diekspor di luar antarmuka tersebut, maka tidak perlu mengekspor tipe itu sendiri.

Generalitas, Effective Go

  • Desain antarmuka ini dengan baik mewujudkan apa yang sering disebut di Go sebagai antarmuka yang ditentukan oleh konsumen dan structural typing (duck typing), jadi saya ingin memperkenalkan topik ini juga.
 1// nats-go/nats.go
 2
 3// Opsi dapat digunakan untuk membuat koneksi yang disesuaikan.
 4type Options struct {
 5	// Url mewakili satu url server NATS tempat klien
 6	// akan terhubung. Jika opsi Servers juga diatur, itu
 7	// kemudian menjadi server pertama dalam array Servers.
 8	Url string
 9
10	// InProcessServer mewakili server NATS yang berjalan di dalam
11	// proses yang sama. Jika ini diatur maka kita akan mencoba untuk terhubung
12	// ke server secara langsung daripada menggunakan koneksi TCP eksternal.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Mari kita kembali ke kode. Bidang struktur opsi InProcessServer di klien nats.go didefinisikan sebagai antarmuka InProcessConnProvider yang hanya menjalankan InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn mengembalikan koneksi in-process ke server,
 4// menghindari kebutuhan untuk menggunakan listener TCP untuk konektivitas lokal
 5// dalam proses yang sama. Ini dapat digunakan terlepas dari
 6// status opsi DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("gagal membuat koneksi")
16	}
17	return pr, nil
18}
  • Namun, tipe yang dimasukkan adalah Server dari nats-server, yang melakukan berbagai fungsi selain InProcessConn.
  • Ini karena dalam situasi ini, perhatian klien hanyalah apakah antarmuka InProcessConn telah disediakan atau tidak, dan hal-hal lain tidak terlalu penting.
  • Oleh karena itu, klien nats.go hanya membuat dan menggunakan antarmuka yang ditentukan oleh konsumen (InProcessConnProvider) yang hanya mendefinisikan fungsi InProcessConn() (net.Conn, error).

Kesimpulan

  • Saya telah membahas secara singkat mode tertanam NATS dan cara kerjanya, serta antarmuka Go yang ditentukan oleh konsumen yang dapat dikonfirmasi melalui kode NATS.
  • Saya berharap informasi ini bermanfaat bagi mereka yang menggunakan NATS untuk tujuan tersebut, dan dengan demikian saya mengakhiri artikel ini.