Come comunicano le NATS embedded con un'applicazione Go?
Getting started
About NATS
Le applicazioni e i servizi software necessitano di scambiare dati. NATS è un'infrastruttura che consente tale scambio di dati, segmentato sotto forma di messaggi. Questo lo definiamo un "middleware orientato ai messaggi".
Con NATS, gli sviluppatori di applicazioni possono:
- Costruire senza sforzo applicazioni client-server distribuite e scalabili.
 - Archiviare e distribuire dati in tempo reale in modo generale. Ciò può essere ottenuto in modo flessibile attraverso vari ambienti, linguaggi, fornitori di cloud e sistemi on-premises.
 
- NATS è un message broker configurato in Go.
 
Embedded NATS
Se la vostra applicazione è in Go, e se si adatta al vostro caso d'uso e agli scenari di deployment, potete persino incorporare un server NATS all'interno della vostra applicazione.
Incorporare NATS, documentazione NATS
- E c'è una caratteristica particolare di NATS: supporta la modalità embedded per le applicazioni costruite con Go.
 - In altre parole, invece del metodo comune per i message broker, che prevede l'avvio di un server broker separato e la comunicazione tramite il client dell'applicazione con tale server, è possibile incorporare il broker stesso nell'applicazione creata con Go.
 
Benefits and use cases of embedded NATS
- C'è un video di YouTube ben spiegato, quindi mi limiterò a fornire il link al video.
 - Anche senza deployare un server message broker separato, si possono ottenere i vantaggi di incorporare NATS per creare un'applicazione modular monolith e raggiungere la separation of concerns. Inoltre, è possibile un single binary deployment.
 - Può essere utilizzato utilmente non solo su piattaforme senza rete (wasm), ma anche in applicazioni offline-first.
 
Example on official docs
 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inizializza un nuovo server con le opzioni
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Avvia il server tramite goroutine
22    go ns.Start()
23
24    // Attendi che il server sia pronto per le connessioni
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("non pronto per la connessione")
27    }
28
29    // Connettiti al server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Iscriviti all'argomento
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Stampa i dati del messaggio
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Spegni il server (opzionale)
45        ns.Shutdown()
46    })
47
48    // Pubblica i dati sull'argomento
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Attendi lo spegnimento del server
52    ns.WaitForShutdown()
53}
- Questo è un esempio di NATS embedded dalla documentazione ufficiale di NATS, ma la comunicazione non avviene in modalità embedding se si procede con il codice d'esempio.
 
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
- Eseguendo il comando 
watch 'netstat -an | grep 127.0.0.1'per controllare il traffico di rete in entrata e in uscita da localhost(127.0.0.1) e poi eseguendo il file Go congo run ., si possono osservare nuove richieste di rete in partenza dalla porta predefinita di NATS, la4222. 
Right configurations for embedding mode
Per comunicare in modalità embedded come previsto, sono necessarie le seguenti due opzioni:
- Client: È necessario includere l'opzione 
InProcessServer. - Server: Nella 
Server.Options, il flagDontListendeve essere esplicitamente impostato sutrue. 
- Client: È necessario includere l'opzione 
 Queste parti non erano ufficialmente documentate, e l'inizio di questa funzionalità può essere individuato tramite questa PR.
Questa PR aggiunge tre elementi:
- Funzione 
InProcessConn()aServerche costruisce unanet.Pipeper ottenere una connessione al server NATS senza usare i socket TCP. - Opzione 
DontListenche indica al server NATS di non ascoltare sul solito listener TCP. - Canale 
startupComplete, che viene chiuso poco prima di avviareAcceptLoop, ereadyForConnectionslo attenderà. 
La motivazione principale di ciò è che abbiamo un'applicazione che può essere eseguita in modalità monolitica (single-process) o polilitica (multi-process). Vorremmo poter utilizzare NATS per entrambe le modalità per semplicità, ma la modalità monolitica deve essere in grado di soddisfare una varietà di piattaforme in cui l'apertura di connessioni socket non ha senso (mobile) o semplicemente non è possibile (WASM). Queste modifiche ci permetteranno di utilizzare NATS interamente in-process.
Una PR di accompagnamento nats-io/nats.go#774 aggiunge il supporto lato client.
Questa è la mia prima PR a questo progetto, quindi mi scuso in anticipo se ho tralasciato qualcosa di ovvio da qualche parte.
/cc @nats-io/core
- Funzione 
 
Working Example for embedded mode
 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// per la configurazione del server NATS integrato
14		// imposta DonListen su true
15		DontListen: true,
16	}
17
18	// Inizializza un nuovo server con le opzioni
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Avvia il server tramite goroutine
26	go ns.Start()
27
28	// Attendi che il server sia pronto per le connessioni
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("non pronto per la connessione")
31	}
32
33	// Connettiti al server tramite connessione in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Iscriviti all'argomento
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Stampa i dati del messaggio
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Spegni il server (opzionale)
49		ns.Shutdown()
50	})
51
52	// Pubblica i dati sull'argomento
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Attendi lo spegnimento del server
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
- Ora si può osservare che non si verificano ulteriori 
network hopcome previsto. 
Under the hood
TL;DR
- Questo è un diagramma di sequenza che mostra come le funzioni interne operano quando il codice viene eseguito da 
main.go, e il punto principale è il seguente:- Impostando 
DontListen: true, il server salta la fase di ascolto del client chiamataAcceptLoop. - Se l'opzione 
InProcessServerdel Connect del client è attiva, viene creata una connessione in-memory e una pipe tramitenet.Pipe, dopodiché l'estremità della pipe viene restituita al client come tiponet.Conn. - Il client e il server comunicano in-process tramite questa connessione.
 
 - Impostando 
 
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Attendi i client.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
- Innanzitutto, se 
DontListenètrue, la fase di ascolto del client chiamataAcceptLoopviene omessa. 
 1// nats-server/server/server.go
 2
 3// AcceptLoop è esportata per facilitare i test.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Se dovessimo uscire prima che il listener sia configurato correttamente,
 6	// assicurati di chiudere il canale.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot delle opzioni del server.
18	opts := s.getOpts()
19
20	// Imposta lo stato che può abilitare lo spegnimento
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Errore durante l'ascolto sulla porta: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("In ascolto per connessioni client su %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Avvisa di TLS abilitato.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS richiesto per le connessioni client")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("I client che non utilizzano l'opzione \"TLS Handshake First\" non riusciranno a connettersi")
38		}
39	}
40
41	// Se il server è stato avviato con RANDOM_PORT (-1), opts.Port sarebbe uguale
42	// a 0 all'inizio di questa funzione. Quindi dobbiamo ottenere la porta effettiva
43	if opts.Port == 0 {
44		// Scrivi la porta risolta nelle opzioni.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Ora che la porta è stata impostata (se era impostata su RANDOM), imposta l'Host/Port
49	// delle informazioni del server con i valori delle Opzioni o di ClientAdvertise.
50	if err := s.setInfoHostPort(); err != nil {
51		s.Fatalf("Errore durante l'impostazione delle INFO del server con il valore ClientAdvertise di %s, err=%v", opts.ClientAdvertise, err)
52		l.Close()
53		s.mu.Unlock()
54		return
55	}
56	// Tieni traccia degli URL di connessione client. Potremmo averne bisogno in seguito.
57	s.clientConnectURLs = s.getClientConnectURLs()
58	s.listener = l
59
60	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
61		func(_ error) bool {
62			if s.isLameDuckMode() {
63				// Segnala che non stiamo accettando nuovi client
64				s.ldmCh <- true
65				// Ora attendi lo spegnimento...
66				<-s.quitCh
67				return true
68			}
69			return false
70		})
71	s.mu.Unlock()
72
73	// Fai sapere al chiamante che siamo pronti
74	close(clr)
75	clr = nil
76}
- Per riferimento, la funzione 
AcceptLoopesegue le seguenti operazioni: Queste sono parti relative alla comunicazione di rete, comeTLSohostPort, che possono essere omesse poiché non sono necessarie quando si esegue una comunicazione in-process. 
Client
InProcessServer
 1
 2// nats-go/nats.go
 3
 4// Connect tenterà di connettersi al sistema NATS.
 5// L'URL può contenere la semantica username/password. es. nats://derek:pass@localhost:4222
 6// Sono supportati anche array separati da virgole, es. urlA, urlB.
 7// Le opzioni iniziano con i valori predefiniti ma possono essere sovrascritte.
 8// Per connettersi alla porta websocket di un server NATS, utilizzare lo schema `ws` o `wss`, come
 9// `ws://localhost:8080`. Si noti che gli schemi websocket non possono essere mescolati con altri (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
 4type Options struct {
 5	// Url rappresenta un singolo URL del server NATS a cui il client
 6	// si connetterà. Se è impostata anche l'opzione Servers, essa
 7	// diventa il primo server nell'array Servers.
 8	Url string
 9
10	// InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11	// stesso processo. Se questo è impostato, tenteremo di connetterci
12	// direttamente al server anziché utilizzare connessioni TCP esterne.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
- La funzione 
Connect, che gestisce la connessione tra il server NATS e il client NATS, può configurare l'URL del client e l'opzione di connessione; la strutturaOptions, che raggruppa queste opzioni, contiene un campoInProcessServerdi tipo interfacciaInProcessConnProvider. 
1// main.go of example code
2
3// Inizializza un nuovo server con le opzioni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connettiti al server tramite connessione in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Quando il client NATS esegue la connessione, se 
nats.InProcessServer(ns)viene passato al campoInProcessServer, allora: 
 1// nats-go/nats.go
 2
 3// createConn si connetterà al server e avvolgerà le appropriate
 4// strutture bufio. Farà la cosa giusta quando una connessione
 5// esistente è in atto.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Se abbiamo un riferimento a un server in-process, allora stabiliamo una
15	// connessione usando quello.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("impossibile ottenere la connessione in-process: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
- Questa interfaccia esegue il 
InProcessConndelInProcessServernell'opzione, se l'opzioneInProcessServernon ènil(valida) nella funzionecreateConnche crea la connessione, e quindi: 
 1// nats-server/server/server.go
 2
 3// InProcessConn restituisce una connessione in-process al server,
 4// evitando la necessità di utilizzare un listener TCP per la connettività locale
 5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
 6// stato dell'opzione DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("impossibile creare la connessione")
16	}
17	return pr, nil
18}
- Chiama ed esegue l'implementazione di 
InProcessConnnel server. - Questa funzione viene chiamata quando 
InProcessServerdel nc (nats connection) nel client Go di nats,nats.go, non ènil, creando una connessione (net.Conn) e associandola alla connessione del server. 
Consumer driven interface of Go
Un tipo implementa un'interfaccia implementando i suoi metodi. Non c'è una dichiarazione esplicita di intento, nessun parola chiave "implements". Le interfacce implicite disaccoppiano la definizione di un'interfaccia dalla sua implementazione, che potrebbe quindi apparire in qualsiasi pacchetto senza preaccordi.
Le interfacce sono implementate implicitamente, Un Tour di Go
Se un tipo esiste solo per implementare un'interfaccia e non avrà mai metodi esportati oltre a quell'interfaccia, non c'è bisogno di esportare il tipo stesso.
- Questo design dell'interfaccia incarna bene ciò che in Go viene comunemente definito "consumer defined interface" e "structural typing" (duck typing), quindi vorrei presentare anche questo argomento.
 
 1// nats-go/nats.go
 2
 3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
 4type Options struct {
 5	// Url rappresenta un singolo URL del server NATS a cui il client
 6	// si connetterà. Se è impostata anche l'opzione Servers, essa
 7	// diventa il primo server nell'array Servers.
 8	Url string
 9
10	// InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11	// stesso processo. Se questo è impostato, tenteremo di connetterci
12	// direttamente al server anziché utilizzare connessioni TCP esterne.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
- Torniamo al codice. Nel client nats.go, il campo 
InProcessServerdella structoptionè definito come l'interfacciaInProcessConnProvider, che esegue soloInProcessConn. 
 1// nats-server/server/server.go
 2
 3// InProcessConn restituisce una connessione in-process al server,
 4// evitando la necessità di utilizzare un listener TCP per la connettività locale
 5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
 6// stato dell'opzione DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("impossibile creare la connessione")
16	}
17	return pr, nil
18}
- Tuttavia, il tipo che vi rientra è il 
Serverdi nats-server, che svolge diverse funzioni oltre aInProcessConn. - Questo perché, in questa situazione, l'unica preoccupazione del client è se l'interfaccia 
InProcessConnè stata fornita o meno; altre questioni non sono di grande importanza. - Pertanto, il client nats.go sta utilizzando solo un'interfaccia definita dal consumatore, 
InProcessConnProvider, che definisce solo la funzionalitàInProcessConn() (net.Conn, error). 
Conclusion
- Ho brevemente trattato la modalità embedded di NATS, il suo funzionamento e le interfacce definite dal consumatore di Go che possono essere osservate attraverso il codice di NATS.
 - Spero che queste informazioni siano utili a coloro che utilizzano NATS per scopi simili e con questo concludo questo articolo.