GoSuda

Hoe communiceren ingebedde NATS met een Go-applicatie?

By prravda
views ...

Aan de slag

Over NATS

Softwareapplicaties en -diensten moeten gegevens uitwisselen. NATS is een infrastructuur die een dergelijke gegevensuitwisseling mogelijk maakt, gesegmenteerd in de vorm van berichten. Dit noemen we een "berichtgeoriënteerde middleware".

Met NATS kunnen applicatieontwikkelaars:

  • Moeiteloos gedistribueerde en schaalbare client-serverapplicaties bouwen.
  • Gegevens in realtime op een algemene manier opslaan en distribueren. Dit kan flexibel worden bereikt in verschillende omgevingen, talen, cloudproviders en on-premises systemen.

Wat is NATS, NATS documentatie

  • NATS is een message broker die is gebouwd met Go.

Embedded NATS

Als uw applicatie in Go is geschreven, en als het past bij uw use case en implementatiescenario's, kunt u zelfs een NATS-server binnen uw applicatie embedden.

NATS embedden, NATS documentatie

  • Een bijzonder kenmerk van NATS is de ondersteuning voor de embedded modus voor applicaties die in Go zijn geschreven.
  • Dit betekent dat, in plaats van de gebruikelijke methode voor message brokers waarbij een aparte brokerserver wordt gestart en communicatie plaatsvindt via clients van de applicatie met die server, de broker zelf kan worden ingebed in een Go-applicatie.

Voordelen en gebruiksscenario's van embedded NATS

  • Er is een goed uitgelegde Youtube-video, dus ik volsta met een link naar die video.
  • Zonder de noodzaak om een aparte message broker server te implementeren, kan men een modulaire monolith applicatie creëren en het principe van 'separation of concerns' bereiken, terwijl tegelijkertijd de voordelen van een embedded NATS worden benut. Bovendien wordt zo ook een single binary deployment mogelijk.
  • Dit is niet alleen bruikbaar voor platforms zonder netwerk (WASM), maar ook voor offline-first applicaties.

Voorbeeld uit de officiële documentatie

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialiseer een nieuwe server met opties
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start de server via goroutine
22    go ns.Start()
23
24    // Wacht tot de server klaar is voor verbindingen
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("niet klaar voor verbinding")
27    }
28
29    // Maak verbinding met de server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonneer op het onderwerp
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print berichtgegevens
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Sluit de server af (optioneel)
45        ns.Shutdown()
46    })
47
48    // Publiceer gegevens naar het onderwerp
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wacht op het afsluiten van de server
52    ns.WaitForShutdown()
53}
  • Dit is een voorbeeld van Embedded NATS dat wordt aangeboden in de officiële NATS-documentatie, maar als men deze voorbeeldcode uitvoert, vindt er geen communicatie plaats in de embedding-modus.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Wanneer u het commando watch 'netstat -an | grep 127.0.0.1' gebruikt om het netwerkverkeer van en naar localhost (127.0.0.1) te controleren en vervolgens het Go-bestand uitvoert met go run ., zult u zien dat er nieuwe netwerkverzoeken worden toegevoegd die afkomstig zijn van de standaard NATS-poort 4222.

Juiste configuraties voor de embedding-modus

  • Om te communiceren in de bedoelde embedded modus, zijn de volgende twee opties vereist:

    • Client: De InProcessServer-optie moet worden toegevoegd.
    • Server: De DontListen-vlag in Server.Options moet op true worden ingesteld.
  • Deze aspecten zijn niet officieel gedocumenteerd, en de oorsprong van deze functionaliteit kan worden achterhaald via deze PR.

    Deze PR voegt drie zaken toe:

    1. Een InProcessConn()-functie aan Server die een net.Pipe opbouwt om een verbinding met de NATS-server te verkrijgen zonder gebruik te maken van TCP-sockets.
    2. Een DontListen-optie die de NATS-server vertelt om niet te luisteren op de gebruikelijke TCP-listener.
    3. Een startupComplete-kanaal, dat wordt gesloten net voordat we AcceptLoop starten, en readyForConnections zal erop wachten.

    De belangrijkste motivatie hiervoor is dat we een applicatie hebben die zowel in een monolithische (single-process) modus als in een polylithische (multi-process) modus kan draaien. We willen NATS voor beide modi kunnen gebruiken voor de eenvoud, maar de monolithische modus moet geschikt zijn voor een verscheidenheid aan platforms waar het openen van socketverbindingen ofwel geen zin heeft (mobiel) of gewoon niet mogelijk is (WASM). Deze wijzigingen zullen ons in staat stellen om NATS volledig in-process te gebruiken.

    Een bijbehorende PR nats-io/nats.go#774 voegt ondersteuning toe aan de clientzijde.

    Dit is mijn eerste PR voor dit project, dus bij voorbaat mijn excuses als ik iets voor de hand liggends over het hoofd heb gezien.

    /cc @nats-io/core

Werkend voorbeeld voor de embedded modus

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// voor het configureren van de ingebedde NATS-server
14		// stel DontListen in op true
15		DontListen: true,
16	}
17
18	// Initialiseer een nieuwe server met opties
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start de server via goroutine
26	go ns.Start()
27
28	// Wacht tot de server klaar is voor verbindingen
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("niet klaar voor verbinding")
31	}
32
33	// Maak verbinding met de server via in-process verbinding
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonneer op het onderwerp
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print berichtgegevens
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Sluit de server af (optioneel)
49		ns.Shutdown()
50	})
51
52	// Publiceer gegevens naar het onderwerp
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wacht op het afsluiten van de server
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...geen aanvullende logboeken
4
  • Nu is te zien dat er geen extra netwerkhop plaatsvindt, zoals bedoeld.

Onder de motorkap

TL;DR

diagram1

  • Dit is een sequentiediagram dat weergeeft welke interne functies en hoe deze functioneren wanneer de code in main.go wordt uitgevoerd. De kernpunten worden hieronder toegelicht:
    • Door DontListen: true te gebruiken, slaat de server de client listening phase genaamd AcceptLoop over.
    • Indien de InProcessServer-optie van de client Connect-optie is geactiveerd, wordt een in-memory verbinding tot stand gebracht. Hierbij wordt een pipe gecreëerd via net.Pipe, waarna het einde van de pipe als een net.Conn-type aan de client wordt geretourneerd.
    • De client en server communiceren via deze verbinding in-process.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Wacht op clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Ten eerste, wanneer DontListen waar is, wordt de client listening phase genaamd AcceptLoop overgeslagen.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is geëxporteerd voor eenvoudiger testen.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Als we zouden afsluiten voordat de listener correct is ingesteld,
 6	// zorg er dan voor dat we het kanaal sluiten.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Maak een snapshot van de serveropties.
18	opts := s.getOpts()
19
20	// Stel de status in die afsluiten mogelijk maakt
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Fout bij het luisteren op poort: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Luistert naar clientverbindingen op %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Waarschuw als TLS is ingeschakeld.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS vereist voor clientverbindingen")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients die de optie \"TLS Handshake First\" niet gebruiken, zullen geen verbinding kunnen maken")
38		}
39	}
40
41	// Als de server is gestart met RANDOM_PORT (-1), zou opts.Port aan het begin van deze functie
42	// gelijk zijn aan 0. We moeten dus de daadwerkelijke poort verkrijgen
43	if opts.Port == 0 {
44		// Schrijf de opgeloste poort terug naar de opties.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nu de poort is ingesteld (indien ingesteld op RANDOM), stel de
49	// server's info Host/Port in met waarden uit Options of
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Fout bij het instellen van server INFO met ClientAdvertise waarde van %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Houd de client connect URL's bij. We hebben ze mogelijk later nodig.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signaleer dat we geen nieuwe clients accepteren
65				s.ldmCh <- true
66				// Wacht nu op de Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Laat de aanroeper weten dat we klaar zijn
75	close(clr)
76	clr = nil
77}
  • Ter referentie, de functie AcceptLoop doorloopt de volgende stappen. Deze zijn gerelateerd aan netwerkcommunicatie, zoals TLS en hostPort, en kunnen worden overgeslagen omdat ze niet nodig zijn bij in-process communicatie.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect zal proberen verbinding te maken met het NATS-systeem.
 5// De URL kan gebruikersnaam/wachtwoord-semantiek bevatten. bijv. nats://derek:pass@localhost:4222
 6// Komma-gescheiden arrays worden ook ondersteund, bijv. urlA, urlB.
 7// Opties beginnen met de standaardwaarden, maar kunnen worden overschreven.
 8// Om verbinding te maken met de websocket-poort van een NATS-server, gebruikt u het `ws` of `wss`-schema, zoals
 9// `ws://localhost:8080`. Merk op dat websocket-schema's niet kunnen worden gemengd met andere (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
 4type Options struct {
 5	// Url vertegenwoordigt een enkele NATS-server-URL waarmee de client
 6	// verbinding zal maken. Als de Servers-optie ook is ingesteld, wordt deze
 7	// dan de eerste server in de Servers-array.
 8	Url string
 9
10	// InProcessServer vertegenwoordigt een NATS-server die binnen hetzelfde
11	// proces draait. Als dit is ingesteld, zullen we proberen rechtstreeks
12	// verbinding te maken met de server in plaats van externe TCP-verbindingen te gebruiken.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • De Connect-functie, die de verbinding tussen de NATS-server en de NATS-client tot stand brengt, maakt het mogelijk om de client-URL en verbindingsopties te configureren. De Options-structuur, waarin deze opties zijn verzameld, bevat een veld genaamd InProcessServer van het type InProcessConnProvider-interface.
1// main.go van voorbeeldcode
2
3// Initialiseer een nieuwe server met opties
4ns, err := server.NewServer(opts)
5
6//...
7
8// Maak verbinding met de server via in-process verbinding
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Wanneer de NATS-client een verbinding initieert en nats.InProcessServer(ns) als InProcessServer-veld doorgeeft,
 1// nats-go/nats.go
 2
 3// InProcessServer is een optie die zal proberen een richting naar een NATS-server
 4// te bepalen die binnen het proces draait in plaats van via TCP te bellen.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • De InProcessServer van de optie wordt vervangen door de embedded NATS-server.
 1// nats-go/nats.go
 2
 3// createConn maakt verbinding met de server en wikkelt de juiste
 4// bufio-structuren in. Het zal het juiste doen wanneer een bestaande
 5// verbinding aanwezig is.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Als we een verwijzing hebben naar een in-process server, dan maken we een
15	// verbinding tot stand met behulp daarvan.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("niet gelukt om in-process verbinding te verkrijgen: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Deze interface wordt aangeroepen wanneer de InProcessServer-optie in de functie createConn niet nil (geldig) is, waarbij de InProcessConn van de InProcessServer in de optie wordt uitgevoerd.
 1// nats-server/server/server.go
 2
 3// InProcessConn retourneert een in-process verbinding met de server,
 4// waardoor de noodzaak om een TCP-listener te gebruiken voor lokale connectiviteit
 5// binnen hetzelfde proces wordt vermeden. Dit kan worden gebruikt ongeacht de
 6// status van de DontListen-optie.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("niet gelukt om verbinding te maken")
16	}
17	return pr, nil
18}
  • De in de server geïmplementeerde InProcessConn wordt aangeroepen en uitgevoerd.
  • Deze functie, indien aangeroepen wanneer InProcessServer van nc (nats-verbinding) in de Go-client van NATS (nats.go) niet nil is, creëert een verbinding (net.Conn) en bindt deze aan de serververbinding.

Consumer driven interface van Go

Een type implementeert een interface door zijn methoden te implementeren. Er is geen expliciete intentieverklaring, geen "implements" trefwoord. Impliciete interfaces ontkoppelen de definitie van een interface van de implementatie ervan, die dan in elk pakket kan verschijnen zonder voorafgaande afspraak.

Interfaces worden impliciet geïmplementeerd, A Tour of Go

Als een type alleen bestaat om een interface te implementeren en nooit geëxporteerde methoden buiten die interface zal hebben, is het niet nodig om het type zelf te exporteren.

Algemeenheid, Effectieve Go

  • Dit interface-ontwerp omvat het zogenaamde "consumer defined interface" en "structural typing" (duck typing) in Go, en ik wil deze onderwerpen kort introduceren.
 1// nats-go/nats.go
 2
 3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
 4type Options struct {
 5	// Url vertegenwoordigt een enkele NATS-server-URL waarmee de client
 6	// verbinding zal maken. Als de Servers-optie ook is ingesteld, wordt deze
 7	// dan de eerste server in de Servers-array.
 8	Url string
 9
10	// InProcessServer vertegenwoordigt een NATS-server die binnen hetzelfde
11	// proces draait. Als dit is ingesteld, zullen we proberen rechtstreeks
12	// verbinding te maken met de server in plaats van externe TCP-verbindingen te gebruiken.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Laten we teruggaan naar de code. In de NATS.go-client is het InProcessServer-optiestructuurveld gedefinieerd als de InProcessConnProvider-interface, die alleen InProcessConn uitvoert.
 1// nats-server/server/server.go
 2
 3// InProcessConn retourneert een in-process verbinding met de server,
 4// waardoor de noodzaak om een TCP-listener te gebruiken voor lokale connectiviteit
 5// binnen hetzelfde proces wordt vermeden. Dit kan worden gebruikt ongeacht de
 6// status van de DontListen-optie.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("niet gelukt om verbinding te maken")
16	}
17	return pr, nil
18}
  • Het type dat erin wordt ingevoerd, is echter de Server van nats-server, die naast InProcessConn ook diverse andere functionaliteiten uitvoert.
  • Dit komt doordat de client in deze situatie alleen geïnteresseerd is in de vraag of de InProcessConn-interface is geleverd, en andere aspecten zijn van ondergeschikt belang.
  • Daarom heeft de nats.go-client alleen een "consumer defined interface" genaamd InProcessConnProvider gecreëerd en gebruikt, die alleen de functionaliteit InProcessConn() (net.Conn, error) definieert.

Conclusie

  • Ik heb kort de embedded modus van NATS, de werking ervan, en de consumer defined interface van Go, zoals te zien is in de NATS-code, behandeld.
  • Ik hoop dat deze informatie nuttig zal zijn voor mensen die NATS voor de bovengenoemde doeleinden gebruiken, en hiermee sluit ik dit artikel af.