Wie kommunizieren eingebettete NATS mit der Go-Anwendung?
Erste Schritte
Über NATS
Software-Anwendungen und -Dienste müssen Daten austauschen. NATS ist eine Infrastruktur, die einen solchen Datenaustausch ermöglicht, segmentiert in Form von Nachrichten. Wir nennen dies eine „Message Oriented Middleware“.
Mit NATS können Anwendungsentwickler:
- Mühelos verteilte und skalierbare Client-Server-Anwendungen erstellen.
- Daten in Echtzeit auf allgemeine Weise speichern und verteilen. Dies kann flexibel über verschiedene Umgebungen, Sprachen, Cloud-Anbieter und On-Premises-Systeme hinweg erreicht werden.
- NATS ist ein in Go implementierter Message Broker.
Embedded NATS
Wenn Ihre Anwendung in Go geschrieben ist und zu Ihrem Anwendungsfall und Ihren Bereitstellungsszenarien passt, können Sie sogar einen NATS-Server in Ihre Anwendung einbetten.
NATS einbetten, NATS-Dokumentation
- Eine Besonderheit von NATS ist, dass es für in Go implementierte Anwendungen den Embedded Mode unterstützt.
- Das bedeutet, dass der Broker selbst in eine in Go erstellte Anwendung eingebettet (embed) werden kann, anstatt der üblichen Methode eines Message Brokers, bei der ein separater Broker-Server gestartet und die Kommunikation über den Client der Anwendung mit diesem Server erfolgt.
Vorteile und Anwendungsfälle von Embedded NATS
- Es gibt ein gut erklärtes Youtube-Video, auf das ich verweisen möchte.
- Auch ohne die Bereitstellung eines separaten Message Broker Servers kann man eine modulare Monolith-Anwendung erstellen, um die Trennung der Belange zu erreichen, und gleichzeitig den Vorteil nutzen, NATS als embedded-Lösung zu integrieren. Darüber hinaus wird eine Single Binary Deployment möglich.
- Es ist nützlich nicht nur für Plattformen ohne Netzwerk (WASM), sondern auch für Offline-First-Anwendungen.
Beispiel aus der offiziellen Dokumentation
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialisiert einen neuen Server mit Optionen
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Startet den Server über eine Goroutine
22 go ns.Start()
23
24 // Wartet, bis der Server für Verbindungen bereit ist
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("nicht bereit für Verbindung")
27 }
28
29 // Verbindet sich mit dem Server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonniert das Subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Gibt die Nachrichtendaten aus
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Fährt den Server herunter (optional)
45 ns.Shutdown()
46 })
47
48 // Veröffentlicht Daten auf dem Subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wartet auf den Server-Shutdown
52 ns.WaitForShutdown()
53}
- Dies ist ein Beispiel für Embedded NATS aus der offiziellen NATS-Dokumentation. Wenn man diesem Beispielcode folgt, wird die Kommunikation nicht im Embedding Mode durchgeführt.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Wenn Sie den
watch 'netstat -an | grep 127.0.0.1'
-Befehl verwenden, um das Netzwerk auf localhost (127.0.0.1) zu überwachen, und die Go-Datei mitgo run .
ausführen, werden Sie feststellen, dass neue Netzwerkanfragen vom Standard-NATS-Port4222
hinzugefügt werden.
Richtige Konfigurationen für den Embedding Mode
Um die Kommunikation wie beabsichtigt im Embedded Mode zu ermöglichen, sind die folgenden zwei Optionen erforderlich:
- Client: Die Option
InProcessServer
muss hinzugefügt werden. - Server: In den
Server.Options
muss das FlagDontListen
auftrue
gesetzt werden.
- Client: Die Option
Diese Punkte waren offiziell nicht dokumentiert, und der Ursprung dieser Funktion lässt sich über den entsprechenden PR nachvollziehen.
Dieser PR fügt drei Dinge hinzu:
InProcessConn()
-Funktion zumServer
, die einnet.Pipe
erstellt, um eine Verbindung zum NATS-Server ohne die Verwendung von TCP-Sockets zu erhalten.DontListen
-Option, die dem NATS-Server mitteilt, nicht auf dem üblichen TCP-Listener zu lauschen.startupComplete
-Kanal, der geschlossen wird, kurz bevor wir denAcceptLoop
starten, undreadyForConnections
wird darauf warten.
Die Hauptmotivation dafür ist, dass wir eine Anwendung haben, die entweder in einem Monolith (Einzelprozess)-Modus oder einem Polylith (Mehrprozess)-Modus ausgeführt werden kann. Wir möchten NATS aus Gründen der Einfachheit für beide Modi verwenden können, aber der Monolith-Modus muss eine Vielzahl von Plattformen bedienen können, auf denen das Öffnen von Socket-Verbindungen entweder keinen Sinn ergibt (mobil) oder einfach nicht möglich ist (WASM). Diese Änderungen werden es uns ermöglichen, NATS vollständig In-Process zu verwenden.
Ein begleitender PR nats-io/nats.go#774 fügt Unterstützung auf der Client-Seite hinzu.
Dies ist mein erster PR zu diesem Projekt, daher entschuldige ich mich im Voraus, falls ich irgendwo etwas Offensichtliches übersehen habe.
/cc @nats-io/core
Arbeitsbeispiel für den Embedded Mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // zur Konfiguration des eingebetteten NATS-Servers
14 // DontListen auf true setzen
15 DontListen: true,
16 }
17
18 // Initialisiert einen neuen Server mit Optionen
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Startet den Server über eine Goroutine
26 go ns.Start()
27
28 // Wartet, bis der Server für Verbindungen bereit ist
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("nicht bereit für Verbindung")
31 }
32
33 // Verbindet sich mit dem Server über eine In-Process-Verbindung
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonniert das Subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Gibt die Nachrichtendaten aus
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Fährt den Server herunter (optional)
49 ns.Shutdown()
50 })
51
52 // Veröffentlicht Daten auf dem Subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wartet auf den Server-Shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nun ist ersichtlich, dass keine zusätzlichen Network Hops mehr auftreten, wie beabsichtigt.
Under the hood
TL;DR
- Dies ist ein Sequenzdiagramm, das zeigt, wie die internen Funktionen arbeiten, wenn der Code in
main.go
ausgeführt wird. Die Hauptpunkte sind wie folgt:- Durch
DontListen: true
überspringt der Server die Client-Listening-Phase namensAcceptLoop
. - Wenn die
InProcessServer
-Option des Clients aktiviert ist, wird eine In-Memory-Verbindung erstellt und übernet.Pipe
eine Pipe aufgebaut, wobei das Ende der Pipe dem Client alsnet.Conn
-Typ zurückgegeben wird. - Client und Server kommunizieren über diese Verbindung In-Process.
- Durch
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Wartet auf Clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Wenn
DontListen
auftrue
gesetzt ist, wird zunächst die Client-Listening-PhaseAcceptLoop
übersprungen.
1// nats-server/server/server.go
2
3// AcceptLoop wird für einfachere Tests exportiert.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Falls wir vor der ordnungsgemäßen Einrichtung des Listeners beenden,
6 // stellen wir sicher, dass wir den Kanal schließen.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Server-Optionen erfassen.
18 opts := s.getOpts()
19
20 // Zustand einrichten, der das Herunterfahren ermöglichen kann
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Fehler beim Lauschen auf Port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Lausche auf Client-Verbindungen auf %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Bei aktiviertem TLS benachrichtigen.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS für Client-Verbindungen erforderlich")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients, die die Option \"TLS Handshake First\" nicht verwenden, können keine Verbindung herstellen")
38 }
39 }
40
41 // Wenn der Server mit RANDOM_PORT (-1) gestartet wurde, wäre opts.Port zu Beginn dieser Funktion gleich 0.
42 // Daher müssen wir den tatsächlichen Port abrufen
43 if opts.Port == 0 {
44 // Aufgelösten Port in die Optionen zurückschreiben.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nachdem der Port gesetzt wurde (falls er auf RANDOM gesetzt war),
49 // setzen wir die Info-Host/Port des Servers entweder mit Werten aus den Optionen oder
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Fehler beim Setzen der Server-INFO mit ClientAdvertise-Wert von %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Client-Verbindungs-URLs verfolgen. Wir könnten sie später benötigen.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signalisiert, dass wir keine neuen Clients akzeptieren
65 s.ldmCh <- true
66 // Nun auf den Shutdown warten...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Den Aufrufer wissen lassen, dass wir bereit sind
75 close(clr)
76 clr = nil
77}
- Die Funktion
AcceptLoop
führt übrigens die folgenden Schritte aus: Es handelt sich um netzwerkbezogene Teile wieTLS
oderhostPort
, die bei In-Process-Kommunikation unnötig sind und übersprungen werden können.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect versucht, eine Verbindung zum NATS-System herzustellen.
5// Die URL kann Benutzernamen/Passwort-Semantik enthalten. z.B. nats://derek:pass@localhost:4222
6// Kommagetrennte Arrays werden ebenfalls unterstützt, z.B. urlA, urlB.
7// Optionen beginnen mit den Standardwerten, können aber überschrieben werden.
8// Um eine Verbindung zum Websocket-Port eines NATS-Servers herzustellen, verwenden Sie das Schema `ws` oder `wss`, z.B.
9// `ws://localhost:8080`. Beachten Sie, dass Websocket-Schemata nicht mit anderen (nats/tls) gemischt werden können.
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Optionen können verwendet werden, um eine angepasste Verbindung zu erstellen.
4type Options struct {
5 // Url repräsentiert eine einzelne NATS-Server-URL, mit der der Client
6 // sich verbinden wird. Wenn die Servers-Option ebenfalls gesetzt ist,
7 // wird sie zum ersten Server im Servers-Array.
8 Url string
9
10 // InProcessServer repräsentiert einen NATS-Server, der innerhalb desselben Prozesses läuft.
11 // Wenn dies gesetzt ist, versuchen wir, uns direkt mit dem Server zu verbinden,
12 // anstatt externe TCP-Verbindungen zu verwenden.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Die
Connect
-Funktion, die die Verbindung zwischen NATS-Server und NATS-Client herstellt, ermöglicht die Konfiguration von Client-URL und Connect Option. Die Options-Struktur, die diese Optionen sammelt, enthält ein FeldInProcessServer
vom Interface-TypInProcessConnProvider
.
1// main.go des Beispielcodes
2
3// Initialisiert einen neuen Server mit Optionen
4ns, err := server.NewServer(opts)
5
6//...
7
8// Verbindet sich mit dem Server über eine In-Process-Verbindung
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Wenn beim Verbinden des NATS-Clients
nats.InProcessServer(ns)
an dasInProcessServer
-Feld übergeben wird, dann
1// nats-go/nats.go
2
3// InProcessServer ist eine Option, die versucht, eine direkte Verbindung zu einem NATS-Server herzustellen,
4// der innerhalb des Prozesses läuft, anstatt über TCP zu wählen.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- wird die Option
InProcessServer
durch den eingebetteten NATS-Server ersetzt und
1// nats-go/nats.go
2
3// createConn stellt eine Verbindung zum Server her und umschließt die entsprechenden
4// bufio-Strukturen. Es wird das Richtige tun, wenn eine bestehende
5// Verbindung vorhanden ist.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Wenn wir eine Referenz auf einen In-Process-Server haben, dann stellen wir eine
15 // Verbindung mit diesem her.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("Fehler beim Abrufen der In-Process-Verbindung: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- wird, wenn die
InProcessServer
-Option in dercreateConn
-Funktion, die die Verbindung herstellt, nicht nil (gültig) ist, dieInProcessConn
desInProcessServer
der Option ausgeführt und
1// nats-server/server/server.go
2
3// InProcessConn gibt eine In-Process-Verbindung zum Server zurück,
4// wodurch die Notwendigkeit entfällt, einen TCP-Listener für die lokale Konnektivität
5// innerhalb desselben Prozesses zu verwenden. Dies kann unabhängig vom
6// Zustand der DontListen-Option verwendet werden.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("Fehler beim Erstellen der Verbindung")
16 }
17 return pr, nil
18}
- die auf dem Server implementierte
InProcessConn
aufgerufen und ausgeführt. - Diese Funktion wird im Go-Client von NATS,
nats.go
, aufgerufen, wennInProcessServer
vonnc
(NATS-Verbindung) nicht nil ist. Sie erstellt eine Verbindung (net.Conn
) und bindet diese an die Serververbindung.
Consumer driven interface von Go
Ein Typ implementiert ein Interface, indem er dessen Methoden implementiert. Es gibt keine explizite Absichtserklärung, kein "implements"-Schlüsselwort. Implizite Interfaces entkoppeln die Definition eines Interfaces von seiner Implementierung, die dann ohne vorherige Absprache in jedem Paket erscheinen könnte.
Interfaces werden implizit implementiert, A Tour of Go
Wenn ein Typ nur dazu dient, ein Interface zu implementieren, und niemals exportierte Methoden über dieses Interface hinaus haben wird, ist es nicht notwendig, den Typ selbst zu exportieren.
- Dieses Interface-Design verkörpert die in Go häufig genannten Consumer-Defined Interfaces und Structural Typing (Duck Typing) sehr gut, daher möchte ich dieses Thema ebenfalls kurz vorstellen.
1// nats-go/nats.go
2
3// Optionen können verwendet werden, um eine angepasste Verbindung zu erstellen.
4type Options struct {
5 // Url repräsentiert eine einzelne NATS-Server-URL, mit der der Client
6 // sich verbinden wird. Wenn die Servers-Option ebenfalls gesetzt ist,
7 // wird sie zum ersten Server im Servers-Array.
8 Url string
9
10 // InProcessServer repräsentiert einen NATS-Server, der innerhalb desselben Prozesses läuft.
11 // Wenn dies gesetzt ist, versuchen wir, uns direkt mit dem Server zu verbinden,
12 // anstatt externe TCP-Verbindungen zu verwenden.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Kehren wir zum Code zurück. Im NATS.go-Client ist das
InProcessServer
-Strukturfeld alsInProcessConnProvider
-Interface definiert, das nurInProcessConn
ausführt.
1// nats-server/server/server.go
2
3// InProcessConn gibt eine In-Process-Verbindung zum Server zurück,
4// wodurch die Notwendigkeit entfällt, einen TCP-Listener für die lokale Konnektivität
5// innerhalb desselben Prozesses zu verwenden. Dies kann unabhängig vom
6// Zustand der DontListen-Option verwendet werden.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("Fehler beim Erstellen der Verbindung")
16 }
17 return pr, nil
18}
- Der Typ, der hier eingesetzt wird, ist jedoch der
Server
von NATS-Server, der nebenInProcessConn
noch eine Vielzahl weiterer Funktionen ausführt. - Dies liegt daran, dass der Client in dieser Situation nur daran interessiert ist, ob das
InProcessConn
-Interface bereitgestellt wurde oder nicht; andere Aspekte sind von geringer Bedeutung. - Daher hat der NATS.go-Client lediglich ein Consumer-Defined Interface namens
InProcessConnProvider
erstellt, das nur die FunktionInProcessConn() (net.Conn, error)
definiert und verwendet.
Fazit
- Ich habe kurz den Embedded Mode von NATS, seine Funktionsweise und das Consumer-Defined Interface von Go, das man im NATS-Code sehen kann, behandelt.
- Ich hoffe, diese Informationen sind für diejenigen hilfreich, die NATS für die oben genannten Zwecke verwenden, und hiermit beende ich diesen Artikel.