GoSuda

Comment les NATS embarqués communiquent-ils avec une application go ?

By prravda
views ...

Pour commencer

À propos de NATS

Les applications logicielles et les services doivent échanger des données. NATS est une infrastructure qui permet un tel échange de données, segmenté sous la forme de messages. Nous appelons cela un "middleware orienté message".

Avec NATS, les développeurs d'applications peuvent :

  • Construire sans effort des applications client-serveur distribuées et évolutives.
  • Stocker et distribuer des données en temps réel de manière générale. Cela peut être réalisé de manière flexible à travers divers environnements, langages, fournisseurs de cloud et systèmes sur site.

Qu'est-ce que NATS, documentation NATS

  • NATS est un message broker construit en Go.

NATS embarqué

Si votre application est en Go, et si cela correspond à votre cas d'utilisation et à vos scénarios de déploiement, vous pouvez même embarquer un serveur NATS au sein de votre application.

Intégration de NATS, documentation NATS

  • Une particularité de NATS est qu'il prend en charge le mode embarqué pour les applications développées en Go.
  • En d'autres termes, au lieu du mode habituel des message brokers, qui consiste à démarrer un serveur broker séparé puis à communiquer avec lui via le client de l'application, il est possible d'embarquer (embed) le broker lui-même dans une application Go.

Avantages et cas d'utilisation de NATS embarqué

  • Il existe une vidéo Youtube bien expliquée, que je substitue ici par un lien.
  • Même sans déployer un serveur de message broker séparé, il est possible de créer une application modular monolith et d'atteindre la separation of concerns tout en bénéficiant de l'avantage d'intégrer NATS en mode embarqué. De plus, un single binary deployment devient également possible.
  • Il peut être utilisé de manière utile non seulement pour les platform with no network (WASM), mais aussi pour les applications offline-first.

Exemple sur la documentation officielle

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialise un nouveau serveur avec des options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Démarre le serveur via une goroutine
22    go ns.Start()
23
24    // Attend que le serveur soit prêt pour les connexions
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("pas prêt pour la connexion")
27    }
28
29    // Se connecte au serveur
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // S'abonne au sujet
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Affiche les données du message
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Arrête le serveur (facultatif)
45        ns.Shutdown()
46    })
47
48    // Publie des données sur le sujet
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Attend l'arrêt du serveur
52    ns.WaitForShutdown()
53}
  • L'exemple de NATS embarqué fourni par la documentation officielle de NATS ne permet pas la communication en mode embarqué si l'on suit ce code d'exemple.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • En utilisant la commande watch 'netstat -an | grep 127.0.0.1' pour vérifier le trafic réseau vers le localhost (127.0.0.1) et en exécutant le fichier Go avec go run ., on peut observer de nouvelles requêtes réseau supplémentaires provenant du port par défaut de NATS, 4222.

Configurations correctes pour le mode embarqué

  • Pour que la communication s'effectue en mode embarqué comme prévu, les deux options suivantes sont nécessaires.

    • Client : L'option InProcessServer doit être ajoutée.
    • Serveur : Le flag DontListen dans Server.Options doit être explicitement défini à true.
  • Ces aspects n'étaient pas officiellement documentés, et l'origine de cette fonctionnalité peut être identifiée via cette PR.

    Cette PR ajoute trois éléments :

    1. Une fonction InProcessConn() à Server qui construit un net.Pipe pour obtenir une connexion au serveur NATS sans utiliser de sockets TCP.
    2. Une option DontListen qui indique au serveur NATS de ne pas écouter sur le listener TCP habituel.
    3. Un canal startupComplete, qui est fermé juste avant de démarrer AcceptLoop, et readyForConnections l'attendra.

    La motivation principale est que nous avons une application qui peut fonctionner soit en mode monolithe (single-process) soit en mode polylithe (multi-process). Nous aimerions pouvoir utiliser NATS pour les deux modes par souci de simplicité, mais le mode monolithe doit être capable de s'adapter à une variété de plateformes où l'ouverture de connexions socket n'a pas de sens (mobile) ou n'est tout simplement pas possible (WASM). Ces changements nous permettront d'utiliser NATS entièrement in-process à la place.

    Une PR complémentaire nats-io/nats.go#774 ajoute le support côté client.

    C'est ma première PR sur ce projet, donc je m'excuse d'avance si j'ai manqué quelque chose d'évident.

    /cc @nats-io/core

Exemple fonctionnel pour le mode embarqué

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// pour configurer le serveur NATS embarqué
14		// définir DontListen à true
15		DontListen: true,
16	}
17
18	// Initialise un nouveau serveur avec des options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Démarre le serveur via une goroutine
26	go ns.Start()
27
28	// Attend que le serveur soit prêt pour les connexions
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("pas prêt pour la connexion")
31	}
32
33	// Se connecte au serveur via une connexion in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// S'abonne au sujet
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Affiche les données du message
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Arrête le serveur (facultatif)
49		ns.Shutdown()
50	})
51
52	// Publie des données sur le sujet
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Attend l'arrêt du serveur
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • On peut maintenant observer qu'aucun network hop supplémentaire ne se produit, comme prévu.

Sous le capot

TL;DR

diagram1

  • Il s'agit d'un diagramme de séquence illustrant quelles fonctions sont exécutées et comment elles fonctionnent en interne lorsque ce code est exécuté depuis main.go, et les points essentiels sont les suivants :
    • Avec DontListen: true, le serveur ignore la phase d'écoute du client appelée AcceptLoop.
    • Si l'option InProcessServer du client est activée, une connexion in-memory est créée, un pipe est établi via net.Pipe, puis l'extrémité du pipe est renvoyée au client sous le type net.Conn.
    • Le client et le serveur communiquent in-process via cette connexion.

Serveur

AcceptLoop

1// nats-server/server/server.go
2
3// Attend les clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Tout d'abord, si DontListen est à true, la phase d'écoute du client appelée AcceptLoop est ignorée.
 1// nats-server/server/server.go
 2
 3// AcceptLoop est exporté pour faciliter les tests.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Si nous devions sortir avant que le listener ne soit correctement configuré,
 6	// assurez-vous de fermer le canal.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Capture les options du serveur.
18	opts := s.getOpts()
19
20	// Configure l'état qui peut activer l'arrêt
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Erreur d'écoute sur le port : %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Écoute des connexions client sur %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alerte si TLS est activé.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS requis pour les connexions client")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Les clients n'utilisant pas l'option \"TLS Handshake First\" échoueront à se connecter")
38		}
39	}
40
41	// Si le serveur a été démarré avec RANDOM_PORT (-1), opts.Port serait égal
42	// à 0 au début de cette fonction. Nous devons donc obtenir le port réel
43	if opts.Port == 0 {
44		// Écrit le port résolu dans les options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Maintenant que le port a été défini (s'il était défini à RANDOM), définissez le
49	// Host/Port d'info du serveur avec les valeurs des Options ou
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Erreur de définition de l'INFO du serveur avec la valeur ClientAdvertise de %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Garde une trace des URLs de connexion client. Nous pourrions en avoir besoin plus tard.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signale que nous n'acceptons pas de nouveaux clients
65				s.ldmCh <- true
66				// Attend maintenant l'arrêt...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Informe l'appelant que nous sommes prêts
75	close(clr)
76	clr = nil
77}
  • À titre de référence, la fonction AcceptLoop effectue les étapes suivantes. Ces parties sont liées à la communication réseau, comme TLS ou hostPort, et peuvent être omises car elles sont superflues pour la communication in-process.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect tentera de se connecter au système NATS.
 5// L'URL peut contenir une sémantique de nom d'utilisateur/mot de passe. Par exemple, nats://derek:pass@localhost:4222
 6// Les tableaux séparés par des virgules sont également pris en charge, par exemple, urlA, urlB.
 7// Les options commencent avec les valeurs par défaut mais peuvent être remplacées.
 8// Pour se connecter au port websocket d'un serveur NATS, utilisez le schéma `ws` ou `wss`, tel que
 9// `ws://localhost:8080`. Notez que les schémas websocket ne peuvent pas être mélangés avec d'autres (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Les options peuvent être utilisées pour créer une connexion personnalisée.
 4type Options struct {
 5	// L'URL représente une seule URL de serveur NATS à laquelle le client
 6	// se connectera. Si l'option Servers est également définie, elle
 7	// devient alors le premier serveur dans le tableau Servers.
 8	Url string
 9
10	// InProcessServer représente un serveur NATS exécuté au sein du
11	// même processus. Si cette option est définie, nous tenterons de nous connecter
12	// directement au serveur plutôt que d'utiliser des connexions TCP externes.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La fonction Connect qui établit la connexion entre le serveur NATS et le client NATS permet de définir l'URL du client et l'Option de connexion. La structure Options, qui regroupe ces options, contient un champ InProcessServer de type interface InProcessConnProvider.
1// main.go du code d'exemple
2
3// Initialise un nouveau serveur avec des options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Se connecte au serveur via une connexion in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Lorsque le client NATS effectue une connexion, si nats.InProcessServer(ns) est transmis au champ InProcessServer,
 1// nats-go/nats.go
 2
 3// InProcessServer est une Option qui tentera d'établir une direction vers un serveur NATS
 4// exécuté au sein du processus au lieu de composer via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • l'InProcessServer de l'option est remplacé par le serveur NATS embarqué, et
 1// nats-go/nats.go
 2
 3// createConn se connectera au serveur et encapsulera les structures
 4// bufio appropriées. Il fera ce qu'il faut lorsqu'une connexion existante
 5// est en place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Si nous avons une référence à un serveur in-process, établissons une
15	// connexion en l'utilisant.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("échec de l'obtention de la connexion in-process : %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • cette interface est exécutée en appelant la fonction InProcessConn de l'InProcessServer dans l'option si l'option InProcessServer n'est pas nil (valide) dans la fonction createConn qui crée la connexion.
 1// nats-server/server/server.go
 2
 3// InProcessConn renvoie une connexion in-process au serveur,
 4// évitant la nécessité d'utiliser un écouteur TCP pour la connectivité locale
 5// au sein du même processus. Cela peut être utilisé quelle que soit la
 6// valeur de l'option DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("échec de la création de la connexion")
16	}
17	return pr, nil
18}
  • la fonction InProcessConn implémentée dans le serveur est appelée et exécutée.
  • Cette fonction est appelée si l'InProcessServer de nc (connexion nats) dans le client go de nats, nats.go, n'est pas nil. Elle crée une connexion (net.Conn) et la lie à la connexion du serveur.

Interface pilotée par le consommateur de Go

Un type implémente une interface en implémentant ses méthodes. Il n'y a pas de déclaration explicite d'intention, pas de mot-clé "implements". Les interfaces implicites découplent la définition d'une interface de son implémentation, qui pourrait alors apparaître dans n'importe quel package sans arrangement préalable.

Les interfaces sont implémentées implicitement, A Tour of Go

Si un type n'existe que pour implémenter une interface et n'aura jamais de méthodes exportées au-delà de cette interface, il n'est pas nécessaire d'exporter le type lui-même.

Généralité, Effective Go

  • Cette conception d'interface incarne bien ce que l'on appelle couramment en Go l'interface définie par le consommateur et le structural typing (duck typing), c'est pourquoi j'aimerais également présenter ce sujet.
 1// nats-go/nats.go
 2
 3// Les options peuvent être utilisées pour créer une connexion personnalisée.
 4type Options struct {
 5	// L'URL représente une seule URL de serveur NATS à laquelle le client
 6	// se connectera. Si l'option Servers est également définie, elle
 7	// devient alors le premier serveur dans le tableau Servers.
 8	Url string
 9
10	// InProcessServer représente un serveur NATS exécuté au sein du
11	// même processus. Si cette option est définie, nous tenterons de nous connecter
12	// directement au serveur plutôt que d'utiliser des connexions TCP externes.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Revenons au code. Le champ InProcessServer de la structure d'options dans le client nats.go est défini comme une interface InProcessConnProvider qui n'exécute que InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn renvoie une connexion in-process au serveur,
 4// évitant la nécessité d'utiliser un écouteur TCP pour la connectivité locale
 5// au sein du même processus. Cela peut être utilisé quelle que soit la
 6// valeur de l'option DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("échec de la création de la connexion")
16	}
17	return pr, nil
18}
  • Cependant, le type qui y est inséré est le Server de nats-server, qui exécute diverses fonctions en plus d'InProcessConn.
  • En effet, dans cette situation, le client ne se soucie que de savoir si une interface InProcessConn a été fournie ou non ; les autres aspects ne sont pas d'une grande importance.
  • Par conséquent, le client nats.go crée et utilise uniquement une interface définie par le consommateur appelée InProcessConnProvider, qui ne définit que la fonctionnalité InProcessConn() (net.Conn, error).

Conclusion

  • Nous avons brièvement abordé le mode embarqué de NATS, son fonctionnement, ainsi que l'interface définie par le consommateur de Go, telle qu'on peut l'observer à travers le code de NATS.
  • J'espère que ces informations seront utiles à ceux qui utilisent NATS à des fins similaires, et c'est sur cette note que je conclurai cet article.