GoSuda

Come comunicano le NATS embedded con l'applicazione Go?

By prravda
views ...

Getting started

About NATS

Le applicazioni e i servizi software necessitano di scambiare dati. NATS è un'infrastruttura che consente tale scambio di dati, segmentato sotto forma di messaggi. Questo lo definiamo un "middleware orientato ai messaggi".

Con NATS, gli sviluppatori di applicazioni possono:

  • Costruire senza sforzo applicazioni client-server distribuite e scalabili.
  • Archiviare e distribuire dati in tempo reale in modo generale. Questo può essere raggiunto in modo flessibile attraverso vari ambienti, linguaggi, cloud provider e sistemi on-premises.

What is NATS, NATS docs

  • NATS è un message broker composto in Go.

Embedded NATS

Se la vostra applicazione è in Go, e se si adatta al vostro caso d'uso e agli scenari di deployment, potete persino incorporare un server NATS all'interno della vostra applicazione.

Embedding NATS, NATS docs

  • E c'è una particolarità di NATS: supporta la modalità embedded per le applicazioni scritte in Go.
  • In altre parole, invece del metodo tradizionale dei message broker, che prevede l'avvio di un server broker separato e la comunicazione tramite il client dell'applicazione con tale server, è possibile incorporare (embed) il broker stesso nell'applicazione Go.

Benefits and use cases of embedded NATS

  • C'è un video di YouTube ben spiegato, quindi mi limiterò a fornire il link al video.
  • Anche senza deployare un server message broker separato, è possibile creare un'applicazione modular monolith per ottenere la separatezza delle preoccupazioni e sfruttare il vantaggio di incorporare NATS. Inoltre, è possibile un single binary deployment.
  • Può essere utilmente impiegato in piattaforme senza rete (WASM) e in applicazioni offline-first.

Example on official docs

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inizializza un nuovo server con le opzioni
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Avvia il server tramite goroutine
22    go ns.Start()
23
24    // Attendi che il server sia pronto per le connessioni
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connettiti al server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Sottoscriviti all'argomento
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Stampa i dati del messaggio
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Spegni il server (opzionale)
45        ns.Shutdown()
46    })
47
48    // Pubblica dati sull'argomento
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Attendi lo spegnimento del server
52    ns.WaitForShutdown()
53}
  • Questo è un esempio di NATS embedded fornito nella documentazione ufficiale di NATS, ma se si segue questo codice di esempio, la comunicazione non avviene in modalità embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Eseguendo il comando watch 'netstat -an | grep 127.0.0.1' per monitorare il traffico di rete da e verso localhost (127.0.0.1) e poi eseguendo il file Go con go run ., si noterà l'aggiunta di nuove richieste di rete che partono dalla porta 4222, la porta predefinita di NATS.

Right configurations for embedding mode

  • Per comunicare in modalità embedded come previsto, sono necessarie le seguenti due opzioni:

    • Client: È necessario includere l'opzione InProcessServer.
    • Server: Nel Server.Options, è necessario specificare il flag DontListen come true.
  • Queste parti non sono state documentate ufficialmente, e l'inizio di questa funzionalità può essere individuato attraverso questa PR.

    Questa PR aggiunge tre elementi:

    1. La funzione InProcessConn() a Server che crea una net.Pipe per ottenere una connessione al server NATS senza utilizzare socket TCP.
    2. L'opzione DontListen che indica al server NATS di non mettersi in ascolto sul consueto listener TCP.
    3. Il canale startupComplete, che viene chiuso poco prima di avviare AcceptLoop, e readyForConnections lo attenderà.

    La motivazione principale di ciò è che abbiamo un'applicazione che può funzionare sia in modalità monolitica (single-process) sia in modalità polilitica (multi-process). Vorremmo poter utilizzare NATS per entrambe le modalità per semplicità, ma la modalità monolitica deve essere in grado di soddisfare una varietà di piattaforme dove l'apertura di connessioni socket o non ha senso (mobile) o semplicemente non è possibile (WASM). Queste modifiche ci permetteranno di utilizzare NATS interamente in-process.

    Una PR di accompagnamento nats-io/nats.go#774 aggiunge il supporto lato client.

    Questa è la mia prima PR a questo progetto, quindi mi scuso in anticipo se ho tralasciato qualcosa di ovvio.

    /cc @nats-io/core

Working Example for embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// per configurare il server NATS embedded
14		// imposta DonListen su true
15		DontListen: true,
16	}
17
18	// Inizializza un nuovo server con le opzioni
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Avvia il server tramite goroutine
26	go ns.Start()
27
28	// Attendi che il server sia pronto per le connessioni
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connettiti al server tramite connessione in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Sottoscriviti all'argomento
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Stampa i dati del messaggio
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Spegni il server (opzionale)
49		ns.Shutdown()
50	})
51
52	// Pubblica dati sull'argomento
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Attendi lo spegnimento del server
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Ora si può osservare che non si verificano ulteriori network hop come previsto.

Under the hood

TL;DR

diagram1

  • Questo è un diagramma di sequenza che mostra come le funzioni interne operano quando il codice viene eseguito in main.go, e la sua essenza è la seguente:
    • Tramite DontListen: true, il server salta la fase di ascolto del client chiamata AcceptLoop.
    • Se l'opzione InProcessServer del client è attiva, viene creata una connessione in-memory e una pipe tramite net.Pipe, quindi l'estremità della pipe viene restituita al client come tipo net.Conn.
    • Client e server comunicano in-process tramite questa connessione.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Attendi i client.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • In primo luogo, se DontListen è true, la fase di ascolto del client chiamata AcceptLoop viene omessa.
 1// nats-server/server/server.go
 2
 3// AcceptLoop è esportata per facilitare i test.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Se dovessimo uscire prima che il listener sia configurato correttamente,
 6	// assicuriamoci di chiudere il canale.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot delle opzioni del server.
18	opts := s.getOpts()
19
20	// Imposta lo stato che può abilitare lo spegnimento
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Avvisa se TLS è abilitato.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS richiesto per le connessioni client")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("I client che non utilizzano l'opzione \"TLS Handshake First\" falliranno la connessione")
38		}
39	}
40
41	// Se il server è stato avviato con RANDOM_PORT (-1), opts.Port sarebbe uguale
42	// a 0 all'inizio di questa funzione. Quindi dobbiamo ottenere la porta effettiva
43	if opts.Port == 0 {
44		// Scrivi la porta risolta di nuovo nelle opzioni.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Ora che la porta è stata impostata (se era impostata su RANDOM), imposta l'Host/Port
49	// delle informazioni del server con i valori delle Opzioni o
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Tieni traccia degli URL di connessione del client. Potremmo averne bisogno in seguito.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Segnala che non stiamo accettando nuovi client
65				s.ldmCh <- true
66				// Ora attendi lo spegnimento...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Fai sapere al chiamante che siamo pronti
75	close(clr)
76	clr = nil
77}
  • Si noti che la funzione AcceptLoop esegue i seguenti passaggi. Poiché si tratta di parti relative alla comunicazione di rete, come TLS o hostPort, e sono superflue per la comunicazione in-process, possono essere omesse.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect tenterà di connettersi al sistema NATS.
 5// L'URL può contenere semantiche username/password. es. nats://derek:pass@localhost:4222
 6// Sono supportati anche array separati da virgole, es. urlA, urlB.
 7// Le opzioni partono dai valori predefiniti ma possono essere sovrascritte.
 8// Per connettersi alla porta websocket di un server NATS, usare lo schema `ws` o `wss`, come
 9// `ws://localhost:8080`. Si noti che gli schemi websocket non possono essere mescolati con altri (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
 4type Options struct {
 5	// Url rappresenta un singolo URL del server NATS a cui il client
 6	// si connetterà. Se è impostata anche l'opzione Servers,
 7	// diventa il primo server nell'array Servers.
 8	Url string
 9
10	// InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11	// stesso processo. Se questo è impostato, tenteremo di connetterci
12	// direttamente al server anziché utilizzare connessioni TCP esterne.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La funzione Connect, che gestisce la connessione tra il server NATS e il client NATS, può configurare l'URL del client e l'opzione di connessione. La struct Options, che raccoglie queste opzioni, contiene un campo InProcessServer di tipo interfaccia InProcessConnProvider.
1// main.go of example code
2
3// Inizializza un nuovo server con le opzioni
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connettiti al server tramite connessione in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Quando il client NATS procede con la connessione, passando nats.InProcessServer(ns) al campo InProcessServer:
 1// nats-go/nats.go
 2
 3// InProcessServer è un'opzione che tenterà di stabilire una direzione verso un server NATS
 4// in esecuzione all'interno del processo anziché tramite TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • L'InProcessServer dell'opzione viene sostituito con il server NATS embedded.
 1// nats-go/nats.go
 2
 3// createConn si connetterà al server e avvolgerà le appropriate
 4// strutture bufio. Farà la cosa giusta quando una connessione esistente
 5// è già presente.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Se abbiamo un riferimento a un server in-process, stabiliamo una
15	// connessione usando quello.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Questa interfaccia, quando l'opzione InProcessServer non è nil (valida) nella funzione createConn che crea la connessione, esegue InProcessConn dell'InProcessServer presente nell'opzione.
 1// nats-server/server/server.go
 2
 3// InProcessConn restituisce una connessione in-process al server,
 4// evitando la necessità di utilizzare un listener TCP per la connettività locale
 5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
 6// stato dell'opzione DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • La funzione InProcessConn implementata sul server viene invocata ed eseguita.
  • Questa funzione, se il campo InProcessServer di nc (nats connection) nel client Go di NATS (nats.go) non è nil, viene chiamata per creare una connessione (net.Conn) e la lega alla connessione del server.

Consumer driven interface of Go

Un tipo implementa un'interfaccia implementandone i metodi. Non c'è una dichiarazione esplicita di intenti, nessuna parola chiave "implements". Le interfacce implicite disaccoppiano la definizione di un'interfaccia dalla sua implementazione, che potrebbe quindi apparire in qualsiasi pacchetto senza preaccordi.

Interfaces are implemented implicitly, A Tour of Go

Se un tipo esiste solo per implementare un'interfaccia e non avrà mai metodi esportati oltre quell'interfaccia, non è necessario esportare il tipo stesso.

Generality, Effective Go

  • Questo design dell'interfaccia cattura bene le interfacce definite dal consumatore e il structural typing (duck typing) comuni in Go, quindi vorrei presentare anche questo argomento.
 1// nats-go/nats.go
 2
 3// Le opzioni possono essere utilizzate per creare una connessione personalizzata.
 4type Options struct {
 5	// Url rappresenta un singolo URL del server NATS a cui il client
 6	// si connetterà. Se è impostata anche l'opzione Servers,
 7	// diventa il primo server nell'array Servers.
 8	Url string
 9
10	// InProcessServer rappresenta un server NATS in esecuzione all'interno dello
11	// stesso processo. Se questo è impostato, tenteremo di connetterci
12	// direttamente al server anziché utilizzare connessioni TCP esterne.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Torniamo al codice. Nel client nats.go, il campo InProcessServer della struct option è definito come un'interfaccia InProcessConnProvider che esegue solo InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn restituisce una connessione in-process al server,
 4// evitando la necessità di utilizzare un listener TCP per la connettività locale
 5// all'interno dello stesso processo. Questo può essere utilizzato indipendentemente dallo
 6// stato dell'opzione DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Tuttavia, il tipo che vi viene inserito è il Server di nats-server, che esegue una varietà di funzioni oltre a InProcessConn.
  • Questo perché l'interesse del client in questa situazione è solo se l'interfaccia InProcessConn è stata fornita o meno; le altre cose non sono particolarmente importanti.
  • Pertanto, il client nats.go sta utilizzando solo un'interfaccia definita dal consumatore, InProcessConnProvider, che definisce solo la funzionalità InProcessConn() (net.Conn, error).

Conclusion

  • Ho trattato brevemente la modalità embedded di NATS, il suo funzionamento e le interfacce definite dal consumatore in Go che possono essere verificate attraverso il codice di NATS.
  • Spero che queste informazioni siano utili a coloro che utilizzano NATS per scopi simili e con questo concludo questo articolo.