Cum comunică NATS-ul embedded cu aplicația go?
Noțiuni introductive
Despre NATS
Aplicațiile și serviciile software trebuie să schimbe date. NATS este o infrastructură care permite un astfel de schimb de date, segmentat sub formă de mesaje. Numim aceasta un „middleware orientat pe mesaje”.
Cu NATS, dezvoltatorii de aplicații pot:
- Construi fără efort aplicații client-server distribuite și scalabile.
- Stoca și distribui date în timp real într-o manieră generală. Acest lucru poate fi realizat flexibil în diverse medii, limbaje, furnizori de cloud și sisteme on-premises.
- NATS este un broker de mesaje configurat în Go.
NATS încorporat
Dacă aplicația dumneavoastră este în Go și dacă se potrivește cazului dumneavoastră de utilizare și scenariilor de implementare, puteți chiar încorpora un server NATS în interiorul aplicației dumneavoastră.
Încorporarea NATS, documentația NATS
- O caracteristică distinctivă a NATS este suportul pentru modul embedded în cazul aplicațiilor dezvoltate în Go.
- Aceasta înseamnă că, în loc de abordarea obișnuită a brokerului de mesaje, care implică pornirea unui server de broker separat și comunicarea cu acesta prin intermediul unui client al aplicației, brokerul însuși poate fi încorporat (embed) direct în aplicația creată în Go.
Beneficiile și cazurile de utilizare ale NATS încorporat
- Există un videoclip pe Youtube bine explicat, așa că voi folosi linkul către videoclip.
- Chiar și fără a implementa un server de broker de mesaje separat, se poate crea o aplicație modular monolith, realizând separarea preocupărilor, și se poate beneficia de avantajul de a încorpora NATS. În plus, devine posibilă și o implementare single binary.
- Poate fi utilizat util nu numai pe platforme fără rețea (WASM), ci și în aplicații offline-first.
Exemplu în documentația oficială
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inițializați un nou server cu opțiuni
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Porniți serverul printr-o goroutine
22 go ns.Start()
23
24 // Așteptați ca serverul să fie gata pentru conexiuni
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("nu este gata pentru conexiune")
27 }
28
29 // Conectați-vă la server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonați-vă la subiect
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Afișați datele mesajului
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Opriți serverul (opțional)
45 ns.Shutdown()
46 })
47
48 // Publicați date către subiect
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Așteptați oprirea serverului
52 ns.WaitForShutdown()
53}
- Acesta este un exemplu de NATS încorporat furnizat în documentația oficială NATS, dar executarea codului conform acestui exemplu nu va realiza comunicarea în modul embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Când rulați fișierul Go cu
go run .și verificați traficul de rețea de la localhost (127.0.0.1) folosind comandawatch 'netstat -an | grep 127.0.0.1', puteți observa că se adaugă noi cereri de rețea care provin de la portul implicit al NATS,4222.
Configurații corecte pentru modul embedding
Pentru a realiza comunicarea în modul embedded conform intenției, sunt necesare următoarele două opțiuni:
- Client: Trebuie inclusă opțiunea
InProcessServer. - Server: În
Server.Options, flag-ulDontListentrebuie specificat catrue.
- Client: Trebuie inclusă opțiunea
Aceste aspecte nu au fost documentate oficial, iar inițierea acestei funcționalități poate fi înțeleasă prin intermediul PR-ului respectiv.
Acest PR adaugă trei lucruri:
- Funcția
InProcessConn()laServercare construiește unnet.Pipepentru a obține o conexiune la serverul NATS fără a utiliza socket-uri TCP - Opțiunea
DontListencare indică serverului NATS să nu asculte pe ascultătorul TCP obișnuit - Canalul
startupComplete, care este închis chiar înainte de a începeAcceptLoop, iarreadyForConnectionsva aștepta acest lucru
Principala motivație pentru aceasta este că avem o aplicație care poate rula fie într-un mod monolit (single-process), fie într-un mod polilit (multi-process). Am dori să putem utiliza NATS pentru ambele moduri, pentru simplitate, dar modul monolit trebuie să poată satisface o varietate de platforme unde deschiderea conexiunilor socket fie nu are sens (mobil), fie pur și simplu nu este posibilă (WASM). Aceste modificări ne vor permite să utilizăm NATS în întregime in-process.
Un PR însoțitor nats-io/nats.go#774 adaugă suport pentru partea de client.
Acesta este primul meu PR la acest proiect, așa că îmi cer scuze în avans dacă am omis ceva evident undeva.
/cc @nats-io/core
- Funcția
Exemplu funcțional pentru modul embedded
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // pentru configurarea serverului NATS încorporat
14 // setați DontListen ca true
15 DontListen: true,
16 }
17
18 // Inițializați un nou server cu opțiuni
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Porniți serverul printr-o goroutine
26 go ns.Start()
27
28 // Așteptați ca serverul să fie gata pentru conexiuni
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("nu este gata pentru conexiune")
31 }
32
33 // Conectați-vă la server printr-o conexiune in-process
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonați-vă la subiect
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Afișați datele mesajului
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Opriți serverul (opțional)
49 ns.Shutdown()
50 })
51
52 // Publicați date către subiect
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Așteptați oprirea serverului
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Acum se poate observa că nu mai apar hop-uri de rețea suplimentare, conform intenției.
Sub capotă
TL;DR
- Aceasta este o diagramă secvențială care ilustrează modul în care funcțiile interne operează la executarea codului în
main.go, iar esența poate fi explicată după cum urmează.- Prin
DontListen: true, serverul omite faza de ascultare a clientului denumităAcceptLoop. - Dacă opțiunea de conectare a clientului
InProcessServereste activată, se creează o conexiune in-memory, se stabilește o conductă prinnet.Pipe, iar capătul conductei este returnat clientului ca tipnet.Conn. - Clientul și serverul realizează comunicarea in-process prin intermediul acestei conexiuni.
- Prin
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Așteaptă clienții.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- În primul rând, dacă
DontListeneste true, faza de ascultare a clientului denumităAcceptLoopeste omisă.
1// nats-server/server/server.go
2
3// AcceptLoop este exportată pentru o testare mai ușoară.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Dacă am ieși înainte ca ascultătorul să fie configurat corect,
6 // asigurați-vă că închideți canalul.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Faceți un snapshot al opțiunilor serverului.
18 opts := s.getOpts()
19
20 // Configurați starea care poate permite oprirea
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Eroare la ascultarea pe port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Ascultă conexiuni client pe %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alertează dacă TLS este activat.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS necesar pentru conexiunile client")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clienții care nu utilizează opțiunea \"TLS Handshake First\" nu se vor putea conecta")
38 }
39 }
40
41 // Dacă serverul a fost pornit cu RANDOM_PORT (-1), opts.Port ar fi egal
42 // cu 0 la începutul acestei funcții. Deci trebuie să obținem portul real
43 if opts.Port == 0 {
44 // Scrie portul rezolvat înapoi în opțiuni.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Acum că portul a fost setat (dacă a fost setat la RANDOM), setați
49 // Host/Port-ul informațiilor serverului cu valorile din Options sau
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Eroare la setarea INFO serverului cu valoarea ClientAdvertise de %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Păstrați evidența URL-urilor de conectare ale clientului. S-ar putea să avem nevoie de ele mai târziu.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Semnalați că nu acceptăm clienți noi
65 s.ldmCh <- true
66 // Acum așteptați oprirea...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Anunțați apelantul că suntem gata
75 close(clr)
76 clr = nil
77}
- De altfel, funcția AcceptLoop efectuează următoarele procese. Acestea sunt aspecte legate de comunicarea în rețea, cum ar fi
TLSsauhostPort, și pot fi omise deoarece nu sunt necesare în cazul comunicării in-process.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect va încerca să se conecteze la sistemul NATS.
5// URL-ul poate conține semantică de nume de utilizator/parolă. de exemplu nats://derek:pass@localhost:4222
6// Sunt suportate și array-uri separate prin virgulă, de exemplu urlA, urlB.
7// Opțiunile încep cu valorile implicite, dar pot fi suprascrise.
8// Pentru a vă conecta la un port websocket al unui server NATS, utilizați schema `ws` sau `wss`, cum ar fi
9// `ws://localhost:8080`. Rețineți că schemele websocket nu pot fi amestecate cu altele (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options pot fi utilizate pentru a crea o conexiune personalizată.
4type Options struct {
5 // Url reprezintă un singur URL de server NATS la care clientul
6 // se va conecta. Dacă este setată și opțiunea Servers, aceasta
7 // devine primul server din array-ul Servers.
8 Url string
9
10 // InProcessServer reprezintă un server NATS care rulează în același
11 // proces. Dacă aceasta este setată, vom încerca să ne conectăm
12 // direct la server, în loc să utilizăm conexiuni TCP externe.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Funcția
Connect, care realizează conexiunea dintre serverul NATS și clientul NATS, permite configurarea URL-ului clientului și a opțiunilor de conectare. Structura Options, care conține aceste opțiuni, include un câmpInProcessServerde tip interfațăInProcessConnProvider.
1// main.go din codul exemplu
2
3// Inițializați un nou server cu opțiuni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Conectați-vă la server printr-o conexiune in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Când clientul NATS inițiază o conexiune și transmite
nats.InProcessServer(ns)către câmpulInProcessServeral opțiunii,
1// nats-go/nats.go
2
3// InProcessServer este o opțiune care va încerca să stabilească o direcție către un server NATS
4// care rulează în cadrul procesului, în loc să stabilească o conexiune prin TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
InProcessServeral opțiunii este înlocuit cu serverul NATS încorporat și
1// nats-go/nats.go
2
3// createConn se va conecta la server și va încapsula structurile
4// bufio corespunzătoare. Va acționa corect atunci când există o
5// conexiune existentă.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Dacă avem o referință la un server in-process, atunci stabilim o
15 // conexiune utilizând acel server.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("nu s-a putut obține conexiunea in-process: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Această interfață este executată în funcția
createConncare creează conexiunea, atunci când opțiuneaInProcessServernu este nil (validă), rulândInProcessConndinInProcessServeral opțiunii.
1// nats-server/server/server.go
2
3// InProcessConn returnează o conexiune in-process la server,
4// evitând necesitatea utilizării unui ascultător TCP pentru conectivitate locală
5// în cadrul aceluiași proces. Aceasta poate fi utilizată indiferent de
6// starea opțiunii DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("nu s-a putut crea conexiunea")
16 }
17 return pr, nil
18}
- Este apelată și executată implementarea
InProcessConndin server. - Această funcție, atunci când
InProcessServeral nc (conexiunea NATS) nu este nil înnats.go, clientul Go al NATS, este apelată pentru a crea o conexiune (net.Conn) și a o lega la conexiunea serverului.
Interfață orientată pe consumator în Go
Un tip implementează o interfață prin implementarea metodelor sale. Nu există o declarație explicită de intenție, niciun cuvânt cheie „implements”. Interfețele implicite decuplează definiția unei interfețe de implementarea sa, care ar putea apărea apoi în orice pachet fără o aranjare prealabilă.
Interfețele sunt implementate implicit, Un tur al Go
Dacă un tip există doar pentru a implementa o interfață și nu va avea niciodată metode exportate în afara acelei interfețe, nu este necesar să exportați tipul însuși.
- Acest design de interfață încorporează bine ceea ce este adesea denumit în Go interfața definită de consumator și structural typing (duck typing), așa că aș dori să introduc și acest subiect.
1// nats-go/nats.go
2
3// Options pot fi utilizate pentru a crea o conexiune personalizată.
4type Options struct {
5 // Url reprezintă un singur URL de server NATS la care clientul
6 // se va conecta. Dacă este setată și opțiunea Servers, aceasta
7 // devine primul server din array-ul Servers.
8 Url string
9
10 // InProcessServer reprezintă un server NATS care rulează în același
11 // proces. Dacă aceasta este setată, vom încerca să ne conectăm
12 // direct la server, în loc să utilizăm conexiuni TCP externe.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Să revenim la cod. Câmpul structurii opțiunii
InProcessServerdin clientul nats.go a fost definit ca o interfațăInProcessConnProvidercare execută doarInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn returnează o conexiune in-process la server,
4// evitând necesitatea utilizării unui ascultător TCP pentru conectivitate locală
5// în cadrul aceluiași proces. Aceasta poate fi utilizată indiferent de
6// starea opțiunii DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("nu s-a putut crea conexiunea")
16 }
17 return pr, nil
18}
- Cu toate acestea, tipul introdus în aceasta este
Serverdin nats-server, care efectuează diverse funcții pe lângă InProcessConn. - Aceasta se datorează faptului că, în această situație, preocuparea clientului este doar dacă a fost furnizată interfața
InProcessConn, iar alte aspecte nu sunt de o importanță majoră. - Prin urmare, clientul nats.go creează și utilizează doar o interfață definită de consumator numită
InProcessConnProvider, care definește doar funcționalitateaInProcessConn() (net.Conn, error).
Concluzie
- Am abordat pe scurt modul embedded al NATS și funcționarea sa, precum și interfața definită de consumator în Go, care poate fi observată prin codul NATS.
- Sper ca aceste informații să fie utile celor care utilizează NATS în scopurile menționate mai sus, și cu aceasta, închei acest articol.