GoSuda

Come comunicano le NATS embedded con un'applicazione Go?

By prravda
views ...

Getting started

About NATS

Le applicazioni e i servizi software necessitano di scambiare dati. NATS è un'infrastruttura che consente tale scambio di dati, segmentato sotto forma di messaggi. Questo lo definiamo un "middleware orientato ai messaggi".

Con NATS, gli sviluppatori di applicazioni possono:

  • Costruire senza sforzo applicazioni client-server distribuite e scalabili.
  • Archiviare e distribuire dati in tempo reale in modo generale. Ciò può essere ottenuto in modo flessibile attraverso vari ambienti, linguaggi, fornitori di cloud e sistemi on-premises.

Cos'è NATS, documentazione NATS

  • NATS è un message broker configurato in Go.

Embedded NATS

Se la vostra applicazione è in Go, e se si adatta al vostro caso d'uso e agli scenari di deployment, potete persino incorporare un server NATS all'interno della vostra applicazione.

Incorporare NATS, documentazione NATS

  • E c'è una caratteristica particolare di NATS: supporta la modalità embedded per le applicazioni costruite con Go.
  • In altre parole, invece del metodo comune per i message broker, che prevede l'avvio di un server broker separato e la comunicazione tramite il client dell'applicazione con tale server, è possibile incorporare il broker stesso nell'applicazione creata con Go.

Benefits and use cases of embedded NATS

  • C'è un video di YouTube ben spiegato, quindi mi limiterò a fornire il link al video.
  • Anche senza deployare un server message broker separato, si possono ottenere i vantaggi di incorporare NATS per creare un'applicazione modular monolith e raggiungere la separation of concerns. Inoltre, è possibile un single binary deployment.
  • Può essere utilizzato utilmente non solo su piattaforme senza rete (wasm), ma anche in applicazioni offline-first.

Example on official docs

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inizializza un nuovo server con le opzioni
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Avvia il server tramite goroutine
22    go ns.Start()
23
24    // Attendi che il server sia pronto per le connessioni
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("non pronto per la connessione")
27    }
28
29    // Connettiti al server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Iscriviti all'argomento
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Stampa i dati del messaggio
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Spegni il server (opzionale)
45        ns.Shutdown()
46    })
47
48    // Pubblica i dati sull'argomento
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Attendi lo spegnimento del server
52    ns.WaitForShutdown()
53}
  • Questo è un esempio di NATS embedded dalla documentazione ufficiale di NATS, ma la comunicazione non avviene in modalità embedding se si procede con il codice d'esempio.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Eseguendo il comando watch 'netstat -an | grep 127.0.0.1' per controllare il traffico di rete in entrata e in uscita da localhost(127.0.0.1) e poi eseguendo il file Go con go run ., si possono osservare nuove richieste di rete in partenza dalla porta predefinita di NATS, la 4222.

Right configurations for embedding mode

  • Per comunicare in modalità embedded come previsto, sono necessarie le seguenti due opzioni:

    • Client: È necessario includere l'opzione InProcessServer.
    • Server: Nella Server.Options, il flag DontListen deve essere esplicitamente impostato su true.
  • Queste parti non erano ufficialmente documentate, e l'inizio di questa funzionalità può essere individuato tramite questa PR.

    Questa PR aggiunge tre elementi:

    1. Funzione InProcessConn() a Server che costruisce una net.Pipe per ottenere una connessione al server NATS senza usare i socket TCP.
    2. Opzione DontListen che indica al server NATS di non ascoltare sul solito listener TCP.
    3. Canale startupComplete, che viene chiuso poco prima di avviare AcceptLoop, e readyForConnections lo attenderà.

    La motivazione principale di ciò è che abbiamo un'applicazione che può essere eseguita in modalità monolitica (single-process) o polilitica (multi-process). Vorremmo poter utilizzare NATS per entrambe le modalità per semplicità, ma la modalità monolitica deve essere in grado di soddisfare una varietà di piattaforme in cui l'apertura di connessioni socket non ha senso (mobile) o semplicemente non è possibile (WASM). Queste modifiche ci permetteranno di utilizzare NATS interamente in-process.

    Una PR di accompagnamento nats-io/nats.go#774 aggiunge il supporto lato client.

    Questa è la mia prima PR a questo progetto, quindi mi scuso in anticipo se ho tralasciato qualcosa di ovvio da qualche parte.

    /cc @nats-io/core

Working Example for embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// per la configurazione del server NATS integrato
14		// imposta DonListen su true
15		DontListen: true,
16	}
17
18	// Inizializza un nuovo server con le opzioni
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Avvia il server tramite goroutine
26	go ns.Start()
27
28	// Attendi che il server sia pronto per le connessioni
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("non pronto per la connessione")
31	}
32
33	// Connettiti al server tramite connessione in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Iscriviti all'argomento
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Stampa i dati del messaggio
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Spegni il server (opzionale)
49		ns.Shutdown()
50	})
51
52	// Pubblica i dati sull'argomento
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Attendi lo spegnimento del server
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Ora si può osservare che non si verificano ulteriori network hop come previsto.

Under the hood

TL;DR

diagram1

  • Questo è un diagramma di sequenza che mostra come le funzioni interne operano quando il codice viene eseguito da main.go, e il punto principale è il seguente:
    • Impostando DontListen: true, il server salta la fase di ascolto del client chiamata AcceptLoop.
    • Se l'opzione InProcessServer del Connect del client è attiva, viene creata una connessione in-memory e una pipe tramite net.Pipe, dopodiché l'estremità della pipe viene restituita al client come tipo net.Conn.
    • Il client e il server comunicano in-process tramite questa connessione.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Attendi i client.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Innanzitutto, se DontListen è true, la fase di ascolto del client chiamata AcceptLoop viene omessa.
 1// nats-server/server/server.go
 2
 3// AcceptLoop è esportata per facilitare i test.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Se dovessimo uscire prima che il listener sia configurato correttamente,
 6	// assicurati di chiudere il canale.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot delle opzioni del server.
18	opts := s.getOpts()
19
20	// Imposta lo stato che può abilitare lo spegnimento
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Errore durante l'ascolto sulla porta: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("In ascolto per connessioni client su %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Avvisa di TLS abilitato.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS richiesto per le connessioni client")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("I client che non utilizzano l'opzione \"TLS Handshake First\" non riusciranno a connettersi")
38		}
39	}
40
41	// Se il server è stato avviato con RANDOM_PORT (-1), opts.Port sarebbe uguale
42	// a 0 all'inizio di questa funzione. Quindi dobbiamo ottenere la porta effettiva
43	if opts.Port == 0 {
44		// Scrivi la porta risolta nelle opzioni.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Ora che la porta è stata impostata (se era impostata su RANDOM), imposta l'Host/Port
49	// delle informazioni del server con i valori delle Opzioni o di ClientAdvertise.
50	if err := s.setInfoHostPort(); err != nil {
51		s.Fatalf("Errore durante l'impostazione delle INFO del server con il valore ClientAdvertise di %s, err=%v", opts.ClientAdvertise, err)
52		l.Close()
53		s.mu.Unlock()
54		return
55	}
56	// Tieni traccia degli URL di connessione client. Potremmo averne bisogno in seguito.
57	s.clientConnectURLs = s.getClientConnectURLs()
58	s.listener = l
59
60	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
61		func(_ error) bool {
62			if s.isLameDuckMode() {
63				// Segnala che non stiamo accettando nuovi client
64				s.ldmCh <- true
65				// Ora attendi lo spegnimento...
66				<-s.quitCh
67				return true
68			}
69			return false
70		})
71	s.mu.Unlock()
72
73	// Fai sapere al chiamante che siamo pronti
74	close(clr)
75	clr = nil
76}
  • Per riferimento, la funzione AcceptLoop esegue le seguenti operazioni: Queste sono parti relative alla comunicazione di rete, come TLS o hostPort, che possono essere omesse poiché non sono necessarie quando si esegue una comunicazione in-process.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect tenterà di connettersi al sistema NATS.
 5// L'URL può contenere la semantica username/password. es. nats://derek:pass@localhost:4222
 6// Sono supportati anche array separati da virgole, es. urlA, urlB.
 7// Le opzioni iniziano con i valori predefiniti ma possono essere sovrascritte.
 8// Per connettersi alla porta websocket di un server NATS, utilizzare lo schema `ws` o `wss`, come
 9// `ws://localhost:8080`. Si noti che gli schemi websocket non possono essere mescolati con altri (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
 4type Options struct {
 5	// Url rappresenta un singolo URL del server NATS a cui il client
 6	// si connetterà. Se è impostata anche l'opzione Servers, essa
 7	// diventa il primo server nell'array Servers.
 8	Url string
 9
10	// InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11	// stesso processo. Se questo è impostato, tenteremo di connetterci
12	// direttamente al server anziché utilizzare connessioni TCP esterne.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La funzione Connect, che gestisce la connessione tra il server NATS e il client NATS, può configurare l'URL del client e l'opzione di connessione; la struttura Options, che raggruppa queste opzioni, contiene un campo InProcessServer di tipo interfaccia InProcessConnProvider.
1// main.go of example code
2
3// Inizializza un nuovo server con le opzioni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connettiti al server tramite connessione in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Quando il client NATS esegue la connessione, se nats.InProcessServer(ns) viene passato al campo InProcessServer, allora:
 1// nats-go/nats.go
 2
 3// createConn si connetterà al server e avvolgerà le appropriate
 4// strutture bufio. Farà la cosa giusta quando una connessione
 5// esistente è in atto.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Se abbiamo un riferimento a un server in-process, allora stabiliamo una
15	// connessione usando quello.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("impossibile ottenere la connessione in-process: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Questa interfaccia esegue il InProcessConn del InProcessServer nell'opzione, se l'opzione InProcessServer non è nil (valida) nella funzione createConn che crea la connessione, e quindi:
 1// nats-server/server/server.go
 2
 3// InProcessConn restituisce una connessione in-process al server,
 4// evitando la necessità di utilizzare un listener TCP per la connettività locale
 5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
 6// stato dell'opzione DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("impossibile creare la connessione")
16	}
17	return pr, nil
18}
  • Chiama ed esegue l'implementazione di InProcessConn nel server.
  • Questa funzione viene chiamata quando InProcessServer del nc (nats connection) nel client Go di nats, nats.go, non è nil, creando una connessione (net.Conn) e associandola alla connessione del server.

Consumer driven interface of Go

Un tipo implementa un'interfaccia implementando i suoi metodi. Non c'è una dichiarazione esplicita di intento, nessun parola chiave "implements". Le interfacce implicite disaccoppiano la definizione di un'interfaccia dalla sua implementazione, che potrebbe quindi apparire in qualsiasi pacchetto senza preaccordi.

Le interfacce sono implementate implicitamente, Un Tour di Go

Se un tipo esiste solo per implementare un'interfaccia e non avrà mai metodi esportati oltre a quell'interfaccia, non c'è bisogno di esportare il tipo stesso.

Generalità, Effective Go

  • Questo design dell'interfaccia incarna bene ciò che in Go viene comunemente definito "consumer defined interface" e "structural typing" (duck typing), quindi vorrei presentare anche questo argomento.
 1// nats-go/nats.go
 2
 3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
 4type Options struct {
 5	// Url rappresenta un singolo URL del server NATS a cui il client
 6	// si connetterà. Se è impostata anche l'opzione Servers, essa
 7	// diventa il primo server nell'array Servers.
 8	Url string
 9
10	// InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11	// stesso processo. Se questo è impostato, tenteremo di connetterci
12	// direttamente al server anziché utilizzare connessioni TCP esterne.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Torniamo al codice. Nel client nats.go, il campo InProcessServer della struct option è definito come l'interfaccia InProcessConnProvider, che esegue solo InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn restituisce una connessione in-process al server,
 4// evitando la necessità di utilizzare un listener TCP per la connettività locale
 5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
 6// stato dell'opzione DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("impossibile creare la connessione")
16	}
17	return pr, nil
18}
  • Tuttavia, il tipo che vi rientra è il Server di nats-server, che svolge diverse funzioni oltre a InProcessConn.
  • Questo perché, in questa situazione, l'unica preoccupazione del client è se l'interfaccia InProcessConn è stata fornita o meno; altre questioni non sono di grande importanza.
  • Pertanto, il client nats.go sta utilizzando solo un'interfaccia definita dal consumatore, InProcessConnProvider, che definisce solo la funzionalità InProcessConn() (net.Conn, error).

Conclusion

  • Ho brevemente trattato la modalità embedded di NATS, il suo funzionamento e le interfacce definite dal consumatore di Go che possono essere osservate attraverso il codice di NATS.
  • Spero che queste informazioni siano utili a coloro che utilizzano NATS per scopi simili e con questo concludo questo articolo.