GoSuda

Hoe communiceren ingebedde NATS met een Go-applicatie?

By prravda
views ...

Aan de slag

Over NATS

Softwareapplicaties en -diensten moeten gegevens uitwisselen. NATS is een infrastructuur die een dergelijke gegevensuitwisseling mogelijk maakt, gesegmenteerd in de vorm van berichten. Dit noemen we "message oriented middleware".

Met NATS kunnen applicatieontwikkelaars:

  • Moeiteloos gedistribueerde en schaalbare client-server applicaties bouwen.
  • Gegevens in realtime op een algemene manier opslaan en distribueren. Dit kan flexibel worden bereikt in diverse omgevingen, talen, cloud providers en on-premises systemen.

Wat is NATS, NATS docs

  • NATS is een message broker geschreven in Go.

Embedded NATS

Indien uw applicatie in Go is geschreven, en indien dit past bij uw use case en implementatiescenario's, kunt u zelfs een NATS server in uw applicatie embedden.

Embedding NATS, NATS docs

  • Een bijzonder kenmerk van NATS is de ondersteuning voor een embedded mode voor applicaties die in Go zijn geschreven.
  • Dit betekent dat, in tegenstelling tot de gebruikelijke methode waarbij een aparte broker server wordt gestart en communicatie via clients van de applicatie plaatsvindt, de broker zelf kan worden ingebed in een in Go gemaakte applicatie.

Voordelen en use cases van embedded NATS

  • Er is een goed uitgelegde Youtube video beschikbaar, waarnaar ik verwijs.
  • Zelfs zonder een aparte message broker server te implementeren, kan men een modular monolith applicatie creëren en het voordeel benutten van het embedded implementeren van NATS om zo separation of concerns te bewerkstelligen. Bovendien wordt single binary deployment mogelijk.
  • Dit kan nuttig worden toegepast, niet alleen op platforms zonder netwerk (WASM), maar ook in offline-first applicaties.

Voorbeeld in de officiële documentatie

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialiseer nieuwe server met opties
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start de server via goroutine
22    go ns.Start()
23
24    // Wacht tot de server klaar is voor verbindingen
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("niet klaar voor verbinding")
27    }
28
29    // Maak verbinding met de server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonneer op het subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print berichtdata
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Sluit de server af (optioneel)
45        ns.Shutdown()
46    })
47
48    // Publiceer data naar het subject
49    nc.Publish(subject, []byte("Hallo embedded NATS!"))
50
51    // Wacht op serverafsluiting
52    ns.WaitForShutdown()
53}
  • Dit is een voorbeeld van Embedded NATS uit de officiële NATS-documentatie, maar de communicatie vindt niet plaats in embedded mode als men de voorbeeldcode volgt.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Als u de go run . opdracht uitvoert voor het betreffende Go-bestand, terwijl u het netwerkverkeer van en naar localhost (127.0.0.1) controleert met het watch 'netstat -an | grep 127.0.0.1' commando, zult u zien dat er nieuwe netwerkverzoeken, afkomstig van de standaard NATS-poort 4222, worden toegevoegd.

Juiste configuraties voor embedded mode

  • Om in de gewenste embedded mode te communiceren, zijn de volgende twee opties vereist:

    • Client: De InProcessServer optie moet worden toegevoegd.
    • Server: In de Server.Options moet de DontListen flag als true worden gespecificeerd.
  • Deze aspecten waren niet officieel gedocumenteerd; de oorsprong van deze functionaliteit kan worden afgeleid uit deze PR.

    Deze PR voegt drie zaken toe:

    1. InProcessConn() functie naar Server die een net.Pipe bouwt om een verbinding met de NATS-server te krijgen zonder TCP sockets te gebruiken
    2. DontListen optie die de NATS-server vertelt niet te luisteren op de gebruikelijke TCP listener
    3. startupComplete kanaal, dat wordt gesloten vlak voordat we AcceptLoop starten, en readyForConnections zal erop wachten

    De belangrijkste motivatie hiervoor is dat we een applicatie hebben die zowel in een monolithische (single-process) modus als in een polylithische (multi-process) modus kan draaien. We willen NATS voor beide modi kunnen gebruiken voor de eenvoud, maar de monolithische modus moet in staat zijn om te voldoen aan een verscheidenheid aan platforms waar het openen van socketverbindingen ofwel geen zin heeft (mobiel) of gewoon niet mogelijk is (WASM). Deze wijzigingen zullen ons in staat stellen om NATS volledig in-process te gebruiken.

    Een bijbehorende PR nats-io/nats.go#774 voegt ondersteuning toe aan de clientzijde.

    Dit is mijn eerste PR voor dit project, dus bij voorbaat mijn excuses als ik iets voor de hand liggends over het hoofd heb gezien.

    /cc @nats-io/core

Werkend voorbeeld voor embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// voor het configureren van de ingebedde NATS-server
14		// stel DonListen in op true
15		DontListen: true,
16	}
17
18	// Initialiseer nieuwe server met opties
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start de server via goroutine
26	go ns.Start()
27
28	// Wacht tot de server klaar is voor verbindingen
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("niet klaar voor verbinding")
31	}
32
33	// Maak verbinding met de server via een in-process verbinding
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonneer op het subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print berichtdata
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Sluit de server af (optioneel)
49		ns.Shutdown()
50	})
51
52	// Publiceer data naar het subject
53	nc.Publish(subject, []byte("Hallo embedded NATS!"))
54
55	// Wacht op serverafsluiting
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nu is te zien dat er geen extra network hop optreedt, zoals beoogd.

Under the hood

TL;DR

diagram1

  • Dit is een sequence diagram dat aangeeft welke functies intern worden uitgevoerd wanneer de code in main.go wordt uitgevoerd, en de essentie is als volgt:
    • Via DontListen: true slaat de server de client listening phase, genaamd AcceptLoop, over.
    • Indien de InProcessServer optie van de client wordt geactiveerd, wordt een in-memory verbinding tot stand gebracht en wordt een pipe gecreëerd via net.Pipe. Vervolgens wordt het einde van de pipe als net.Conn type aan de client geretourneerd.
    • De client en server voeren in-process communicatie uit via deze verbinding.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Wacht op clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Ten eerste, als DontListen true is, wordt de client listening phase, genaamd AcceptLoop, overgeslagen.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is geëxporteerd voor eenvoudiger testen.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Als we zouden afsluiten voordat de listener correct is ingesteld,
 6	// zorg er dan voor dat we het kanaal sluiten.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Maak een snapshot van de serveropties.
18	opts := s.getOpts()
19
20	// Stel de status in die afsluiting mogelijk maakt
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Fout bij het luisteren op poort: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Luisteren voor clientverbindingen op %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Waarschuwing voor ingeschakelde TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS vereist voor clientverbindingen")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients die de optie \"TLS Handshake First\" niet gebruiken, zullen geen verbinding kunnen maken")
38		}
39	}
40
41	// Als de server werd gestart met RANDOM_PORT (-1), zou opts.Port aan het begin van
42	// deze functie gelijk zijn aan 0. We moeten dus de daadwerkelijke poort ophalen
43	if opts.Port == 0 {
44		// Schrijf de opgeloste poort terug naar de opties.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nu de poort is ingesteld (indien ingesteld op RANDOM), stel de
49	// INFO Host/Poort van de server in met waarden uit Options of
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Fout bij het instellen van server INFO met ClientAdvertise waarde van %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Houd client connect URLs bij. We hebben ze misschien later nodig.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signaal dat we geen nieuwe clients accepteren
65				s.ldmCh <- true
66				// Wacht nu op de Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Laat de beller weten dat we klaar zijn
75	close(clr)
76	clr = nil
77}
  • Ter referentie, de AcceptLoop functie voert de volgende processen uit. Deze omvatten netwerkcommunicatie-gerelateerde onderdelen zoals TLS en hostPort, die overbodig zijn bij in-process communicatie en daarom kunnen worden weggelaten.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect zal proberen verbinding te maken met het NATS-systeem.
 5// De url kan gebruikersnaam/wachtwoord semantiek bevatten. bijv. nats://derek:pass@localhost:4222
 6// Door komma's gescheiden arrays worden ook ondersteund, bijv. urlA, urlB.
 7// Opties beginnen met de standaardwaarden, maar kunnen worden overschreven.
 8// Om verbinding te maken met de websocket-poort van een NATS-server, gebruikt u het `ws` of `wss` schema, zoals
 9// `ws://localhost:8080`. Merk op dat websocket-schema's niet kunnen worden gemengd met andere (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
 4type Options struct {
 5	// Url vertegenwoordigt een enkele NATS-server-url waarmee de client
 6	// verbinding zal maken. Als de Servers-optie ook is ingesteld,
 7	// wordt deze de eerste server in de Servers-array.
 8	Url string
 9
10	// InProcessServer vertegenwoordigt een NATS-server die binnen dezelfde
11	// proces draait. Als dit is ingesteld, zullen we proberen verbinding
12	// te maken met de server direct in plaats van externe TCP-verbindingen te gebruiken.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • De Connect functie, die de verbinding tussen de NATS server en de NATS client tot stand brengt, kan de client URL en connect Option instellen. De Options struct, die deze opties bevat, heeft een InProcessServer veld van het InProcessConnProvider interface type.
1// main.go of voorbeeldcode
2
3// Initialiseer nieuwe server met opties
4ns, err := server.NewServer(opts)
5
6//...
7
8// Maak verbinding met de server via een in-process verbinding
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Wanneer de nats client een Connectie initieert en nats.InProcessServer(ns) doorgeeft aan het InProcessServer veld, dan:
 1// nats-go/nats.go
 2
 3// InProcessServer is een optie die zal proberen een directe verbinding tot stand te brengen met een NATS-server
 4// die binnen het proces draait, in plaats van te bellen via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • wordt de InProcessServer van de optie vervangen door de embedded NATS server, en
 1// nats-go/nats.go
 2
 3// createConn zal verbinding maken met de server en de juiste
 4// bufio structuren omwikkelen. Het zal het juiste doen wanneer een bestaande
 5// verbinding aanwezig is.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Als we een referentie hebben naar een in-process server, maak dan een
15	// verbinding tot stand met die.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("fout bij het verkrijgen van in-process verbinding: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • wordt, indien de InProcessServer optie in de createConn functie (die de verbinding tot stand brengt) niet nil (geldig) is, de InProcessConn van de InProcessServer optie uitgevoerd, en
 1// nats-server/server/server.go
 2
 3// InProcessConn retourneert een in-process verbinding naar de server,
 4// waardoor het niet nodig is om een TCP-listener te gebruiken voor lokale connectiviteit
 5// binnen hetzelfde proces. Dit kan worden gebruikt ongeacht de
 6// staat van de DontListen optie.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("fout bij het aanmaken van verbinding")
16	}
17	return pr, nil
18}
  • wordt de InProcessConn die in de server is geïmplementeerd, aangeroepen en uitgevoerd.
  • Deze functie wordt aangeroepen wanneer de InProcessServer van de nc (nats connection) in de Go client van NATS, nats.go, niet nil is. Vervolgens wordt een verbinding (net.Conn) tot stand gebracht en aan de serververbinding gebonden.

Consumer driven interface van Go

Een type implementeert een interface door de methoden ervan te implementeren. Er is geen expliciete intentieverklaring, geen "implements" trefwoord. Impliciete interfaces ontkoppelen de definitie van een interface van de implementatie ervan, die dan in elk pakket kan verschijnen zonder voorafgaande afspraak.

Interfaces worden impliciet geïmplementeerd, A Tour of Go

Als een type uitsluitend bestaat om een interface te implementeren en nooit geëxporteerde methoden zal hebben die verder gaan dan die interface, is het niet nodig om het type zelf te exporteren.

Algemeenheid, Effectieve Go

  • Dit interface-ontwerp omvat goed wat in Go vaak een consumer-defined interface en structural typing (duck typing) wordt genoemd, en daarom wilde ik dit onderwerp ook introduceren.
 1// nats-go/nats.go
 2
 3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
 4type Options struct {
 5	// Url vertegenwoordigt een enkele NATS-server-url waarmee de client
 6	// verbinding zal maken. Als de Servers-optie ook is ingesteld,
 7	// wordt deze de eerste server in de Servers-array.
 8	Url string
 9
10	// InProcessServer vertegenwoordigt een NATS-server die binnen dezelfde
11	// proces draait. Als dit is ingesteld, zullen we proberen verbinding
12	// te maken met de server direct in plaats van externe TCP-verbindingen te gebruiken.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Laten we teruggaan naar de code. In de nats.go client is het InProcessServer optie struct veld gedefinieerd als een InProcessConnProvider interface, die alleen InProcessConn uitvoert.
 1// nats-server/server/server.go
 2
 3// InProcessConn retourneert een in-process verbinding naar de server,
 4// waardoor het niet nodig is om een TCP-listener te gebruiken voor lokale connectiviteit
 5// binnen hetzelfde proces. Dit kan worden gebruikt ongeacht de
 6// staat van de DontListen optie.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("fout bij het aanmaken van verbinding")
16	}
17	return pr, nil
18}
  • Echter, het type dat daarin wordt geplaatst is de Server van nats-server, die naast InProcessConn ook diverse andere functies uitvoert.
  • Dit komt doordat de client in dit scenario zich alleen bekommert om de vraag of de InProcessConn interface is geboden, en andere zaken van ondergeschikt belang zijn.
  • Daarom heeft de nats.go client uitsluitend een consumer-defined interface genaamd InProcessConnProvider gecreëerd en gebruikt, die enkel de functionaliteit InProcessConn() (net.Conn, error) definieert.

Conclusie

  • In dit artikel heb ik kort de embedded mode van NATS en de werking ervan besproken, evenals de consumer-defined interface van Go die in de NATS-code te vinden is.
  • Ik hoop dat deze informatie nuttig zal zijn voor mensen die NATS voor de bovengenoemde doeleinden gebruiken, en hiermee sluit ik dit artikel af.