GoSuda

Cum comunică NATS-ul embedded cu aplicația go?

By prravda
views ...

Noțiuni introductive

Despre NATS

Aplicațiile și serviciile software trebuie să schimbe date. NATS este o infrastructură care permite un astfel de schimb de date, segmentat sub formă de mesaje. Numim aceasta un „middleware orientat pe mesaje”.

Cu NATS, dezvoltatorii de aplicații pot:

  • Construi fără efort aplicații client-server distribuite și scalabile.
  • Stoca și distribui date în timp real într-o manieră generală. Acest lucru poate fi realizat flexibil în diverse medii, limbaje, furnizori de cloud și sisteme on-premises.

Ce este NATS, documentația NATS

  • NATS este un broker de mesaje configurat în Go.

NATS încorporat

Dacă aplicația dumneavoastră este în Go și dacă se potrivește cazului dumneavoastră de utilizare și scenariilor de implementare, puteți chiar încorpora un server NATS în interiorul aplicației dumneavoastră.

Încorporarea NATS, documentația NATS

  • O caracteristică distinctivă a NATS este suportul pentru modul embedded în cazul aplicațiilor dezvoltate în Go.
  • Aceasta înseamnă că, în loc de abordarea obișnuită a brokerului de mesaje, care implică pornirea unui server de broker separat și comunicarea cu acesta prin intermediul unui client al aplicației, brokerul însuși poate fi încorporat (embed) direct în aplicația creată în Go.

Beneficiile și cazurile de utilizare ale NATS încorporat

  • Există un videoclip pe Youtube bine explicat, așa că voi folosi linkul către videoclip.
  • Chiar și fără a implementa un server de broker de mesaje separat, se poate crea o aplicație modular monolith, realizând separarea preocupărilor, și se poate beneficia de avantajul de a încorpora NATS. În plus, devine posibilă și o implementare single binary.
  • Poate fi utilizat util nu numai pe platforme fără rețea (WASM), ci și în aplicații offline-first.

Exemplu în documentația oficială

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inițializați un nou server cu opțiuni
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Porniți serverul printr-o goroutine
22    go ns.Start()
23
24    // Așteptați ca serverul să fie gata pentru conexiuni
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("nu este gata pentru conexiune")
27    }
28
29    // Conectați-vă la server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonați-vă la subiect
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Afișați datele mesajului
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Opriți serverul (opțional)
45        ns.Shutdown()
46    })
47
48    // Publicați date către subiect
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Așteptați oprirea serverului
52    ns.WaitForShutdown()
53}
  • Acesta este un exemplu de NATS încorporat furnizat în documentația oficială NATS, dar executarea codului conform acestui exemplu nu va realiza comunicarea în modul embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Când rulați fișierul Go cu go run . și verificați traficul de rețea de la localhost (127.0.0.1) folosind comanda watch 'netstat -an | grep 127.0.0.1', puteți observa că se adaugă noi cereri de rețea care provin de la portul implicit al NATS, 4222.

Configurații corecte pentru modul embedding

  • Pentru a realiza comunicarea în modul embedded conform intenției, sunt necesare următoarele două opțiuni:

    • Client: Trebuie inclusă opțiunea InProcessServer.
    • Server: În Server.Options, flag-ul DontListen trebuie specificat ca true.
  • Aceste aspecte nu au fost documentate oficial, iar inițierea acestei funcționalități poate fi înțeleasă prin intermediul PR-ului respectiv.

    Acest PR adaugă trei lucruri:

    1. Funcția InProcessConn() la Server care construiește un net.Pipe pentru a obține o conexiune la serverul NATS fără a utiliza socket-uri TCP
    2. Opțiunea DontListen care indică serverului NATS să nu asculte pe ascultătorul TCP obișnuit
    3. Canalul startupComplete, care este închis chiar înainte de a începe AcceptLoop, iar readyForConnections va aștepta acest lucru

    Principala motivație pentru aceasta este că avem o aplicație care poate rula fie într-un mod monolit (single-process), fie într-un mod polilit (multi-process). Am dori să putem utiliza NATS pentru ambele moduri, pentru simplitate, dar modul monolit trebuie să poată satisface o varietate de platforme unde deschiderea conexiunilor socket fie nu are sens (mobil), fie pur și simplu nu este posibilă (WASM). Aceste modificări ne vor permite să utilizăm NATS în întregime in-process.

    Un PR însoțitor nats-io/nats.go#774 adaugă suport pentru partea de client.

    Acesta este primul meu PR la acest proiect, așa că îmi cer scuze în avans dacă am omis ceva evident undeva.

    /cc @nats-io/core

Exemplu funcțional pentru modul embedded

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// pentru configurarea serverului NATS încorporat
14		// setați DontListen ca true
15		DontListen: true,
16	}
17
18	// Inițializați un nou server cu opțiuni
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Porniți serverul printr-o goroutine
26	go ns.Start()
27
28	// Așteptați ca serverul să fie gata pentru conexiuni
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("nu este gata pentru conexiune")
31	}
32
33	// Conectați-vă la server printr-o conexiune in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonați-vă la subiect
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Afișați datele mesajului
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Opriți serverul (opțional)
49		ns.Shutdown()
50	})
51
52	// Publicați date către subiect
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Așteptați oprirea serverului
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Acum se poate observa că nu mai apar hop-uri de rețea suplimentare, conform intenției.

Sub capotă

TL;DR

diagram1

  • Aceasta este o diagramă secvențială care ilustrează modul în care funcțiile interne operează la executarea codului în main.go, iar esența poate fi explicată după cum urmează.
    • Prin DontListen: true, serverul omite faza de ascultare a clientului denumită AcceptLoop.
    • Dacă opțiunea de conectare a clientului InProcessServer este activată, se creează o conexiune in-memory, se stabilește o conductă prin net.Pipe, iar capătul conductei este returnat clientului ca tip net.Conn.
    • Clientul și serverul realizează comunicarea in-process prin intermediul acestei conexiuni.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Așteaptă clienții.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • În primul rând, dacă DontListen este true, faza de ascultare a clientului denumită AcceptLoop este omisă.
 1// nats-server/server/server.go
 2
 3// AcceptLoop este exportată pentru o testare mai ușoară.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Dacă am ieși înainte ca ascultătorul să fie configurat corect,
 6	// asigurați-vă că închideți canalul.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Faceți un snapshot al opțiunilor serverului.
18	opts := s.getOpts()
19
20	// Configurați starea care poate permite oprirea
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Eroare la ascultarea pe port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Ascultă conexiuni client pe %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alertează dacă TLS este activat.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS necesar pentru conexiunile client")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clienții care nu utilizează opțiunea \"TLS Handshake First\" nu se vor putea conecta")
38		}
39	}
40
41	// Dacă serverul a fost pornit cu RANDOM_PORT (-1), opts.Port ar fi egal
42	// cu 0 la începutul acestei funcții. Deci trebuie să obținem portul real
43	if opts.Port == 0 {
44		// Scrie portul rezolvat înapoi în opțiuni.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Acum că portul a fost setat (dacă a fost setat la RANDOM), setați
49	// Host/Port-ul informațiilor serverului cu valorile din Options sau
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Eroare la setarea INFO serverului cu valoarea ClientAdvertise de %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Păstrați evidența URL-urilor de conectare ale clientului. S-ar putea să avem nevoie de ele mai târziu.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Semnalați că nu acceptăm clienți noi
65				s.ldmCh <- true
66				// Acum așteptați oprirea...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Anunțați apelantul că suntem gata
75	close(clr)
76	clr = nil
77}
  • De altfel, funcția AcceptLoop efectuează următoarele procese. Acestea sunt aspecte legate de comunicarea în rețea, cum ar fi TLS sau hostPort, și pot fi omise deoarece nu sunt necesare în cazul comunicării in-process.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect va încerca să se conecteze la sistemul NATS.
 5// URL-ul poate conține semantică de nume de utilizator/parolă. de exemplu nats://derek:pass@localhost:4222
 6// Sunt suportate și array-uri separate prin virgulă, de exemplu urlA, urlB.
 7// Opțiunile încep cu valorile implicite, dar pot fi suprascrise.
 8// Pentru a vă conecta la un port websocket al unui server NATS, utilizați schema `ws` sau `wss`, cum ar fi
 9// `ws://localhost:8080`. Rețineți că schemele websocket nu pot fi amestecate cu altele (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options pot fi utilizate pentru a crea o conexiune personalizată.
 4type Options struct {
 5	// Url reprezintă un singur URL de server NATS la care clientul
 6	// se va conecta. Dacă este setată și opțiunea Servers, aceasta
 7	// devine primul server din array-ul Servers.
 8	Url string
 9
10	// InProcessServer reprezintă un server NATS care rulează în același
11	// proces. Dacă aceasta este setată, vom încerca să ne conectăm
12	// direct la server, în loc să utilizăm conexiuni TCP externe.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funcția Connect, care realizează conexiunea dintre serverul NATS și clientul NATS, permite configurarea URL-ului clientului și a opțiunilor de conectare. Structura Options, care conține aceste opțiuni, include un câmp InProcessServer de tip interfață InProcessConnProvider.
1// main.go din codul exemplu
2
3// Inițializați un nou server cu opțiuni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Conectați-vă la server printr-o conexiune in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Când clientul NATS inițiază o conexiune și transmite nats.InProcessServer(ns) către câmpul InProcessServer al opțiunii,
 1// nats-go/nats.go
 2
 3// InProcessServer este o opțiune care va încerca să stabilească o direcție către un server NATS
 4// care rulează în cadrul procesului, în loc să stabilească o conexiune prin TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • InProcessServer al opțiunii este înlocuit cu serverul NATS încorporat și
 1// nats-go/nats.go
 2
 3// createConn se va conecta la server și va încapsula structurile
 4// bufio corespunzătoare. Va acționa corect atunci când există o
 5// conexiune existentă.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Dacă avem o referință la un server in-process, atunci stabilim o
15	// conexiune utilizând acel server.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("nu s-a putut obține conexiunea in-process: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Această interfață este executată în funcția createConn care creează conexiunea, atunci când opțiunea InProcessServer nu este nil (validă), rulând InProcessConn din InProcessServer al opțiunii.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnează o conexiune in-process la server,
 4// evitând necesitatea utilizării unui ascultător TCP pentru conectivitate locală
 5// în cadrul aceluiași proces. Aceasta poate fi utilizată indiferent de
 6// starea opțiunii DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nu s-a putut crea conexiunea")
16	}
17	return pr, nil
18}
  • Este apelată și executată implementarea InProcessConn din server.
  • Această funcție, atunci când InProcessServer al nc (conexiunea NATS) nu este nil în nats.go, clientul Go al NATS, este apelată pentru a crea o conexiune (net.Conn) și a o lega la conexiunea serverului.

Interfață orientată pe consumator în Go

Un tip implementează o interfață prin implementarea metodelor sale. Nu există o declarație explicită de intenție, niciun cuvânt cheie „implements”. Interfețele implicite decuplează definiția unei interfețe de implementarea sa, care ar putea apărea apoi în orice pachet fără o aranjare prealabilă.

Interfețele sunt implementate implicit, Un tur al Go

Dacă un tip există doar pentru a implementa o interfață și nu va avea niciodată metode exportate în afara acelei interfețe, nu este necesar să exportați tipul însuși.

Generalitate, Go Eficient

  • Acest design de interfață încorporează bine ceea ce este adesea denumit în Go interfața definită de consumator și structural typing (duck typing), așa că aș dori să introduc și acest subiect.
 1// nats-go/nats.go
 2
 3// Options pot fi utilizate pentru a crea o conexiune personalizată.
 4type Options struct {
 5	// Url reprezintă un singur URL de server NATS la care clientul
 6	// se va conecta. Dacă este setată și opțiunea Servers, aceasta
 7	// devine primul server din array-ul Servers.
 8	Url string
 9
10	// InProcessServer reprezintă un server NATS care rulează în același
11	// proces. Dacă aceasta este setată, vom încerca să ne conectăm
12	// direct la server, în loc să utilizăm conexiuni TCP externe.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Să revenim la cod. Câmpul structurii opțiunii InProcessServer din clientul nats.go a fost definit ca o interfață InProcessConnProvider care execută doar InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnează o conexiune in-process la server,
 4// evitând necesitatea utilizării unui ascultător TCP pentru conectivitate locală
 5// în cadrul aceluiași proces. Aceasta poate fi utilizată indiferent de
 6// starea opțiunii DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nu s-a putut crea conexiunea")
16	}
17	return pr, nil
18}
  • Cu toate acestea, tipul introdus în aceasta este Server din nats-server, care efectuează diverse funcții pe lângă InProcessConn.
  • Aceasta se datorează faptului că, în această situație, preocuparea clientului este doar dacă a fost furnizată interfața InProcessConn, iar alte aspecte nu sunt de o importanță majoră.
  • Prin urmare, clientul nats.go creează și utilizează doar o interfață definită de consumator numită InProcessConnProvider, care definește doar funcționalitatea InProcessConn() (net.Conn, error).

Concluzie

  • Am abordat pe scurt modul embedded al NATS și funcționarea sa, precum și interfața definită de consumator în Go, care poate fi observată prin codul NATS.
  • Sper ca aceste informații să fie utile celor care utilizează NATS în scopurile menționate mai sus, și cu aceasta, închei acest articol.