Come comunicano le NATS embedded con un'applicazione Go?
Getting started
About NATS
Le applicazioni e i servizi software necessitano di scambiare dati. NATS è un'infrastruttura che consente tale scambio di dati, segmentato sotto forma di messaggi. Questo lo definiamo un "middleware orientato ai messaggi".
Con NATS, gli sviluppatori di applicazioni possono:
- Costruire senza sforzo applicazioni client-server distribuite e scalabili.
- Archiviare e distribuire dati in tempo reale in modo generale. Ciò può essere ottenuto in modo flessibile attraverso vari ambienti, linguaggi, fornitori di cloud e sistemi on-premises.
- NATS è un message broker configurato in Go.
Embedded NATS
Se la vostra applicazione è in Go, e se si adatta al vostro caso d'uso e agli scenari di deployment, potete persino incorporare un server NATS all'interno della vostra applicazione.
Incorporare NATS, documentazione NATS
- E c'è una caratteristica particolare di NATS: supporta la modalità embedded per le applicazioni costruite con Go.
- In altre parole, invece del metodo comune per i message broker, che prevede l'avvio di un server broker separato e la comunicazione tramite il client dell'applicazione con tale server, è possibile incorporare il broker stesso nell'applicazione creata con Go.
Benefits and use cases of embedded NATS
- C'è un video di YouTube ben spiegato, quindi mi limiterò a fornire il link al video.
- Anche senza deployare un server message broker separato, si possono ottenere i vantaggi di incorporare NATS per creare un'applicazione modular monolith e raggiungere la separation of concerns. Inoltre, è possibile un single binary deployment.
- Può essere utilizzato utilmente non solo su piattaforme senza rete (wasm), ma anche in applicazioni offline-first.
Example on official docs
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inizializza un nuovo server con le opzioni
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Avvia il server tramite goroutine
22 go ns.Start()
23
24 // Attendi che il server sia pronto per le connessioni
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("non pronto per la connessione")
27 }
28
29 // Connettiti al server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Iscriviti all'argomento
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Stampa i dati del messaggio
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Spegni il server (opzionale)
45 ns.Shutdown()
46 })
47
48 // Pubblica i dati sull'argomento
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Attendi lo spegnimento del server
52 ns.WaitForShutdown()
53}
- Questo è un esempio di NATS embedded dalla documentazione ufficiale di NATS, ma la comunicazione non avviene in modalità embedding se si procede con il codice d'esempio.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Eseguendo il comando
watch 'netstat -an | grep 127.0.0.1'
per controllare il traffico di rete in entrata e in uscita da localhost(127.0.0.1) e poi eseguendo il file Go congo run .
, si possono osservare nuove richieste di rete in partenza dalla porta predefinita di NATS, la4222
.
Right configurations for embedding mode
Per comunicare in modalità embedded come previsto, sono necessarie le seguenti due opzioni:
- Client: È necessario includere l'opzione
InProcessServer
. - Server: Nella
Server.Options
, il flagDontListen
deve essere esplicitamente impostato sutrue
.
- Client: È necessario includere l'opzione
Queste parti non erano ufficialmente documentate, e l'inizio di questa funzionalità può essere individuato tramite questa PR.
Questa PR aggiunge tre elementi:
- Funzione
InProcessConn()
aServer
che costruisce unanet.Pipe
per ottenere una connessione al server NATS senza usare i socket TCP. - Opzione
DontListen
che indica al server NATS di non ascoltare sul solito listener TCP. - Canale
startupComplete
, che viene chiuso poco prima di avviareAcceptLoop
, ereadyForConnections
lo attenderà.
La motivazione principale di ciò è che abbiamo un'applicazione che può essere eseguita in modalità monolitica (single-process) o polilitica (multi-process). Vorremmo poter utilizzare NATS per entrambe le modalità per semplicità, ma la modalità monolitica deve essere in grado di soddisfare una varietà di piattaforme in cui l'apertura di connessioni socket non ha senso (mobile) o semplicemente non è possibile (WASM). Queste modifiche ci permetteranno di utilizzare NATS interamente in-process.
Una PR di accompagnamento nats-io/nats.go#774 aggiunge il supporto lato client.
Questa è la mia prima PR a questo progetto, quindi mi scuso in anticipo se ho tralasciato qualcosa di ovvio da qualche parte.
/cc @nats-io/core
- Funzione
Working Example for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // per la configurazione del server NATS integrato
14 // imposta DonListen su true
15 DontListen: true,
16 }
17
18 // Inizializza un nuovo server con le opzioni
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Avvia il server tramite goroutine
26 go ns.Start()
27
28 // Attendi che il server sia pronto per le connessioni
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("non pronto per la connessione")
31 }
32
33 // Connettiti al server tramite connessione in-process
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Iscriviti all'argomento
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Stampa i dati del messaggio
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Spegni il server (opzionale)
49 ns.Shutdown()
50 })
51
52 // Pubblica i dati sull'argomento
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Attendi lo spegnimento del server
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Ora si può osservare che non si verificano ulteriori
network hop
come previsto.
Under the hood
TL;DR
- Questo è un diagramma di sequenza che mostra come le funzioni interne operano quando il codice viene eseguito da
main.go
, e il punto principale è il seguente:- Impostando
DontListen: true
, il server salta la fase di ascolto del client chiamataAcceptLoop
. - Se l'opzione
InProcessServer
del Connect del client è attiva, viene creata una connessione in-memory e una pipe tramitenet.Pipe
, dopodiché l'estremità della pipe viene restituita al client come tiponet.Conn
. - Il client e il server comunicano in-process tramite questa connessione.
- Impostando
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Attendi i client.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Innanzitutto, se
DontListen
ètrue
, la fase di ascolto del client chiamataAcceptLoop
viene omessa.
1// nats-server/server/server.go
2
3// AcceptLoop è esportata per facilitare i test.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Se dovessimo uscire prima che il listener sia configurato correttamente,
6 // assicurati di chiudere il canale.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot delle opzioni del server.
18 opts := s.getOpts()
19
20 // Imposta lo stato che può abilitare lo spegnimento
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Errore durante l'ascolto sulla porta: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("In ascolto per connessioni client su %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Avvisa di TLS abilitato.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS richiesto per le connessioni client")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("I client che non utilizzano l'opzione \"TLS Handshake First\" non riusciranno a connettersi")
38 }
39 }
40
41 // Se il server è stato avviato con RANDOM_PORT (-1), opts.Port sarebbe uguale
42 // a 0 all'inizio di questa funzione. Quindi dobbiamo ottenere la porta effettiva
43 if opts.Port == 0 {
44 // Scrivi la porta risolta nelle opzioni.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Ora che la porta è stata impostata (se era impostata su RANDOM), imposta l'Host/Port
49 // delle informazioni del server con i valori delle Opzioni o di ClientAdvertise.
50 if err := s.setInfoHostPort(); err != nil {
51 s.Fatalf("Errore durante l'impostazione delle INFO del server con il valore ClientAdvertise di %s, err=%v", opts.ClientAdvertise, err)
52 l.Close()
53 s.mu.Unlock()
54 return
55 }
56 // Tieni traccia degli URL di connessione client. Potremmo averne bisogno in seguito.
57 s.clientConnectURLs = s.getClientConnectURLs()
58 s.listener = l
59
60 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
61 func(_ error) bool {
62 if s.isLameDuckMode() {
63 // Segnala che non stiamo accettando nuovi client
64 s.ldmCh <- true
65 // Ora attendi lo spegnimento...
66 <-s.quitCh
67 return true
68 }
69 return false
70 })
71 s.mu.Unlock()
72
73 // Fai sapere al chiamante che siamo pronti
74 close(clr)
75 clr = nil
76}
- Per riferimento, la funzione
AcceptLoop
esegue le seguenti operazioni: Queste sono parti relative alla comunicazione di rete, comeTLS
ohostPort
, che possono essere omesse poiché non sono necessarie quando si esegue una comunicazione in-process.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect tenterà di connettersi al sistema NATS.
5// L'URL può contenere la semantica username/password. es. nats://derek:pass@localhost:4222
6// Sono supportati anche array separati da virgole, es. urlA, urlB.
7// Le opzioni iniziano con i valori predefiniti ma possono essere sovrascritte.
8// Per connettersi alla porta websocket di un server NATS, utilizzare lo schema `ws` o `wss`, come
9// `ws://localhost:8080`. Si noti che gli schemi websocket non possono essere mescolati con altri (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
4type Options struct {
5 // Url rappresenta un singolo URL del server NATS a cui il client
6 // si connetterà. Se è impostata anche l'opzione Servers, essa
7 // diventa il primo server nell'array Servers.
8 Url string
9
10 // InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11 // stesso processo. Se questo è impostato, tenteremo di connetterci
12 // direttamente al server anziché utilizzare connessioni TCP esterne.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- La funzione
Connect
, che gestisce la connessione tra il server NATS e il client NATS, può configurare l'URL del client e l'opzione di connessione; la strutturaOptions
, che raggruppa queste opzioni, contiene un campoInProcessServer
di tipo interfacciaInProcessConnProvider
.
1// main.go of example code
2
3// Inizializza un nuovo server con le opzioni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connettiti al server tramite connessione in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Quando il client NATS esegue la connessione, se
nats.InProcessServer(ns)
viene passato al campoInProcessServer
, allora:
1// nats-go/nats.go
2
3// createConn si connetterà al server e avvolgerà le appropriate
4// strutture bufio. Farà la cosa giusta quando una connessione
5// esistente è in atto.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Se abbiamo un riferimento a un server in-process, allora stabiliamo una
15 // connessione usando quello.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("impossibile ottenere la connessione in-process: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Questa interfaccia esegue il
InProcessConn
delInProcessServer
nell'opzione, se l'opzioneInProcessServer
non ènil
(valida) nella funzionecreateConn
che crea la connessione, e quindi:
1// nats-server/server/server.go
2
3// InProcessConn restituisce una connessione in-process al server,
4// evitando la necessità di utilizzare un listener TCP per la connettività locale
5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
6// stato dell'opzione DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("impossibile creare la connessione")
16 }
17 return pr, nil
18}
- Chiama ed esegue l'implementazione di
InProcessConn
nel server. - Questa funzione viene chiamata quando
InProcessServer
del nc (nats connection) nel client Go di nats,nats.go
, non ènil
, creando una connessione (net.Conn
) e associandola alla connessione del server.
Consumer driven interface of Go
Un tipo implementa un'interfaccia implementando i suoi metodi. Non c'è una dichiarazione esplicita di intento, nessun parola chiave "implements". Le interfacce implicite disaccoppiano la definizione di un'interfaccia dalla sua implementazione, che potrebbe quindi apparire in qualsiasi pacchetto senza preaccordi.
Le interfacce sono implementate implicitamente, Un Tour di Go
Se un tipo esiste solo per implementare un'interfaccia e non avrà mai metodi esportati oltre a quell'interfaccia, non c'è bisogno di esportare il tipo stesso.
- Questo design dell'interfaccia incarna bene ciò che in Go viene comunemente definito "consumer defined interface" e "structural typing" (duck typing), quindi vorrei presentare anche questo argomento.
1// nats-go/nats.go
2
3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
4type Options struct {
5 // Url rappresenta un singolo URL del server NATS a cui il client
6 // si connetterà. Se è impostata anche l'opzione Servers, essa
7 // diventa il primo server nell'array Servers.
8 Url string
9
10 // InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11 // stesso processo. Se questo è impostato, tenteremo di connetterci
12 // direttamente al server anziché utilizzare connessioni TCP esterne.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Torniamo al codice. Nel client nats.go, il campo
InProcessServer
della structoption
è definito come l'interfacciaInProcessConnProvider
, che esegue soloInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn restituisce una connessione in-process al server,
4// evitando la necessità di utilizzare un listener TCP per la connettività locale
5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
6// stato dell'opzione DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("impossibile creare la connessione")
16 }
17 return pr, nil
18}
- Tuttavia, il tipo che vi rientra è il
Server
di nats-server, che svolge diverse funzioni oltre aInProcessConn
. - Questo perché, in questa situazione, l'unica preoccupazione del client è se l'interfaccia
InProcessConn
è stata fornita o meno; altre questioni non sono di grande importanza. - Pertanto, il client nats.go sta utilizzando solo un'interfaccia definita dal consumatore,
InProcessConnProvider
, che definisce solo la funzionalitàInProcessConn() (net.Conn, error)
.
Conclusion
- Ho brevemente trattato la modalità embedded di NATS, il suo funzionamento e le interfacce definite dal consumatore di Go che possono essere osservate attraverso il codice di NATS.
- Spero che queste informazioni siano utili a coloro che utilizzano NATS per scopi simili e con questo concludo questo articolo.