GoSuda

Comment les NATS embarqués communiquent-ils avec une application go ?

By prravda
views ...

Pour commencer

À propos de NATS

Les applications logicielles et les services doivent échanger des données. NATS est une infrastructure qui permet un tel échange de données, segmentées sous forme de messages. Nous appelons cela un "middleware orienté message".

Avec NATS, les développeurs d'applications peuvent :

  • Construire sans effort des applications client-serveur distribuées et évolutives.
  • Stocker et distribuer des données en temps réel de manière générale. Cela peut être réalisé de manière flexible à travers divers environnements, langages, fournisseurs de cloud et systèmes sur site.

Qu'est-ce que NATS, NATS docs

  • NATS est un message broker développé en Go.

NATS embarqué

Si votre application est en Go, et si cela correspond à votre cas d'utilisation et à vos scénarios de déploiement, vous pouvez même embarquer un serveur NATS à l'intérieur de votre application.

Embedding NATS, NATS docs

  • De plus, une particularité de NATS est qu'il prend en charge le mode embarqué pour les applications développées en Go.
  • En d'autres termes, au lieu de la méthode habituelle pour les message brokers, qui consiste à lancer un serveur broker distinct et à communiquer avec lui via le client de l'application, il est possible d'intégrer (embed) le broker lui-même directement dans une application Go.

Avantages et cas d'utilisation de NATS embarqué

  • Une vidéo Youtube bien expliquée est disponible, je m'y réfère donc via le lien.
  • Même sans déployer un serveur de message broker distinct, on peut profiter de l'avantage d'embarquer NATS pour créer une application modular monolith tout en atteignant la separation of concerns. De plus, un single binary deployment devient également possible.
  • Il peut être utile non seulement pour les plateformes sans réseau (WASM), mais aussi pour les applications offline-first.

Exemple sur les documents officiels

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • L'exemple de NATS embarqué fourni dans la documentation officielle de NATS ne permet pas la communication en mode embarqué si l'on suit le code d'exemple tel quel.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Lorsque vous exécutez le fichier Go avec go run . tout en surveillant le trafic réseau vers le localhost (127.0.0.1) à l'aide de la commande watch 'netstat -an | grep 127.0.0.1', vous pouvez observer de nouvelles requêtes réseau émanant du port par défaut de NATS, le 4222.

Configurations correctes pour le mode embarqué

  • Pour que la communication s'effectue en mode embarqué comme prévu, les deux options suivantes sont nécessaires :

    • Client : L'option InProcessServer doit être incluse.
    • Serveur : Le flag DontListen doit être explicitement défini sur true dans Server.Options.
  • Ces aspects n'étaient pas officiellement documentés, et l'origine de cette fonctionnalité peut être retracée via cette PR.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Exemple fonctionnel pour le mode embarqué

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • On peut maintenant constater qu'aucun saut de réseau supplémentaire ne se produit, comme prévu.

Sous le capot

TL;DR

diagram1

  • Ce diagramme de séquence illustre le fonctionnement interne des fonctions lorsque le code est exécuté dans main.go, et l'essentiel est le suivant :
    • Avec DontListen: true, le serveur ignore la phase d'écoute client appelée AcceptLoop.
    • Si l'option de connexion client InProcessServer est activée, une connexion in-memory est créée, un pipe est établi via net.Pipe, et l'extrémité du pipe est retournée au client en tant que type net.Conn.
    • Le client et le serveur communiquent in-process via cette connexion.

Serveur

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Tout d'abord, si DontListen est true, la phase d'écoute client appelée AcceptLoop est ignorée.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • L'exécution de la fonction AcceptLoop implique les étapes suivantes : les sections liées à la communication réseau, telles que TLS ou hostPort, sont superflues pour la communication in-process et peuvent donc être omises.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La fonction Connect, qui établit la connexion entre le serveur NATS et le client NATS, permet de définir l'URL du client et les options de connexion. La structure Options, qui regroupe ces options, contient un champ InProcessServer de type interface InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Lorsque la connexion client NATS est établie avec nats.InProcessServer(ns) comme champ InProcessServer,
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// running within the process instead of dialing via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • L'option InProcessServer est remplacée par le serveur NATS embarqué, et
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Cette interface est exécutée dans la fonction createConn qui établit la connexion. Si l'option InProcessServer n'est pas nil (valide), la fonction InProcessConn de l'option InProcessServer est exécutée, et
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • La fonction InProcessConn implémentée sur le serveur est appelée et exécutée.
  • Cette fonction est appelée depuis le client Go de NATS (nats.go) si le champ InProcessServer de nc (connexion NATS) n'est pas nil. Elle crée une connexion (net.Conn) et la lie à la connexion du serveur.

Interface pilotée par le consommateur de Go

Un type implémente une interface en implémentant ses méthodes. Il n'y a pas de déclaration d'intention explicite, pas de mot-clé "implements". Les interfaces implicites découplent la définition d'une interface de son implémentation, qui peut alors apparaître dans n'importe quel package sans arrangement préalable.

Interfaces are implemented implicitly, A Tour of Go

If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.

Generality, Effective Go

  • Cette conception d'interface illustre bien ce que l'on appelle couramment en Go l'interface définie par le consommateur et le structural typing (duck typing). Je souhaite donc aborder ces sujets.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Revenons au code. Dans le client nats.go, le champ de structure d'option InProcessServer est défini comme une interface InProcessConnProvider qui n'exécute que InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Cependant, le type qui y est inséré est le Server de nats-server, qui exécute non seulement InProcessConn mais aussi diverses autres fonctions.
  • En effet, dans ce scénario, le client ne se soucie que de savoir si l'interface InProcessConn a été fournie ou non ; les autres aspects ne sont pas d'une grande importance.
  • Par conséquent, le client nats.go ne crée et n'utilise qu'une interface définie par le consommateur, InProcessConnProvider, qui ne définit que la fonctionnalité InProcessConn() (net.Conn, error).

Conclusion

  • Nous avons brièvement abordé le mode embarqué de NATS, son fonctionnement, ainsi que l'interface définie par le consommateur en Go, que l'on peut observer à travers le code de NATS.
  • J'espère que ces informations seront utiles à ceux qui utilisent NATS dans les buts mentionnés ci-dessus, et je conclurai cet article.