Come comunicano le NATS embedded con l'applicazione Go?
Getting started
About NATS
Le applicazioni e i servizi software necessitano di scambiare dati. NATS è un'infrastruttura che consente tale scambio di dati, segmentato sotto forma di messaggi. Questo lo definiamo un "middleware orientato ai messaggi".
Con NATS, gli sviluppatori di applicazioni possono:
- Costruire senza sforzo applicazioni client-server distribuite e scalabili.
- Archiviare e distribuire dati in tempo reale in modo generale. Questo può essere raggiunto in modo flessibile attraverso vari ambienti, linguaggi, cloud provider e sistemi on-premises.
- NATS è un message broker composto in Go.
Embedded NATS
Se la vostra applicazione è in Go, e se si adatta al vostro caso d'uso e agli scenari di deployment, potete persino incorporare un server NATS all'interno della vostra applicazione.
- E c'è una particolarità di NATS: supporta la modalità embedded per le applicazioni scritte in Go.
- In altre parole, invece del metodo tradizionale dei message broker, che prevede l'avvio di un server broker separato e la comunicazione tramite il client dell'applicazione con tale server, è possibile incorporare (embed) il broker stesso nell'applicazione Go.
Benefits and use cases of embedded NATS
- C'è un video di YouTube ben spiegato, quindi mi limiterò a fornire il link al video.
- Anche senza deployare un server message broker separato, è possibile creare un'applicazione modular monolith per ottenere la separatezza delle preoccupazioni e sfruttare il vantaggio di incorporare NATS. Inoltre, è possibile un single binary deployment.
- Può essere utilmente impiegato in piattaforme senza rete (WASM) e in applicazioni offline-first.
Example on official docs
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inizializza un nuovo server con le opzioni
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Avvia il server tramite goroutine
22 go ns.Start()
23
24 // Attendi che il server sia pronto per le connessioni
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connettiti al server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Sottoscriviti all'argomento
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Stampa i dati del messaggio
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Spegni il server (opzionale)
45 ns.Shutdown()
46 })
47
48 // Pubblica dati sull'argomento
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Attendi lo spegnimento del server
52 ns.WaitForShutdown()
53}
- Questo è un esempio di NATS embedded fornito nella documentazione ufficiale di NATS, ma se si segue questo codice di esempio, la comunicazione non avviene in modalità embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Eseguendo il comando
watch 'netstat -an | grep 127.0.0.1'per monitorare il traffico di rete da e verso localhost (127.0.0.1) e poi eseguendo il file Go congo run ., si noterà l'aggiunta di nuove richieste di rete che partono dalla porta4222, la porta predefinita di NATS.
Right configurations for embedding mode
Per comunicare in modalità embedded come previsto, sono necessarie le seguenti due opzioni:
- Client: È necessario includere l'opzione
InProcessServer. - Server: Nel
Server.Options, è necessario specificare il flagDontListencometrue.
- Client: È necessario includere l'opzione
Queste parti non sono state documentate ufficialmente, e l'inizio di questa funzionalità può essere individuato attraverso questa PR.
Questa PR aggiunge tre elementi:
- La funzione
InProcessConn()aServerche crea unanet.Pipeper ottenere una connessione al server NATS senza utilizzare socket TCP. - L'opzione
DontListenche indica al server NATS di non mettersi in ascolto sul consueto listener TCP. - Il canale
startupComplete, che viene chiuso poco prima di avviareAcceptLoop, ereadyForConnectionslo attenderà.
La motivazione principale di ciò è che abbiamo un'applicazione che può funzionare sia in modalità monolitica (single-process) sia in modalità polilitica (multi-process). Vorremmo poter utilizzare NATS per entrambe le modalità per semplicità, ma la modalità monolitica deve essere in grado di soddisfare una varietà di piattaforme dove l'apertura di connessioni socket o non ha senso (mobile) o semplicemente non è possibile (WASM). Queste modifiche ci permetteranno di utilizzare NATS interamente in-process.
Una PR di accompagnamento nats-io/nats.go#774 aggiunge il supporto lato client.
Questa è la mia prima PR a questo progetto, quindi mi scuso in anticipo se ho tralasciato qualcosa di ovvio.
/cc @nats-io/core
- La funzione
Working Example for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // per configurare il server NATS embedded
14 // imposta DonListen su true
15 DontListen: true,
16 }
17
18 // Inizializza un nuovo server con le opzioni
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Avvia il server tramite goroutine
26 go ns.Start()
27
28 // Attendi che il server sia pronto per le connessioni
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connettiti al server tramite connessione in-process
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Sottoscriviti all'argomento
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Stampa i dati del messaggio
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Spegni il server (opzionale)
49 ns.Shutdown()
50 })
51
52 // Pubblica dati sull'argomento
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Attendi lo spegnimento del server
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Ora si può osservare che non si verificano ulteriori network hop come previsto.
Under the hood
TL;DR
- Questo è un diagramma di sequenza che mostra come le funzioni interne operano quando il codice viene eseguito in
main.go, e la sua essenza è la seguente:- Tramite
DontListen: true, il server salta la fase di ascolto del client chiamataAcceptLoop. - Se l'opzione
InProcessServerdel client è attiva, viene creata una connessione in-memory e una pipe tramitenet.Pipe, quindi l'estremità della pipe viene restituita al client come tiponet.Conn. - Client e server comunicano in-process tramite questa connessione.
- Tramite
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Attendi i client.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- In primo luogo, se
DontListenè true, la fase di ascolto del client chiamataAcceptLoopviene omessa.
1// nats-server/server/server.go
2
3// AcceptLoop è esportata per facilitare i test.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Se dovessimo uscire prima che il listener sia configurato correttamente,
6 // assicuriamoci di chiudere il canale.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot delle opzioni del server.
18 opts := s.getOpts()
19
20 // Imposta lo stato che può abilitare lo spegnimento
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Avvisa se TLS è abilitato.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS richiesto per le connessioni client")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("I client che non utilizzano l'opzione \"TLS Handshake First\" falliranno la connessione")
38 }
39 }
40
41 // Se il server è stato avviato con RANDOM_PORT (-1), opts.Port sarebbe uguale
42 // a 0 all'inizio di questa funzione. Quindi dobbiamo ottenere la porta effettiva
43 if opts.Port == 0 {
44 // Scrivi la porta risolta di nuovo nelle opzioni.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Ora che la porta è stata impostata (se era impostata su RANDOM), imposta l'Host/Port
49 // delle informazioni del server con i valori delle Opzioni o
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Tieni traccia degli URL di connessione del client. Potremmo averne bisogno in seguito.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Segnala che non stiamo accettando nuovi client
65 s.ldmCh <- true
66 // Ora attendi lo spegnimento...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Fai sapere al chiamante che siamo pronti
75 close(clr)
76 clr = nil
77}
- Si noti che la funzione AcceptLoop esegue i seguenti passaggi. Poiché si tratta di parti relative alla comunicazione di rete, come TLS o hostPort, e sono superflue per la comunicazione in-process, possono essere omesse.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect tenterà di connettersi al sistema NATS.
5// L'URL può contenere semantiche username/password. es. nats://derek:pass@localhost:4222
6// Sono supportati anche array separati da virgole, es. urlA, urlB.
7// Le opzioni partono dai valori predefiniti ma possono essere sovrascritte.
8// Per connettersi alla porta websocket di un server NATS, usare lo schema `ws` o `wss`, come
9// `ws://localhost:8080`. Si noti che gli schemi websocket non possono essere mescolati con altri (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
4type Options struct {
5 // Url rappresenta un singolo URL del server NATS a cui il client
6 // si connetterà. Se è impostata anche l'opzione Servers,
7 // diventa il primo server nell'array Servers.
8 Url string
9
10 // InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11 // stesso processo. Se questo è impostato, tenteremo di connetterci
12 // direttamente al server anziché utilizzare connessioni TCP esterne.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- La funzione
Connect, che gestisce la connessione tra il server NATS e il client NATS, può configurare l'URL del client e l'opzione di connessione. La struct Options, che raccoglie queste opzioni, contiene un campoInProcessServerdi tipo interfacciaInProcessConnProvider.
1// main.go of example code
2
3// Inizializza un nuovo server con le opzioni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connettiti al server tramite connessione in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Quando il client NATS procede con la connessione, passando
nats.InProcessServer(ns)al campoInProcessServer:
1// nats-go/nats.go
2
3// InProcessServer è un'opzione che tenterà di stabilire una direzione verso un server NATS
4// in esecuzione all'interno del processo anziché tramite TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- L'InProcessServer dell'opzione viene sostituito con il server NATS embedded.
1// nats-go/nats.go
2
3// createConn si connetterà al server e avvolgerà le appropriate
4// strutture bufio. Farà la cosa giusta quando una connessione esistente
5// è già presente.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Se abbiamo un riferimento a un server in-process, stabiliamo una
15 // connessione usando quello.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Questa interfaccia, quando l'opzione
InProcessServernon è nil (valida) nella funzionecreateConnche crea la connessione, esegueInProcessConndell'InProcessServer presente nell'opzione.
1// nats-server/server/server.go
2
3// InProcessConn restituisce una connessione in-process al server,
4// evitando la necessità di utilizzare un listener TCP per la connettività locale
5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
6// stato dell'opzione DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- La funzione
InProcessConnimplementata sul server viene invocata ed eseguita. - Questa funzione, se il campo
InProcessServerdi nc (nats connection) nel client Go di NATS (nats.go) non è nil, viene chiamata per creare una connessione (net.Conn) e la lega alla connessione del server.
Consumer driven interface of Go
Un tipo implementa un'interfaccia implementandone i metodi. Non c'è una dichiarazione esplicita di intenti, nessuna parola chiave "implements". Le interfacce implicite disaccoppiano la definizione di un'interfaccia dalla sua implementazione, che potrebbe quindi apparire in qualsiasi pacchetto senza preaccordi.
Interfaces are implemented implicitly, A Tour of Go
Se un tipo esiste solo per implementare un'interfaccia e non avrà mai metodi esportati oltre quell'interfaccia, non è necessario esportare il tipo stesso.
- Questo design dell'interfaccia cattura bene le interfacce definite dal consumatore e il structural typing (duck typing) comuni in Go, quindi vorrei presentare anche questo argomento.
1// nats-go/nats.go
2
3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
4type Options struct {
5 // Url rappresenta un singolo URL del server NATS a cui il client
6 // si connetterà. Se è impostata anche l'opzione Servers,
7 // diventa il primo server nell'array Servers.
8 Url string
9
10 // InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11 // stesso processo. Se questo è impostato, tenteremo di connetterci
12 // direttamente al server anziché utilizzare connessioni TCP esterne.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Torniamo al codice. Nel client nats.go, il campo
InProcessServerdella struct option è definito come un'interfacciaInProcessConnProviderche esegue soloInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn restituisce una connessione in-process al server,
4// evitando la necessità di utilizzare un listener TCP per la connettività locale
5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
6// stato dell'opzione DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Tuttavia, il tipo che vi viene inserito è il
Serverdi nats-server, che esegue una varietà di funzioni oltre a InProcessConn. - Questo perché l'interesse del client in questa situazione è solo se l'interfaccia
InProcessConnè stata fornita o meno; le altre cose non sono particolarmente importanti. - Pertanto, il client nats.go sta utilizzando solo un'interfaccia definita dal consumatore,
InProcessConnProvider, che definisce solo la funzionalitàInProcessConn() (net.Conn, error).
Conclusion
- Ho trattato brevemente la modalità embedded di NATS, il suo funzionamento e le interfacce definite dal consumatore in Go che possono essere verificate attraverso il codice di NATS.
- Spero che queste informazioni siano utili a coloro che utilizzano NATS per scopi simili e con questo concludo questo articolo.