GoSuda

Wie kommunizieren eingebettete NATS mit einer Go-Anwendung?

By prravda
views ...

Erste Schritte

Über NATS

Softwareanwendungen und -dienste müssen Daten austauschen. NATS ist eine Infrastruktur, die einen solchen Datenaustausch ermöglicht, segmentiert in Form von Nachrichten. Wir nennen dies eine „nachrichtenorientierte Middleware“.

Mit NATS können Anwendungsentwickler:

  • Mühelos verteilte und skalierbare Client-Server-Anwendungen erstellen.
  • Daten in Echtzeit auf allgemeine Weise speichern und verteilen. Dies kann flexibel über verschiedene Umgebungen, Sprachen, Cloud-Anbieter und On-Premises-Systeme hinweg erreicht werden.

What is NATS, NATS docs

  • NATS ist ein in Go entwickelter Message Broker.

Embedded NATS

Wenn Ihre Anwendung in Go geschrieben ist und zu Ihrem Anwendungsfall und Ihren Bereitstellungsszenarien passt, können Sie sogar einen NATS-Server in Ihre Anwendung einbetten.

Embedding NATS, NATS docs

  • Eine Besonderheit von NATS ist, dass es für in Go entwickelte Anwendungen einen embedded mode unterstützt.
  • Das bedeutet, anstatt den üblichen Ansatz eines Message Brokers zu verfolgen, bei dem ein separater Broker-Server gestartet und die Kommunikation über Clients der Anwendung mit diesem Server erfolgt, kann der Broker selbst in eine in Go erstellte Anwendung eingebettet werden.

Vorteile und Anwendungsfälle von embedded NATS

  • Es gibt ein gut erklärtes Youtube-Video, auf das ich verweisen möchte.
  • Selbst ohne die Bereitstellung eines separaten Message Broker Servers kann man die Vorteile nutzen, ein modular monolith application zu erstellen und dabei separate of concern zu erreichen, indem NATS embedded integriert wird. Zusätzlich wird ein single binary deployment ermöglicht.
  • Dies kann nicht nur für platform with no network (WASM) nützlich sein, sondern auch für offline-first application.

Beispiel in der offiziellen Dokumentation

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialisiert einen neuen Server mit Optionen
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Startet den Server über eine Goroutine
22    go ns.Start()
23
24    // Wartet, bis der Server für Verbindungen bereit ist
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Verbindet sich mit dem Server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonniert das Thema
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Druckt die Nachrichtendaten
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Fährt den Server herunter (optional)
45        ns.Shutdown()
46    })
47
48    // Veröffentlicht Daten zu dem Thema
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wartet auf das Herunterfahren des Servers
52    ns.WaitForShutdown()
53}
  • Das von der offiziellen NATS-Dokumentation bereitgestellte Beispiel für Embedded NATS führt bei Ausführung nicht zu einer Kommunikation im embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Wenn man den Befehl watch 'netstat -an | grep 127.0.0.1' verwendet, um den Netzwerkverkehr zum und vom localhost (127.0.0.1) zu überwachen und dann die Go-Datei mit go run . ausführt, kann man sehen, dass neue Netzwerkanfragen, die vom Standard-NATS-Port 4222 ausgehen, hinzugefügt werden.

Richtige Konfigurationen für den Embedding Mode

  • Um die Kommunikation im embedded mode wie beabsichtigt zu ermöglichen, sind die folgenden zwei Optionen erforderlich:

    • Client: Die Option InProcessServer muss hinzugefügt werden.
    • Server: Das Flag DontListen in Server.Options muss auf true gesetzt werden.
  • Diese Aspekte sind nicht offiziell dokumentiert, aber der Ursprung dieser Funktionalität lässt sich durch diesen PR nachvollziehen.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Funktionierendes Beispiel für den Embedded Mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// zur Konfiguration des eingebetteten NATS-Servers
14		// DontListen auf true setzen
15		DontListen: true,
16	}
17
18	// Initialisiert einen neuen Server mit Optionen
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Startet den Server über eine Goroutine
26	go ns.Start()
27
28	// Wartet, bis der Server für Verbindungen bereit ist
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Verbindet sich mit dem Server über eine In-Process-Verbindung
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonniert das Thema
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Druckt die Nachrichtendaten
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Fährt den Server herunter (optional)
49		ns.Shutdown()
50	})
51
52	// Veröffentlicht Daten zu dem Thema
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wartet auf das Herunterfahren des Servers
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nun ist ersichtlich, dass keine zusätzlichen network hops mehr auftreten, wie beabsichtigt.

Under the hood

TL;DR

diagram1

  • Dies ist ein sequence diagram, das zeigt, wie die internen Funktionen ablaufen, wenn der Code in main.go ausgeführt wird. Die Hauptpunkte sind wie folgt:
    • Durch DontListen: true überspringt der Server die client listening phase, die als AcceptLoop bezeichnet wird.
    • Wenn die InProcessServer-Option des Clients aktiviert ist, wird eine in-memory connection erstellt und über net.Pipe eine Pipe aufgebaut, wobei das Ende der Pipe als net.Conn an den Client zurückgegeben wird.
    • Client und Server kommunizieren über diese Verbindung in-process.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Wartet auf Clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Zunächst, wenn DontListen auf true gesetzt ist, wird die client listening phase, genannt AcceptLoop, übersprungen.
 1// nats-server/server/server.go
 2
 3// AcceptLoop wird zum einfacheren Testen exportiert.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Falls wir beenden sollten, bevor der Listener ordnungsgemäß eingerichtet ist,
 6	// stellen wir sicher, dass wir den Kanal schließen.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot der Serveroptionen.
18	opts := s.getOpts()
19
20	// Zustand einrichten, der das Herunterfahren ermöglichen kann
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Benachrichtigung über aktiviertes TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// Wenn der Server mit RANDOM_PORT (-1) gestartet wurde, wäre opts.Port am Anfang
42	// dieser Funktion gleich 0. Wir müssen also den tatsächlichen Port abrufen.
43	if opts.Port == 0 {
44		// Den aufgelösten Port zurück in die Optionen schreiben.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nachdem der Port gesetzt wurde (falls er auf RANDOM gesetzt war), setzen wir die
49	// Host/Port-Informationen des Servers entweder mit Werten aus Options oder
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Speichern der Client-Verbindungs-URLs. Wir könnten sie später benötigen.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signalisiert, dass wir keine neuen Clients akzeptieren
65				s.ldmCh <- true
66				// Nun auf das Herunterfahren warten...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Den Aufrufer informieren, dass wir bereit sind
75	close(clr)
76	clr = nil
77}
  • Die AcceptLoop-Funktion führt übrigens folgende Schritte aus: Sie behandelt netzwerkbezogene Aspekte wie TLS oder hostPort, die bei in-process communication irrelevant sind und daher übersprungen werden können.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect versucht, eine Verbindung zum NATS-System herzustellen.
 5// Die URL kann Benutzernamen/Passwort-Semantik enthalten. z.B. nats://derek:pass@localhost:4222
 6// Kommagetrennte Arrays werden ebenfalls unterstützt, z.B. urlA, urlB.
 7// Optionen beginnen mit den Standardwerten, können aber überschrieben werden.
 8// Um sich mit dem Websocket-Port eines NATS-Servers zu verbinden, verwenden Sie das Schema `ws` oder `wss`, z.B.
 9// `ws://localhost:8080`. Beachten Sie, dass Websocket-Schemata nicht mit anderen (nats/tls) gemischt werden können.
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Optionen können verwendet werden, um eine angepasste Verbindung zu erstellen.
 4type Options struct {
 5	// Url repräsentiert eine einzelne NATS-Server-URL, mit der sich der Client
 6	// verbinden wird. Wenn die Servers-Option ebenfalls gesetzt ist,
 7	// wird sie zum ersten Server im Servers-Array.
 8	Url string
 9
10	// InProcessServer repräsentiert einen NATS-Server, der innerhalb desselben
11	// Prozesses läuft. Wenn dies gesetzt ist, versuchen wir, uns
12	// direkt mit dem Server zu verbinden, anstatt externe TCP-Verbindungen zu verwenden.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Die Connect-Funktion, die die Verbindung zwischen nats server und nats client herstellt, ermöglicht die Konfiguration der client URL und der connect Option. In der Options-Struktur, die diese Optionen sammelt, existiert ein Feld namens InProcessServer vom Typ InProcessConnProvider interface.
1// main.go of example code
2
3// Initialisiert einen neuen Server mit Optionen
4ns, err := server.NewServer(opts)
5
6//...
7
8// Verbindet sich mit dem Server über eine In-Process-Verbindung
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Wenn der nats client beim Herstellen einer Verbindung nats.InProcessServer(ns) als Wert für das InProcessServer-Feld übergibt,
 1// nats-go/nats.go
 2
 3// InProcessServer ist eine Option, die versucht, eine Verbindung zu einem NATS-Server
 4// herzustellen, der innerhalb des Prozesses läuft, anstatt über TCP zu wählen.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • wird die Option InProcessServer durch den embedded NATS-Server ersetzt, und
 1// nats-go/nats.go
 2
 3// createConn stellt eine Verbindung zum Server her und umhüllt die entsprechenden
 4// bufio-Strukturen. Es wird das Richtige tun, wenn eine bestehende
 5// Verbindung vorhanden ist.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Wenn wir eine Referenz auf einen In-Process-Server haben, stellen wir eine
15	// Verbindung mit diesem her.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • führt diese Schnittstelle in der createConn-Funktion, die eine Verbindung herstellt, die InProcesConn des InProcessServer-Option aus, falls diese nicht nil (gültig) ist, und
 1// nats-server/server/server.go
 2
 3// InProcessConn gibt eine In-Process-Verbindung zum Server zurück,
 4// wodurch die Notwendigkeit entfällt, einen TCP-Listener für die lokale Konnektivität
 5// innerhalb desselben Prozesses zu verwenden. Dies kann unabhängig vom
 6// Zustand der DontListen-Option verwendet werden.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • ruft und führt die im Server implementierte InProcessConn Funktion aus.
  • Diese Funktion wird im Go-Client von NATS, nats.go, aufgerufen, wenn InProcessServer von nc (nats connection) nicht nil ist, um eine Verbindung (net.Conn) herzustellen und diese dann an die Serververbindung zu binden.

Consumer driven interface of Go

Ein Typ implementiert eine Schnittstelle, indem er ihre Methoden implementiert. Es gibt keine explizite Absichtserklärung, kein "implements"-Schlüsselwort. Implizite Schnittstellen entkoppeln die Definition einer Schnittstelle von ihrer Implementierung, die dann in jedem Paket ohne vorherige Absprache erscheinen könnte.

Interfaces are implemented implicitly, A Tour of Go

Wenn ein Typ nur dazu existiert, eine Schnittstelle zu implementieren und niemals exportierte Methoden über diese Schnittstelle hinaus haben wird, ist es nicht notwendig, den Typ selbst zu exportieren.

Generality, Effective Go

  • Dieses Interface-Design verkörpert treffend das, was in Go gemeinhin als consumer defined interface und structural typing (duck typing) bezeichnet wird, und ich möchte dieses Thema ebenfalls kurz vorstellen.
 1// nats-go/nats.go
 2
 3// Optionen können verwendet werden, um eine angepasste Verbindung zu erstellen.
 4type Options struct {
 5	// Url repräsentiert eine einzelne NATS-Server-URL, mit der sich der Client
 6	// verbinden wird. Wenn die Servers-Option ebenfalls gesetzt ist,
 7	// wird sie zum ersten Server im Servers-Array.
 8	Url string
 9
10	// InProcessServer repräsentiert einen NATS-Server, der innerhalb desselben
11	// Prozesses läuft. Wenn dies gesetzt ist, versuchen wir, uns
12	// direkt mit dem Server zu verbinden, anstatt externe TCP-Verbindungen zu verwenden.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Kehren wir zum Code zurück. Im nats.go-Client ist das InProcessServer-Optionsstrukturfeld als InProcessConnProvider-Interface definiert, das nur InProcessConn ausführt.
 1// nats-server/server/server.go
 2
 3// InProcessConn gibt eine In-Process-Verbindung zum Server zurück,
 4// wodurch die Notwendigkeit entfällt, einen TCP-Listener für die lokale Konnektivität
 5// innerhalb desselben Prozesses zu verwenden. Dies kann unabhängig vom
 6// Zustand der DontListen-Option verwendet werden.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Der Typ, der darin verwendet wird, ist jedoch der Server von nats-server, der nicht nur InProcessConn, sondern auch verschiedene andere Funktionen ausführt.
  • Dies liegt daran, dass das Interesse des Clients in diesem Szenario lediglich darin besteht, ob ein Interface namens InProcessConn bereitgestellt wurde oder nicht, während andere Aspekte von geringer Bedeutung sind.
  • Daher hat der nats.go-Client lediglich ein InProcessConnProvider-Interface erstellt, das nur die Funktionalität InProcessConn() (net.Conn, error) definiert, was ein consumer defined interface darstellt.

Fazit

  • Ich habe kurz den embedded mode von NATS und seine Funktionsweise sowie das consumer defined interface von Go behandelt, das im NATS-Code zu finden ist.
  • Ich hoffe, diese Informationen sind für diejenigen nützlich, die NATS zu den oben genannten Zwecken verwenden, und möchte diesen Artikel hiermit abschließen.