Hvordan kommuniserer innebygde NATS med Go-applikasjoner?
Komme i gang
Om NATS
Programvareapplikasjoner og tjenester må utveksle data. NATS er en infrastruktur som tillater slik datautveksling, segmentert i form av meldinger. Vi kaller dette en «meldingsorientert mellomvare».
Med NATS kan applikasjonsutviklere:
- Uanstrengt bygge distribuerte og skalerbare klient-server-applikasjoner.
- Lagre og distribuere data i sanntid på en generell måte. Dette kan fleksibelt oppnås på tvers av ulike miljøer, språk, skyleverandører og lokale systemer.
- NATS er en meldingsmegler konstruert i Go.
Embedded NATS
Hvis applikasjonen din er i Go, og hvis den passer til bruksområdet og distribusjonsscenariene dine, kan du til og med bygge inn en NATS-server i applikasjonen din.
Embedding NATS, NATS-dokumenter
- En særegenhet ved NATS er at den støtter embedded mode for applikasjoner konstruert i Go.
- Dette innebærer at i stedet for den konvensjonelle tilnærmingen med meldingsmeglere, som involverer drift av en separat megler-server og kommunikasjon via applikasjonens klient, er det mulig å bygge inn (embed) selve megleren direkte i en Go-basert applikasjon.
Fordeler og bruksområder for embedded NATS
- En utmerket YouTube-video forklarer dette, og jeg henviser derfor til lenken til videoen.
- Selv uten å distribuere en separat meldingsmeglerserver, kan man oppnå separate concerns ved å konstruere en modular monolith application, og dermed dra nytte av fordelene ved å integrere NATS i embedded-modus. I tillegg blir single binary deployment også mulig.
- Dette kan anvendes for platform with no network (WASM) så vel som for offline-first applikasjoner.
Eksempel i offisiell dokumentasjon
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialiser ny server med opsjoner
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start serveren via goroutine
22 go ns.Start()
23
24 // Vent til serveren er klar for tilkoblinger
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("ikke klar for tilkobling")
27 }
28
29 // Koble til server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonner på emnet
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Skriv ut meldingsdata
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Steng ned serveren (valgfritt)
45 ns.Shutdown()
46 })
47
48 // Publiser data til emnet
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Vent på servernedstengning
52 ns.WaitForShutdown()
53}
- Dette er et eksempel på Embedded NATS som er publisert i den offisielle NATS-dokumentasjonen, men hvis man følger denne eksempelkoden, vil kommunikasjon i embedded mode ikke finne sted.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Ved å kjøre
go run .
med kommandoenwatch 'netstat -an | grep 127.0.0.1'
for å overvåke nettverkstrafikken til localhost (127.0.0.1), kan man observere at nye nettverksforespørsler som stammer fra NATS' standardport4222
blir lagt til.
Korrekte konfigurasjoner for embedding mode
For å oppnå den tilsiktede kommunikasjonen i embedded mode, er det nødvendig med følgende to opsjoner:
- Klient: Opsjonen
InProcessServer
må inkluderes. - Server: Flagget
DontListen
iServer.Options
må eksplisitt settes tiltrue
.
- Klient: Opsjonen
Disse spesifikke detaljene er ikke formelt dokumentert, men funksjonens opprinnelse kan spores tilbake til denne PR-en.
Denne PR-en legger til tre ting:
InProcessConn()
-funksjon tilServer
som bygger ennet.Pipe
for å få en forbindelse til NATS-serveren uten å bruke TCP-socketsDontListen
-opsjon som instruerer NATS-serveren om ikke å lytte på den vanlige TCP-lytterenstartupComplete
-kanal, som lukkes rett før vi starterAcceptLoop
, ogreadyForConnections
vil vente på den
Hovedmotivasjonen for dette er at vi har en applikasjon som kan kjøre enten i en monolitisk (single-process) modus eller en polylitisk (multi-process) modus. Vi ønsker å kunne bruke NATS for begge moduser for enkelthets skyld, men monolitisk modus må kunne imøtekomme en rekke plattformer der åpning av socket-forbindelser enten ikke gir mening (mobil) eller rett og slett ikke er mulig (WASM). Disse endringene vil tillate oss å bruke NATS fullstendig in-process i stedet.
En medfølgende PR nats-io/nats.go#774 legger til støtte på klientsiden.
Dette er min første PR til dette prosjektet, så beklager på forhånd hvis jeg har oversett noe åpenbart.
/cc @nats-io/core
Fungerende eksempel for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialiser ny server med opsjoner
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start serveren via goroutine
26 go ns.Start()
27
28 // Vent til serveren er klar for tilkoblinger
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("ikke klar for tilkobling")
31 }
32
33 // Koble til server via in-process-tilkobling
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonner på emnet
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Skriv ut meldingsdata
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Steng ned serveren (valgfritt)
49 ns.Shutdown()
50 })
51
52 // Publiser data til emnet
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Vent på servernedstengning
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nå kan man observere at det ikke oppstår ytterligere nettverkshopp, i tråd med intensjonen.
Under panseret
TL;DR
- Dette er et sekvensdiagram som illustrerer hvordan funksjonene internt opererer når koden kjøres fra
main.go
, og kjernen kan oppsummeres som følger:- Ved
DontListen: true
utelater serveren klientlyttefasenAcceptLoop
. - Hvis klientens Connect-opsjon
InProcessServer
aktiveres, opprettes en in-memory-forbindelse og en pipe vianet.Pipe
, hvoretter enden av pipen returneres til klienten som ennet.Conn
-type. - Klienten og serveren utfører in-process-kommunikasjon via denne forbindelsen.
- Ved
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Først, hvis
DontListen
er true, utelates klientlyttefasenAcceptLoop
.
1// nats-server/server/server.go
2
3// AcceptLoop er eksportert for enklere testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Hvis vi skulle avslutte før lytteren er riktig satt opp,
6 // sørg for at vi lukker kanalen.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Øyeblikksbilde av serveropsjoner.
18 opts := s.getOpts()
19
20 // Sett opp tilstand som kan aktivere nedstengning
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Feil ved lytting på port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Lytter etter klienttilkoblinger på %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Varsle om TLS er aktivert.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS kreves for klienttilkoblinger")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klienter som ikke bruker \"TLS Handshake First\"-opsjonen vil mislykkes i å koble til")
38 }
39 }
40
41 // Hvis serveren ble startet med RANDOM_PORT (-1), ville opts.Port være lik
42 // 0 i begynnelsen av denne funksjonen. Så vi må få den faktiske porten
43 if opts.Port == 0 {
44 // Skriv den løste porten tilbake til opsjonene.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nå som porten er satt (hvis den ble satt til RANDOM), sett
49 // serverens info Host/Port med enten verdier fra Opsjoner eller
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Feil ved innstilling av server INFO med ClientAdvertise-verdi av %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Hold oversikt over klientens tilkoblings-URL-er. Vi kan trenge dem senere.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signaliser at vi ikke aksepterer nye klienter
65 s.ldmCh <- true
66 // Vent nå på nedstengningen...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // La den som kaller vite at vi er klare
75 close(clr)
76 clr = nil
77}
- For øvrig utfører AcceptLoop-funksjonen følgende prosesser: Dette omhandler nettverkskommunikasjonsrelaterte aspekter som
TLS
oghostPort
, som kan utelates da de er unødvendige ved in-process-kommunikasjon.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect vil forsøke å koble til NATS-systemet.
5// URL-en kan inneholde brukernavn/passord-semantikk. f.eks. nats://derek:pass@localhost:4222
6// Komma-separerte arrayer støttes også, f.eks. urlA, urlB.
7// Opsjoner starter med standardverdier, men kan overstyres.
8// For å koble til en NATS-servers websockets-port, bruk `ws` eller `wss`-skjemaet, for eksempel
9// `ws://localhost:8080`. Merk at websockets-skjemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Opsjoner kan brukes til å opprette en tilpasset tilkobling.
4type Options struct {
5 // Url representerer en enkelt NATS-server-url som klienten
6 // vil koble til. Hvis Servers-opsjonen også er satt, blir den
7 // den første serveren i Servers-arrayet.
8 Url string
9
10 // InProcessServer representerer en NATS-server som kjører innenfor
11 // samme prosess. Hvis denne er satt, vil vi forsøke å koble
12 // til serveren direkte i stedet for å bruke eksterne TCP-forbindelser.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
Connect
-funksjonen, som håndterer tilkoblingen mellom NATS-serveren og NATS-klienten, tillater konfigurering av klient-URL og tilkoblingsopsjoner.Options
-strukturen, som samler disse opsjonene, inneholder et felt kaltInProcessServer
av typenInProcessConnProvider
-grensesnitt.
1// main.go of example code
2
3// Initialiser ny server med opsjoner
4ns, err := server.NewServer(opts)
5
6//...
7
8// Koble til server via in-process-tilkobling
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Når NATS-klienten etablerer en forbindelse og sender
nats.InProcessServer(ns)
tilInProcessServer
-feltet, vil følgende skje:
1// nats-go/nats.go
2
3// InProcessServer er en opsjon som vil forsøke å etablere en retning til en NATS-server
4// som kjører innenfor prosessen i stedet for å ringe via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- Opsjonens InProcessServer blir substituert med den innebygde NATS-serveren.
1// nats-go/nats.go
2
3// createConn vil koble til serveren og pakke inn de passende
4// bufio-strukturene. Den vil gjøre det riktige når en eksisterende
5// tilkobling er på plass.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Hvis vi har en referanse til en in-process-server, etabler en
15 // tilkobling ved å bruke den.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("kunne ikke hente in-process-tilkobling: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Dette grensesnittet, når
InProcessServer
-opsjonen ikke er null (gyldig), utførerInProcessConn
-funksjonen til InProcessServer-opsjonen icreateConn
-funksjonen, som oppretter en tilkobling.
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process-tilkobling til serveren,
4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
5// innenfor samme prosess. Dette kan brukes uavhengig av
6// tilstanden til DontListen-opsjonen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("kunne ikke opprette tilkobling")
16 }
17 return pr, nil
18}
- Serveren kaller og utfører
InProcessConn
-funksjonen. - Denne funksjonen, i Go-klienten til NATS (
nats.go
), kalles nårInProcessServer
for nc (nats connection) ikke er null, og den oppretter en tilkobling (net.Conn
) og binder den til serverens tilkobling.
Consumer driven interface of Go
En type implementerer et grensesnitt ved å implementere dets metoder. Det er ingen eksplisitt erklæring om intensjon, ingen "implements"-nøkkelord. Implisitte grensesnitt frakobler definisjonen av et grensesnitt fra dets implementasjon, som deretter kan forekomme i hvilken som helst pakke uten forhåndsarrangement.
Grensesnitt er implementert implisitt, A Tour of Go
Hvis en type kun eksisterer for å implementere et grensesnitt og aldri vil ha eksporterte metoder utover det grensesnittet, er det ikke nødvendig å eksportere selve typen.
- Denne grensesnittdesignen inkorporerer det som ofte refereres til som consumer defined interface og structural typing (duck typing) i Go, og jeg ønsker å introdusere dette emnet sammen med det.
1// nats-go/nats.go
2
3// Opsjoner kan brukes til å opprette en tilpasset tilkobling.
4type Options struct {
5 // Url representerer en enkelt NATS-server-url som klienten
6 // vil koble til. Hvis Servers-opsjonen også er satt, blir den
7 // den første serveren i Servers-arrayet.
8 Url string
9
10 // InProcessServer representerer en NATS-server som kjører innenfor
11 // samme prosess. Hvis denne er satt, vil vi forsøke å koble
12 // til serveren direkte i stedet for å bruke eksterne TCP-forbindelser.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- La oss nå vende tilbake til koden. I nats.go-klienten er
InProcessServer
-opsjonsstrukturen definert som etInProcessConnProvider
-grensesnitt som kun utførerInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process-tilkobling til serveren,
4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
5// innenfor samme prosess. Dette kan brukes uavhengig av
6// tilstanden til DontListen-opsjonen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("kunne ikke opprette tilkobling")
16 }
17 return pr, nil
18}
- Imidlertid er typen som inngår i dette grensesnittet,
Server
fra nats-server, som utfører en rekke funksjoner utoverInProcessConn
. - Dette skyldes at klientens primære interesse i denne situasjonen er om
InProcessConn
-grensesnittet er tilgjengelig, mens andre aspekter er av mindre betydning. - Følgelig har nats.go-klienten kun opprettet og benytter et forbrukerdefinert grensesnitt kalt
InProcessConnProvider
, som kun definerer funksjonalitetenInProcessConn() (net.Conn, error)
.
Konklusjon
- Denne artikkelen har kortfattet behandlet NATS' embedded mode, dens virkemåte, og Go's consumer defined interface som kan observeres gjennom NATS' kode.
- Jeg håper denne informasjonen er nyttig for de som benytter NATS til lignende formål.