Hvordan kommuniserer innebygde NATS med Go-applikasjoner?
Komme i gang
Om NATS
Programvareapplikasjoner og tjenester må utveksle data. NATS er en infrastruktur som tillater slik datautveksling, segmentert i form av meldinger. Vi kaller dette en «meldingsorientert mellomvare».
Med NATS kan applikasjonsutviklere:
- Uanstrengt bygge distribuerte og skalerbare klient-server-applikasjoner.
- Lagre og distribuere data i sanntid på en generell måte. Dette kan fleksibelt oppnås på tvers av ulike miljøer, språk, skyleverandører og lokale systemer.
- NATS er en meldingsmegler konstruert i Go.
Embedded NATS
Hvis applikasjonen din er i Go, og hvis den passer til bruksområdet og distribusjonsscenariene dine, kan du til og med bygge inn en NATS-server i applikasjonen din.
Embedding NATS, NATS-dokumenter
- En særegenhet ved NATS er at den støtter embedded mode for applikasjoner konstruert i Go.
- Dette innebærer at i stedet for den konvensjonelle tilnærmingen med meldingsmeglere, som involverer drift av en separat megler-server og kommunikasjon via applikasjonens klient, er det mulig å bygge inn (embed) selve megleren direkte i en Go-basert applikasjon.
Fordeler og bruksområder for embedded NATS
- En utmerket YouTube-video forklarer dette, og jeg henviser derfor til lenken til videoen.
- Selv uten å distribuere en separat meldingsmeglerserver, kan man oppnå separate concerns ved å konstruere en modular monolith application, og dermed dra nytte av fordelene ved å integrere NATS i embedded-modus. I tillegg blir single binary deployment også mulig.
- Dette kan anvendes for platform with no network (WASM) så vel som for offline-first applikasjoner.
Eksempel i offisiell dokumentasjon
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialiser ny server med opsjoner
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start serveren via goroutine
22 go ns.Start()
23
24 // Vent til serveren er klar for tilkoblinger
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("ikke klar for tilkobling")
27 }
28
29 // Koble til server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonner på emnet
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Skriv ut meldingsdata
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Steng ned serveren (valgfritt)
45 ns.Shutdown()
46 })
47
48 // Publiser data til emnet
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Vent på servernedstengning
52 ns.WaitForShutdown()
53}
- Dette er et eksempel på Embedded NATS som er publisert i den offisielle NATS-dokumentasjonen, men hvis man følger denne eksempelkoden, vil kommunikasjon i embedded mode ikke finne sted.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Ved å kjøre
go run .med kommandoenwatch 'netstat -an | grep 127.0.0.1'for å overvåke nettverkstrafikken til localhost (127.0.0.1), kan man observere at nye nettverksforespørsler som stammer fra NATS' standardport4222blir lagt til.
Korrekte konfigurasjoner for embedding mode
For å oppnå den tilsiktede kommunikasjonen i embedded mode, er det nødvendig med følgende to opsjoner:
- Klient: Opsjonen
InProcessServermå inkluderes. - Server: Flagget
DontListeniServer.Optionsmå eksplisitt settes tiltrue.
- Klient: Opsjonen
Disse spesifikke detaljene er ikke formelt dokumentert, men funksjonens opprinnelse kan spores tilbake til denne PR-en.
Denne PR-en legger til tre ting:
InProcessConn()-funksjon tilServersom bygger ennet.Pipefor å få en forbindelse til NATS-serveren uten å bruke TCP-socketsDontListen-opsjon som instruerer NATS-serveren om ikke å lytte på den vanlige TCP-lytterenstartupComplete-kanal, som lukkes rett før vi starterAcceptLoop, ogreadyForConnectionsvil vente på den
Hovedmotivasjonen for dette er at vi har en applikasjon som kan kjøre enten i en monolitisk (single-process) modus eller en polylitisk (multi-process) modus. Vi ønsker å kunne bruke NATS for begge moduser for enkelthets skyld, men monolitisk modus må kunne imøtekomme en rekke plattformer der åpning av socket-forbindelser enten ikke gir mening (mobil) eller rett og slett ikke er mulig (WASM). Disse endringene vil tillate oss å bruke NATS fullstendig in-process i stedet.
En medfølgende PR nats-io/nats.go#774 legger til støtte på klientsiden.
Dette er min første PR til dette prosjektet, så beklager på forhånd hvis jeg har oversett noe åpenbart.
/cc @nats-io/core
Fungerende eksempel for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialiser ny server med opsjoner
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start serveren via goroutine
26 go ns.Start()
27
28 // Vent til serveren er klar for tilkoblinger
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("ikke klar for tilkobling")
31 }
32
33 // Koble til server via in-process-tilkobling
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonner på emnet
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Skriv ut meldingsdata
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Steng ned serveren (valgfritt)
49 ns.Shutdown()
50 })
51
52 // Publiser data til emnet
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Vent på servernedstengning
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nå kan man observere at det ikke oppstår ytterligere nettverkshopp, i tråd med intensjonen.
Under panseret
TL;DR
- Dette er et sekvensdiagram som illustrerer hvordan funksjonene internt opererer når koden kjøres fra
main.go, og kjernen kan oppsummeres som følger:- Ved
DontListen: trueutelater serveren klientlyttefasenAcceptLoop. - Hvis klientens Connect-opsjon
InProcessServeraktiveres, opprettes en in-memory-forbindelse og en pipe vianet.Pipe, hvoretter enden av pipen returneres til klienten som ennet.Conn-type. - Klienten og serveren utfører in-process-kommunikasjon via denne forbindelsen.
- Ved
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Først, hvis
DontListener true, utelates klientlyttefasenAcceptLoop.
1// nats-server/server/server.go
2
3// AcceptLoop er eksportert for enklere testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Hvis vi skulle avslutte før lytteren er riktig satt opp,
6 // sørg for at vi lukker kanalen.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Øyeblikksbilde av serveropsjoner.
18 opts := s.getOpts()
19
20 // Sett opp tilstand som kan aktivere nedstengning
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Feil ved lytting på port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Lytter etter klienttilkoblinger på %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Varsle om TLS er aktivert.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS kreves for klienttilkoblinger")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klienter som ikke bruker \"TLS Handshake First\"-opsjonen vil mislykkes i å koble til")
38 }
39 }
40
41 // Hvis serveren ble startet med RANDOM_PORT (-1), ville opts.Port være lik
42 // 0 i begynnelsen av denne funksjonen. Så vi må få den faktiske porten
43 if opts.Port == 0 {
44 // Skriv den løste porten tilbake til opsjonene.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nå som porten er satt (hvis den ble satt til RANDOM), sett
49 // serverens info Host/Port med enten verdier fra Opsjoner eller
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Feil ved innstilling av server INFO med ClientAdvertise-verdi av %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Hold oversikt over klientens tilkoblings-URL-er. Vi kan trenge dem senere.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signaliser at vi ikke aksepterer nye klienter
65 s.ldmCh <- true
66 // Vent nå på nedstengningen...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // La den som kaller vite at vi er klare
75 close(clr)
76 clr = nil
77}
- For øvrig utfører AcceptLoop-funksjonen følgende prosesser: Dette omhandler nettverkskommunikasjonsrelaterte aspekter som
TLSoghostPort, som kan utelates da de er unødvendige ved in-process-kommunikasjon.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect vil forsøke å koble til NATS-systemet.
5// URL-en kan inneholde brukernavn/passord-semantikk. f.eks. nats://derek:pass@localhost:4222
6// Komma-separerte arrayer støttes også, f.eks. urlA, urlB.
7// Opsjoner starter med standardverdier, men kan overstyres.
8// For å koble til en NATS-servers websockets-port, bruk `ws` eller `wss`-skjemaet, for eksempel
9// `ws://localhost:8080`. Merk at websockets-skjemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Opsjoner kan brukes til å opprette en tilpasset tilkobling.
4type Options struct {
5 // Url representerer en enkelt NATS-server-url som klienten
6 // vil koble til. Hvis Servers-opsjonen også er satt, blir den
7 // den første serveren i Servers-arrayet.
8 Url string
9
10 // InProcessServer representerer en NATS-server som kjører innenfor
11 // samme prosess. Hvis denne er satt, vil vi forsøke å koble
12 // til serveren direkte i stedet for å bruke eksterne TCP-forbindelser.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
Connect-funksjonen, som håndterer tilkoblingen mellom NATS-serveren og NATS-klienten, tillater konfigurering av klient-URL og tilkoblingsopsjoner.Options-strukturen, som samler disse opsjonene, inneholder et felt kaltInProcessServerav typenInProcessConnProvider-grensesnitt.
1// main.go of example code
2
3// Initialiser ny server med opsjoner
4ns, err := server.NewServer(opts)
5
6//...
7
8// Koble til server via in-process-tilkobling
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Når NATS-klienten etablerer en forbindelse og sender
nats.InProcessServer(ns)tilInProcessServer-feltet, vil følgende skje:
1// nats-go/nats.go
2
3// InProcessServer er en opsjon som vil forsøke å etablere en retning til en NATS-server
4// som kjører innenfor prosessen i stedet for å ringe via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- Opsjonens InProcessServer blir substituert med den innebygde NATS-serveren.
1// nats-go/nats.go
2
3// createConn vil koble til serveren og pakke inn de passende
4// bufio-strukturene. Den vil gjøre det riktige når en eksisterende
5// tilkobling er på plass.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Hvis vi har en referanse til en in-process-server, etabler en
15 // tilkobling ved å bruke den.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("kunne ikke hente in-process-tilkobling: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Dette grensesnittet, når
InProcessServer-opsjonen ikke er null (gyldig), utførerInProcessConn-funksjonen til InProcessServer-opsjonen icreateConn-funksjonen, som oppretter en tilkobling.
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process-tilkobling til serveren,
4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
5// innenfor samme prosess. Dette kan brukes uavhengig av
6// tilstanden til DontListen-opsjonen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("kunne ikke opprette tilkobling")
16 }
17 return pr, nil
18}
- Serveren kaller og utfører
InProcessConn-funksjonen. - Denne funksjonen, i Go-klienten til NATS (
nats.go), kalles nårInProcessServerfor nc (nats connection) ikke er null, og den oppretter en tilkobling (net.Conn) og binder den til serverens tilkobling.
Consumer driven interface of Go
En type implementerer et grensesnitt ved å implementere dets metoder. Det er ingen eksplisitt erklæring om intensjon, ingen "implements"-nøkkelord. Implisitte grensesnitt frakobler definisjonen av et grensesnitt fra dets implementasjon, som deretter kan forekomme i hvilken som helst pakke uten forhåndsarrangement.
Grensesnitt er implementert implisitt, A Tour of Go
Hvis en type kun eksisterer for å implementere et grensesnitt og aldri vil ha eksporterte metoder utover det grensesnittet, er det ikke nødvendig å eksportere selve typen.
- Denne grensesnittdesignen inkorporerer det som ofte refereres til som consumer defined interface og structural typing (duck typing) i Go, og jeg ønsker å introdusere dette emnet sammen med det.
1// nats-go/nats.go
2
3// Opsjoner kan brukes til å opprette en tilpasset tilkobling.
4type Options struct {
5 // Url representerer en enkelt NATS-server-url som klienten
6 // vil koble til. Hvis Servers-opsjonen også er satt, blir den
7 // den første serveren i Servers-arrayet.
8 Url string
9
10 // InProcessServer representerer en NATS-server som kjører innenfor
11 // samme prosess. Hvis denne er satt, vil vi forsøke å koble
12 // til serveren direkte i stedet for å bruke eksterne TCP-forbindelser.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- La oss nå vende tilbake til koden. I nats.go-klienten er
InProcessServer-opsjonsstrukturen definert som etInProcessConnProvider-grensesnitt som kun utførerInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process-tilkobling til serveren,
4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
5// innenfor samme prosess. Dette kan brukes uavhengig av
6// tilstanden til DontListen-opsjonen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("kunne ikke opprette tilkobling")
16 }
17 return pr, nil
18}
- Imidlertid er typen som inngår i dette grensesnittet,
Serverfra nats-server, som utfører en rekke funksjoner utoverInProcessConn. - Dette skyldes at klientens primære interesse i denne situasjonen er om
InProcessConn-grensesnittet er tilgjengelig, mens andre aspekter er av mindre betydning. - Følgelig har nats.go-klienten kun opprettet og benytter et forbrukerdefinert grensesnitt kalt
InProcessConnProvider, som kun definerer funksjonalitetenInProcessConn() (net.Conn, error).
Konklusjon
- Denne artikkelen har kortfattet behandlet NATS' embedded mode, dens virkemåte, og Go's consumer defined interface som kan observeres gjennom NATS' kode.
- Jeg håper denne informasjonen er nyttig for de som benytter NATS til lignende formål.