Hoe communiceren ingebedde NATS met een Go-applicatie?
Aan de slag
Over NATS
Softwareapplicaties en -diensten moeten gegevens uitwisselen. NATS is een infrastructuur die een dergelijke gegevensuitwisseling mogelijk maakt, gesegmenteerd in de vorm van berichten. Dit noemen we "message oriented middleware".
Met NATS kunnen applicatieontwikkelaars:
- Moeiteloos gedistribueerde en schaalbare client-server applicaties bouwen.
- Gegevens in realtime op een algemene manier opslaan en distribueren. Dit kan flexibel worden bereikt in diverse omgevingen, talen, cloud providers en on-premises systemen.
- NATS is een message broker geschreven in Go.
Embedded NATS
Indien uw applicatie in Go is geschreven, en indien dit past bij uw use case en implementatiescenario's, kunt u zelfs een NATS server in uw applicatie embedden.
- Een bijzonder kenmerk van NATS is de ondersteuning voor een embedded mode voor applicaties die in Go zijn geschreven.
- Dit betekent dat, in tegenstelling tot de gebruikelijke methode waarbij een aparte broker server wordt gestart en communicatie via clients van de applicatie plaatsvindt, de broker zelf kan worden ingebed in een in Go gemaakte applicatie.
Voordelen en use cases van embedded NATS
- Er is een goed uitgelegde Youtube video beschikbaar, waarnaar ik verwijs.
- Zelfs zonder een aparte message broker server te implementeren, kan men een modular monolith applicatie creëren en het voordeel benutten van het embedded implementeren van NATS om zo separation of concerns te bewerkstelligen. Bovendien wordt single binary deployment mogelijk.
- Dit kan nuttig worden toegepast, niet alleen op platforms zonder netwerk (WASM), maar ook in offline-first applicaties.
Voorbeeld in de officiële documentatie
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialiseer nieuwe server met opties
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start de server via goroutine
22 go ns.Start()
23
24 // Wacht tot de server klaar is voor verbindingen
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("niet klaar voor verbinding")
27 }
28
29 // Maak verbinding met de server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonneer op het subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print berichtdata
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Sluit de server af (optioneel)
45 ns.Shutdown()
46 })
47
48 // Publiceer data naar het subject
49 nc.Publish(subject, []byte("Hallo embedded NATS!"))
50
51 // Wacht op serverafsluiting
52 ns.WaitForShutdown()
53}
- Dit is een voorbeeld van Embedded NATS uit de officiële NATS-documentatie, maar de communicatie vindt niet plaats in embedded mode als men de voorbeeldcode volgt.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Als u de
go run .
opdracht uitvoert voor het betreffende Go-bestand, terwijl u het netwerkverkeer van en naar localhost (127.0.0.1) controleert met hetwatch 'netstat -an | grep 127.0.0.1'
commando, zult u zien dat er nieuwe netwerkverzoeken, afkomstig van de standaard NATS-poort4222
, worden toegevoegd.
Juiste configuraties voor embedded mode
Om in de gewenste embedded mode te communiceren, zijn de volgende twee opties vereist:
- Client: De
InProcessServer
optie moet worden toegevoegd. - Server: In de
Server.Options
moet deDontListen
flag alstrue
worden gespecificeerd.
- Client: De
Deze aspecten waren niet officieel gedocumenteerd; de oorsprong van deze functionaliteit kan worden afgeleid uit deze PR.
Deze PR voegt drie zaken toe:
InProcessConn()
functie naarServer
die eennet.Pipe
bouwt om een verbinding met de NATS-server te krijgen zonder TCP sockets te gebruikenDontListen
optie die de NATS-server vertelt niet te luisteren op de gebruikelijke TCP listenerstartupComplete
kanaal, dat wordt gesloten vlak voordat weAcceptLoop
starten, enreadyForConnections
zal erop wachten
De belangrijkste motivatie hiervoor is dat we een applicatie hebben die zowel in een monolithische (single-process) modus als in een polylithische (multi-process) modus kan draaien. We willen NATS voor beide modi kunnen gebruiken voor de eenvoud, maar de monolithische modus moet in staat zijn om te voldoen aan een verscheidenheid aan platforms waar het openen van socketverbindingen ofwel geen zin heeft (mobiel) of gewoon niet mogelijk is (WASM). Deze wijzigingen zullen ons in staat stellen om NATS volledig in-process te gebruiken.
Een bijbehorende PR nats-io/nats.go#774 voegt ondersteuning toe aan de clientzijde.
Dit is mijn eerste PR voor dit project, dus bij voorbaat mijn excuses als ik iets voor de hand liggends over het hoofd heb gezien.
/cc @nats-io/core
Werkend voorbeeld voor embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // voor het configureren van de ingebedde NATS-server
14 // stel DonListen in op true
15 DontListen: true,
16 }
17
18 // Initialiseer nieuwe server met opties
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start de server via goroutine
26 go ns.Start()
27
28 // Wacht tot de server klaar is voor verbindingen
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("niet klaar voor verbinding")
31 }
32
33 // Maak verbinding met de server via een in-process verbinding
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonneer op het subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print berichtdata
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Sluit de server af (optioneel)
49 ns.Shutdown()
50 })
51
52 // Publiceer data naar het subject
53 nc.Publish(subject, []byte("Hallo embedded NATS!"))
54
55 // Wacht op serverafsluiting
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nu is te zien dat er geen extra network hop optreedt, zoals beoogd.
Under the hood
TL;DR
- Dit is een sequence diagram dat aangeeft welke functies intern worden uitgevoerd wanneer de code in
main.go
wordt uitgevoerd, en de essentie is als volgt:- Via
DontListen: true
slaat de server de client listening phase, genaamdAcceptLoop
, over. - Indien de
InProcessServer
optie van de client wordt geactiveerd, wordt een in-memory verbinding tot stand gebracht en wordt een pipe gecreëerd vianet.Pipe
. Vervolgens wordt het einde van de pipe alsnet.Conn
type aan de client geretourneerd. - De client en server voeren in-process communicatie uit via deze verbinding.
- Via
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Wacht op clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Ten eerste, als
DontListen
true is, wordt de client listening phase, genaamdAcceptLoop
, overgeslagen.
1// nats-server/server/server.go
2
3// AcceptLoop is geëxporteerd voor eenvoudiger testen.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Als we zouden afsluiten voordat de listener correct is ingesteld,
6 // zorg er dan voor dat we het kanaal sluiten.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Maak een snapshot van de serveropties.
18 opts := s.getOpts()
19
20 // Stel de status in die afsluiting mogelijk maakt
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Fout bij het luisteren op poort: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Luisteren voor clientverbindingen op %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Waarschuwing voor ingeschakelde TLS.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS vereist voor clientverbindingen")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients die de optie \"TLS Handshake First\" niet gebruiken, zullen geen verbinding kunnen maken")
38 }
39 }
40
41 // Als de server werd gestart met RANDOM_PORT (-1), zou opts.Port aan het begin van
42 // deze functie gelijk zijn aan 0. We moeten dus de daadwerkelijke poort ophalen
43 if opts.Port == 0 {
44 // Schrijf de opgeloste poort terug naar de opties.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nu de poort is ingesteld (indien ingesteld op RANDOM), stel de
49 // INFO Host/Poort van de server in met waarden uit Options of
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Fout bij het instellen van server INFO met ClientAdvertise waarde van %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Houd client connect URLs bij. We hebben ze misschien later nodig.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signaal dat we geen nieuwe clients accepteren
65 s.ldmCh <- true
66 // Wacht nu op de Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Laat de beller weten dat we klaar zijn
75 close(clr)
76 clr = nil
77}
- Ter referentie, de AcceptLoop functie voert de volgende processen uit. Deze omvatten netwerkcommunicatie-gerelateerde onderdelen zoals
TLS
enhostPort
, die overbodig zijn bij in-process communicatie en daarom kunnen worden weggelaten.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect zal proberen verbinding te maken met het NATS-systeem.
5// De url kan gebruikersnaam/wachtwoord semantiek bevatten. bijv. nats://derek:pass@localhost:4222
6// Door komma's gescheiden arrays worden ook ondersteund, bijv. urlA, urlB.
7// Opties beginnen met de standaardwaarden, maar kunnen worden overschreven.
8// Om verbinding te maken met de websocket-poort van een NATS-server, gebruikt u het `ws` of `wss` schema, zoals
9// `ws://localhost:8080`. Merk op dat websocket-schema's niet kunnen worden gemengd met andere (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
4type Options struct {
5 // Url vertegenwoordigt een enkele NATS-server-url waarmee de client
6 // verbinding zal maken. Als de Servers-optie ook is ingesteld,
7 // wordt deze de eerste server in de Servers-array.
8 Url string
9
10 // InProcessServer vertegenwoordigt een NATS-server die binnen dezelfde
11 // proces draait. Als dit is ingesteld, zullen we proberen verbinding
12 // te maken met de server direct in plaats van externe TCP-verbindingen te gebruiken.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- De
Connect
functie, die de verbinding tussen de NATS server en de NATS client tot stand brengt, kan de client URL en connect Option instellen. De Options struct, die deze opties bevat, heeft eenInProcessServer
veld van hetInProcessConnProvider
interface type.
1// main.go of voorbeeldcode
2
3// Initialiseer nieuwe server met opties
4ns, err := server.NewServer(opts)
5
6//...
7
8// Maak verbinding met de server via een in-process verbinding
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Wanneer de nats client een Connectie initieert en
nats.InProcessServer(ns)
doorgeeft aan hetInProcessServer
veld, dan:
1// nats-go/nats.go
2
3// InProcessServer is een optie die zal proberen een directe verbinding tot stand te brengen met een NATS-server
4// die binnen het proces draait, in plaats van te bellen via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- wordt de
InProcessServer
van de optie vervangen door de embedded NATS server, en
1// nats-go/nats.go
2
3// createConn zal verbinding maken met de server en de juiste
4// bufio structuren omwikkelen. Het zal het juiste doen wanneer een bestaande
5// verbinding aanwezig is.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Als we een referentie hebben naar een in-process server, maak dan een
15 // verbinding tot stand met die.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("fout bij het verkrijgen van in-process verbinding: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- wordt, indien de
InProcessServer
optie in decreateConn
functie (die de verbinding tot stand brengt) nietnil
(geldig) is, deInProcessConn
van deInProcessServer
optie uitgevoerd, en
1// nats-server/server/server.go
2
3// InProcessConn retourneert een in-process verbinding naar de server,
4// waardoor het niet nodig is om een TCP-listener te gebruiken voor lokale connectiviteit
5// binnen hetzelfde proces. Dit kan worden gebruikt ongeacht de
6// staat van de DontListen optie.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("fout bij het aanmaken van verbinding")
16 }
17 return pr, nil
18}
- wordt de
InProcessConn
die in de server is geïmplementeerd, aangeroepen en uitgevoerd. - Deze functie wordt aangeroepen wanneer de
InProcessServer
van de nc (nats connection) in de Go client van NATS,nats.go
, nietnil
is. Vervolgens wordt een verbinding (net.Conn
) tot stand gebracht en aan de serververbinding gebonden.
Consumer driven interface van Go
Een type implementeert een interface door de methoden ervan te implementeren. Er is geen expliciete intentieverklaring, geen "implements" trefwoord. Impliciete interfaces ontkoppelen de definitie van een interface van de implementatie ervan, die dan in elk pakket kan verschijnen zonder voorafgaande afspraak.
Interfaces worden impliciet geïmplementeerd, A Tour of Go
Als een type uitsluitend bestaat om een interface te implementeren en nooit geëxporteerde methoden zal hebben die verder gaan dan die interface, is het niet nodig om het type zelf te exporteren.
- Dit interface-ontwerp omvat goed wat in Go vaak een consumer-defined interface en structural typing (duck typing) wordt genoemd, en daarom wilde ik dit onderwerp ook introduceren.
1// nats-go/nats.go
2
3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
4type Options struct {
5 // Url vertegenwoordigt een enkele NATS-server-url waarmee de client
6 // verbinding zal maken. Als de Servers-optie ook is ingesteld,
7 // wordt deze de eerste server in de Servers-array.
8 Url string
9
10 // InProcessServer vertegenwoordigt een NATS-server die binnen dezelfde
11 // proces draait. Als dit is ingesteld, zullen we proberen verbinding
12 // te maken met de server direct in plaats van externe TCP-verbindingen te gebruiken.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Laten we teruggaan naar de code. In de nats.go client is het
InProcessServer
optie struct veld gedefinieerd als eenInProcessConnProvider
interface, die alleenInProcessConn
uitvoert.
1// nats-server/server/server.go
2
3// InProcessConn retourneert een in-process verbinding naar de server,
4// waardoor het niet nodig is om een TCP-listener te gebruiken voor lokale connectiviteit
5// binnen hetzelfde proces. Dit kan worden gebruikt ongeacht de
6// staat van de DontListen optie.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("fout bij het aanmaken van verbinding")
16 }
17 return pr, nil
18}
- Echter, het type dat daarin wordt geplaatst is de
Server
van nats-server, die naastInProcessConn
ook diverse andere functies uitvoert. - Dit komt doordat de client in dit scenario zich alleen bekommert om de vraag of de
InProcessConn
interface is geboden, en andere zaken van ondergeschikt belang zijn. - Daarom heeft de nats.go client uitsluitend een consumer-defined interface genaamd
InProcessConnProvider
gecreëerd en gebruikt, die enkel de functionaliteitInProcessConn() (net.Conn, error)
definieert.
Conclusie
- In dit artikel heb ik kort de embedded mode van NATS en de werking ervan besproken, evenals de consumer-defined interface van Go die in de NATS-code te vinden is.
- Ik hoop dat deze informatie nuttig zal zijn voor mensen die NATS voor de bovengenoemde doeleinden gebruiken, en hiermee sluit ik dit artikel af.