Hoe communiceren ingebedde NATS met een Go-applicatie?
Aan de slag
Over NATS
Softwareapplicaties en -diensten moeten gegevens uitwisselen. NATS is een infrastructuur die een dergelijke gegevensuitwisseling mogelijk maakt, gesegmenteerd in de vorm van berichten. Dit noemen we een "berichtgeoriënteerde middleware".
Met NATS kunnen applicatieontwikkelaars:
- Moeiteloos gedistribueerde en schaalbare client-serverapplicaties bouwen.
- Gegevens in realtime op een algemene manier opslaan en distribueren. Dit kan flexibel worden bereikt in verschillende omgevingen, talen, cloudproviders en on-premises systemen.
- NATS is een message broker die is gebouwd met Go.
Embedded NATS
Als uw applicatie in Go is geschreven, en als het past bij uw use case en implementatiescenario's, kunt u zelfs een NATS-server binnen uw applicatie embedden.
NATS embedden, NATS documentatie
- Een bijzonder kenmerk van NATS is de ondersteuning voor de embedded modus voor applicaties die in Go zijn geschreven.
- Dit betekent dat, in plaats van de gebruikelijke methode voor message brokers waarbij een aparte brokerserver wordt gestart en communicatie plaatsvindt via clients van de applicatie met die server, de broker zelf kan worden ingebed in een Go-applicatie.
Voordelen en gebruiksscenario's van embedded NATS
- Er is een goed uitgelegde Youtube-video, dus ik volsta met een link naar die video.
- Zonder de noodzaak om een aparte message broker server te implementeren, kan men een modulaire monolith applicatie creëren en het principe van 'separation of concerns' bereiken, terwijl tegelijkertijd de voordelen van een embedded NATS worden benut. Bovendien wordt zo ook een single binary deployment mogelijk.
- Dit is niet alleen bruikbaar voor platforms zonder netwerk (WASM), maar ook voor offline-first applicaties.
Voorbeeld uit de officiële documentatie
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialiseer een nieuwe server met opties
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start de server via goroutine
22 go ns.Start()
23
24 // Wacht tot de server klaar is voor verbindingen
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("niet klaar voor verbinding")
27 }
28
29 // Maak verbinding met de server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonneer op het onderwerp
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print berichtgegevens
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Sluit de server af (optioneel)
45 ns.Shutdown()
46 })
47
48 // Publiceer gegevens naar het onderwerp
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wacht op het afsluiten van de server
52 ns.WaitForShutdown()
53}
- Dit is een voorbeeld van Embedded NATS dat wordt aangeboden in de officiële NATS-documentatie, maar als men deze voorbeeldcode uitvoert, vindt er geen communicatie plaats in de embedding-modus.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Wanneer u het commando
watch 'netstat -an | grep 127.0.0.1'gebruikt om het netwerkverkeer van en naar localhost (127.0.0.1) te controleren en vervolgens het Go-bestand uitvoert metgo run ., zult u zien dat er nieuwe netwerkverzoeken worden toegevoegd die afkomstig zijn van de standaard NATS-poort4222.
Juiste configuraties voor de embedding-modus
Om te communiceren in de bedoelde embedded modus, zijn de volgende twee opties vereist:
- Client: De
InProcessServer-optie moet worden toegevoegd. - Server: De
DontListen-vlag inServer.Optionsmoet optrueworden ingesteld.
- Client: De
Deze aspecten zijn niet officieel gedocumenteerd, en de oorsprong van deze functionaliteit kan worden achterhaald via deze PR.
Deze PR voegt drie zaken toe:
- Een
InProcessConn()-functie aanServerdie eennet.Pipeopbouwt om een verbinding met de NATS-server te verkrijgen zonder gebruik te maken van TCP-sockets. - Een
DontListen-optie die de NATS-server vertelt om niet te luisteren op de gebruikelijke TCP-listener. - Een
startupComplete-kanaal, dat wordt gesloten net voordat weAcceptLoopstarten, enreadyForConnectionszal erop wachten.
De belangrijkste motivatie hiervoor is dat we een applicatie hebben die zowel in een monolithische (single-process) modus als in een polylithische (multi-process) modus kan draaien. We willen NATS voor beide modi kunnen gebruiken voor de eenvoud, maar de monolithische modus moet geschikt zijn voor een verscheidenheid aan platforms waar het openen van socketverbindingen ofwel geen zin heeft (mobiel) of gewoon niet mogelijk is (WASM). Deze wijzigingen zullen ons in staat stellen om NATS volledig in-process te gebruiken.
Een bijbehorende PR nats-io/nats.go#774 voegt ondersteuning toe aan de clientzijde.
Dit is mijn eerste PR voor dit project, dus bij voorbaat mijn excuses als ik iets voor de hand liggends over het hoofd heb gezien.
/cc @nats-io/core
- Een
Werkend voorbeeld voor de embedded modus
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // voor het configureren van de ingebedde NATS-server
14 // stel DontListen in op true
15 DontListen: true,
16 }
17
18 // Initialiseer een nieuwe server met opties
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start de server via goroutine
26 go ns.Start()
27
28 // Wacht tot de server klaar is voor verbindingen
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("niet klaar voor verbinding")
31 }
32
33 // Maak verbinding met de server via in-process verbinding
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonneer op het onderwerp
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print berichtgegevens
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Sluit de server af (optioneel)
49 ns.Shutdown()
50 })
51
52 // Publiceer gegevens naar het onderwerp
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wacht op het afsluiten van de server
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...geen aanvullende logboeken
4
- Nu is te zien dat er geen extra netwerkhop plaatsvindt, zoals bedoeld.
Onder de motorkap
TL;DR
- Dit is een sequentiediagram dat weergeeft welke interne functies en hoe deze functioneren wanneer de code in
main.gowordt uitgevoerd. De kernpunten worden hieronder toegelicht:- Door
DontListen: truete gebruiken, slaat de server de client listening phase genaamdAcceptLoopover. - Indien de
InProcessServer-optie van de client Connect-optie is geactiveerd, wordt een in-memory verbinding tot stand gebracht. Hierbij wordt een pipe gecreëerd vianet.Pipe, waarna het einde van de pipe als eennet.Conn-type aan de client wordt geretourneerd. - De client en server communiceren via deze verbinding in-process.
- Door
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Wacht op clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Ten eerste, wanneer
DontListenwaar is, wordt de client listening phase genaamdAcceptLoopovergeslagen.
1// nats-server/server/server.go
2
3// AcceptLoop is geëxporteerd voor eenvoudiger testen.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Als we zouden afsluiten voordat de listener correct is ingesteld,
6 // zorg er dan voor dat we het kanaal sluiten.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Maak een snapshot van de serveropties.
18 opts := s.getOpts()
19
20 // Stel de status in die afsluiten mogelijk maakt
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Fout bij het luisteren op poort: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Luistert naar clientverbindingen op %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Waarschuw als TLS is ingeschakeld.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS vereist voor clientverbindingen")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients die de optie \"TLS Handshake First\" niet gebruiken, zullen geen verbinding kunnen maken")
38 }
39 }
40
41 // Als de server is gestart met RANDOM_PORT (-1), zou opts.Port aan het begin van deze functie
42 // gelijk zijn aan 0. We moeten dus de daadwerkelijke poort verkrijgen
43 if opts.Port == 0 {
44 // Schrijf de opgeloste poort terug naar de opties.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nu de poort is ingesteld (indien ingesteld op RANDOM), stel de
49 // server's info Host/Port in met waarden uit Options of
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Fout bij het instellen van server INFO met ClientAdvertise waarde van %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Houd de client connect URL's bij. We hebben ze mogelijk later nodig.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signaleer dat we geen nieuwe clients accepteren
65 s.ldmCh <- true
66 // Wacht nu op de Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Laat de aanroeper weten dat we klaar zijn
75 close(clr)
76 clr = nil
77}
- Ter referentie, de functie AcceptLoop doorloopt de volgende stappen. Deze zijn gerelateerd aan netwerkcommunicatie, zoals TLS en hostPort, en kunnen worden overgeslagen omdat ze niet nodig zijn bij in-process communicatie.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect zal proberen verbinding te maken met het NATS-systeem.
5// De URL kan gebruikersnaam/wachtwoord-semantiek bevatten. bijv. nats://derek:pass@localhost:4222
6// Komma-gescheiden arrays worden ook ondersteund, bijv. urlA, urlB.
7// Opties beginnen met de standaardwaarden, maar kunnen worden overschreven.
8// Om verbinding te maken met de websocket-poort van een NATS-server, gebruikt u het `ws` of `wss`-schema, zoals
9// `ws://localhost:8080`. Merk op dat websocket-schema's niet kunnen worden gemengd met andere (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
4type Options struct {
5 // Url vertegenwoordigt een enkele NATS-server-URL waarmee de client
6 // verbinding zal maken. Als de Servers-optie ook is ingesteld, wordt deze
7 // dan de eerste server in de Servers-array.
8 Url string
9
10 // InProcessServer vertegenwoordigt een NATS-server die binnen hetzelfde
11 // proces draait. Als dit is ingesteld, zullen we proberen rechtstreeks
12 // verbinding te maken met de server in plaats van externe TCP-verbindingen te gebruiken.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- De
Connect-functie, die de verbinding tussen de NATS-server en de NATS-client tot stand brengt, maakt het mogelijk om de client-URL en verbindingsopties te configureren. DeOptions-structuur, waarin deze opties zijn verzameld, bevat een veld genaamdInProcessServervan het typeInProcessConnProvider-interface.
1// main.go van voorbeeldcode
2
3// Initialiseer een nieuwe server met opties
4ns, err := server.NewServer(opts)
5
6//...
7
8// Maak verbinding met de server via in-process verbinding
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Wanneer de NATS-client een verbinding initieert en
nats.InProcessServer(ns)alsInProcessServer-veld doorgeeft,
1// nats-go/nats.go
2
3// InProcessServer is een optie die zal proberen een richting naar een NATS-server
4// te bepalen die binnen het proces draait in plaats van via TCP te bellen.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- De InProcessServer van de optie wordt vervangen door de embedded NATS-server.
1// nats-go/nats.go
2
3// createConn maakt verbinding met de server en wikkelt de juiste
4// bufio-structuren in. Het zal het juiste doen wanneer een bestaande
5// verbinding aanwezig is.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Als we een verwijzing hebben naar een in-process server, dan maken we een
15 // verbinding tot stand met behulp daarvan.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("niet gelukt om in-process verbinding te verkrijgen: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Deze interface wordt aangeroepen wanneer de
InProcessServer-optie in de functiecreateConnnietnil(geldig) is, waarbij deInProcessConnvan de InProcessServer in de optie wordt uitgevoerd.
1// nats-server/server/server.go
2
3// InProcessConn retourneert een in-process verbinding met de server,
4// waardoor de noodzaak om een TCP-listener te gebruiken voor lokale connectiviteit
5// binnen hetzelfde proces wordt vermeden. Dit kan worden gebruikt ongeacht de
6// status van de DontListen-optie.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("niet gelukt om verbinding te maken")
16 }
17 return pr, nil
18}
- De in de server geïmplementeerde
InProcessConnwordt aangeroepen en uitgevoerd. - Deze functie, indien aangeroepen wanneer
InProcessServervannc(nats-verbinding) in de Go-client van NATS (nats.go) nietnilis, creëert een verbinding (net.Conn) en bindt deze aan de serververbinding.
Consumer driven interface van Go
Een type implementeert een interface door zijn methoden te implementeren. Er is geen expliciete intentieverklaring, geen "implements" trefwoord. Impliciete interfaces ontkoppelen de definitie van een interface van de implementatie ervan, die dan in elk pakket kan verschijnen zonder voorafgaande afspraak.
Interfaces worden impliciet geïmplementeerd, A Tour of Go
Als een type alleen bestaat om een interface te implementeren en nooit geëxporteerde methoden buiten die interface zal hebben, is het niet nodig om het type zelf te exporteren.
- Dit interface-ontwerp omvat het zogenaamde "consumer defined interface" en "structural typing" (duck typing) in Go, en ik wil deze onderwerpen kort introduceren.
1// nats-go/nats.go
2
3// Opties kunnen worden gebruikt om een aangepaste verbinding te creëren.
4type Options struct {
5 // Url vertegenwoordigt een enkele NATS-server-URL waarmee de client
6 // verbinding zal maken. Als de Servers-optie ook is ingesteld, wordt deze
7 // dan de eerste server in de Servers-array.
8 Url string
9
10 // InProcessServer vertegenwoordigt een NATS-server die binnen hetzelfde
11 // proces draait. Als dit is ingesteld, zullen we proberen rechtstreeks
12 // verbinding te maken met de server in plaats van externe TCP-verbindingen te gebruiken.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Laten we teruggaan naar de code. In de NATS.go-client is het
InProcessServer-optiestructuurveld gedefinieerd als deInProcessConnProvider-interface, die alleenInProcessConnuitvoert.
1// nats-server/server/server.go
2
3// InProcessConn retourneert een in-process verbinding met de server,
4// waardoor de noodzaak om een TCP-listener te gebruiken voor lokale connectiviteit
5// binnen hetzelfde proces wordt vermeden. Dit kan worden gebruikt ongeacht de
6// status van de DontListen-optie.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("niet gelukt om verbinding te maken")
16 }
17 return pr, nil
18}
- Het type dat erin wordt ingevoerd, is echter de
Servervan nats-server, die naast InProcessConn ook diverse andere functionaliteiten uitvoert. - Dit komt doordat de client in deze situatie alleen geïnteresseerd is in de vraag of de
InProcessConn-interface is geleverd, en andere aspecten zijn van ondergeschikt belang. - Daarom heeft de nats.go-client alleen een "consumer defined interface" genaamd
InProcessConnProvidergecreëerd en gebruikt, die alleen de functionaliteitInProcessConn() (net.Conn, error)definieert.
Conclusie
- Ik heb kort de embedded modus van NATS, de werking ervan, en de consumer defined interface van Go, zoals te zien is in de NATS-code, behandeld.
- Ik hoop dat deze informatie nuttig zal zijn voor mensen die NATS voor de bovengenoemde doeleinden gebruiken, en hiermee sluit ik dit artikel af.