Hvordan kommuniserer innebygde NATS med Go-applikasjoner?
Komme i gang
Om NATS
Programvareapplikasjoner og -tjenester må utveksle data. NATS er en infrastruktur som tillater slik datautveksling, segmentert i form av meldinger. Vi kaller dette en " message oriented middleware".
Med NATS kan applikasjonsutviklere:
- Bygge distribuerte og skalerbare klient-server-applikasjoner uten anstrengelse.
- Lagre og distribuere data i sanntid på en generell måte. Dette kan fleksibelt oppnås på tvers av ulike miljøer, språk, skyleverandører og on-premises systemer.
- NATS er en message broker utviklet i Go.
Embedded NATS
Hvis applikasjonen din er i Go, og hvis det passer ditt bruksområde og dine distribusjonsscenarier, kan du til og med embedde en NATS server inne i applikasjonen din.
- Og det er en spesiell egenskap ved NATS, nemlig at den støtter embedded mode for applikasjoner utviklet i Go.
- Det betyr at i stedet for den vanlige metoden for message brokers, som innebærer å starte en separat broker-server og kommunisere med den via applikasjonens klient, kan brokeren embeddes direkte i Go-applikasjonen.
Fordeler og bruksområder for embedded NATS
- Det er en godt forklarende Youtube-video som jeg henviser til.
- Selv uten å distribuere en separat message broker-server, kan man oppnå fordelen med å embedde NATS ved å lage en modular monolith application, som også oppnår separate of concern. I tillegg muliggjør dette single binary deployment.
- Den kan brukes for platform with no network (WASM), samt i offline-first applicationer.
Eksempel fra offisielle dokumenter
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialiser ny server med opsjoner
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start serveren via goroutine
22 go ns.Start()
23
24 // Vent til serveren er klar for tilkoblinger
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Koble til serveren
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonner på emnet
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Skriv ut meldingsdata
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Stopp serveren (valgfritt)
45 ns.Shutdown()
46 })
47
48 // Publiser data til emnet
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Vent på at serveren skal stoppe
52 ns.WaitForShutdown()
53}
- Dette er et eksempel på embedded NATS fra NATs' offisielle dokumentasjon, men hvis man følger denne eksempelkode, vil kommunikasjonen ikke skje i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Når man kjører
go run .med denne Go-filen, samtidig som man overvåker nettverkstrafikken til localhost (127.0.0.1) med kommandoenwatch 'netstat -an | grep 127.0.0.1', kan man observere at nye nettverksforespørsler som starter fra NATS' standardport4222blir lagt til.
Riktige konfigurasjoner for embedding mode
For å kommunisere i embedded mode som tiltenkt, er følgende to opsjoner nødvendige.
- Klient:
InProcessServer-opsjonen må inkluderes. - Server:
DontListen-flagget iServer.Optionsmå settes tiltrue.
- Klient:
Disse delene var ikke offisielt dokumentert, og opprinnelsen til denne funksjonaliteten kan spores tilbake til denne PR-en.
This PR adds three things:
InProcessConn()function toServerwhich builds anet.Pipeto get a connection to the NATS server without using TCP socketsDontListenoption which tells the NATS server not to listen on the usual TCP listenerstartupCompletechannel, which is closed right before we startAcceptLoop, andreadyForConnectionswill wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
Fungerende eksempel for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialiser ny server med opsjoner
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start serveren via goroutine
26 go ns.Start()
27
28 // Vent til serveren er klar for tilkoblinger
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Koble til serveren via in-process-tilkobling
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonner på emnet
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Skriv ut meldingsdata
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Stopp serveren (valgfritt)
49 ns.Shutdown()
50 })
51
52 // Publiser data til emnet
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Vent på at serveren skal stoppe
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nå kan man se at det ikke oppstår ytterligere network hops, som tiltenkt.
Under the hood
TL;DR
- Dette er et sequence diagram som viser hvilke funksjoner som opererer internt og hvordan de fungerer når koden kjøres i
main.go, og hovedpunktene er som følger:- Ved
DontListen: truehopper serveren over klientens lyttingfase kaltAcceptLoop. - Hvis klientens
Connect-opsjonInProcessServerer aktivert, opprettes en in-memory connection og et pipe vianet.Pipe, deretter returneres slutten av pipen til klienten som ennet.Conn-type. - Klienten og serveren utfører in-process communication via denne connectionen.
- Ved
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Først, hvis
DontListenertrue, hopper serveren over klientens lyttingfase kaltAcceptLoop.
1// nats-server/server/server.go
2
3// AcceptLoop er eksportert for enklere testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Hvis vi skulle avslutte før lytteren er satt opp riktig,
6 // sørg for at vi lukker kanalen.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot serveropsjoner.
18 opts := s.getOpts()
19
20 // Sett opp tilstand som kan muliggjøre nedstengning
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Varsle om TLS er aktivert.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // Hvis serveren ble startet med RANDOM_PORT (-1), ville opts.Port være lik
42 // 0 i begynnelsen av denne funksjonen. Så vi må få den faktiske porten
43 if opts.Port == 0 {
44 // Skriv løst port tilbake til opsjoner.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nå som porten er satt (hvis den ble satt til RANDOM), sett
49 // serverens info Host/Port med enten verdier fra Options eller
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Hold oversikt over klienttilkoblings-URL-er. Vi kan trenge dem senere.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signaliser at vi ikke godtar nye klienter
65 s.ldmCh <- true
66 // Nå venter vi på Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // La den som kaller vite at vi er klare
75 close(clr)
76 clr = nil
77}
- For øvrig utfører
AcceptLoop-funksjonen følgende prosesser: Den omhandler nettverkskommunikasjonsrelaterte deler somTLSoghostPort. Disse delene er overflødige ved in-process communication og kan derfor utelates.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect vil forsøke å koble til NATS-systemet.
5// URL-en kan inneholde brukernavn/passord-semantikk. f.eks. nats://derek:pass@localhost:4222
6// Komma-separerte arrays støttes også, f.eks. urlA, urlB.
7// Opsjoner starter med standardinnstillingene, men kan overstyres.
8// For å koble til en NATS Servers websocket-port, bruk `ws` eller `wss` skjemaet, for eksempel
9// `ws://localhost:8080`. Merk at websocket-skjemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options kan brukes til å opprette en tilpasset tilkobling.
4type Options struct {
5 // Url representerer en enkelt NATS-server-URL som klienten
6 // vil koble til. Hvis Servers-opsjonen også er satt, blir den
7 // den første serveren i Servers-arrayet.
8 Url string
9
10 // InProcessServer representerer en NATS-server som kjører innenfor
11 // samme prosess. Hvis dette er satt, vil vi forsøke å koble til
12 // serveren direkte i stedet for å bruke eksterne TCP-tilkoblinger.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
Connect-funksjonen, som håndterer tilkoblingen mellom NATS server og NATS client, lar deg konfigurere klientens URL og tilkoblingsalternativer.Options-structen, som samler disse alternativene, inneholder et felt kaltInProcessServerav typenInProcessConnProviderinterface.
1// main.go of example code
2
3// Initialiser ny server med opsjoner
4ns, err := server.NewServer(opts)
5
6//...
7
8// Koble til serveren via in-process-tilkobling
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Når
Connectutføres på nats client, ognats.InProcessServer(ns)sendes somInProcessServer-feltet, så
1// nats-go/nats.go
2
3// createConn vil koble til serveren og pakke inn de passende
4// bufio-strukturene. Den vil gjøre det riktige når en eksisterende
5// tilkobling er på plass.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Hvis vi har en referanse til en in-process-server, etablerer vi en
15 // tilkobling ved å bruke den.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- vil dette grensesnittet, i
createConn-funksjonen som oppretter en tilkobling, utføreInProcessConnfraInProcessServer-opsjonen nårInProcessServer-opsjonen ikke ernil(gyldig).
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process-tilkobling til serveren,
4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
5// innenfor samme prosess. Dette kan brukes uavhengig av
6// tilstanden til DontListen-opsjonen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- kaller og utfører
InProcessConnimplementert på serveren. - Denne funksjonen kalles når
nc(nats connection) sinInProcessServerikke ernili NATs' Go clientnats.go. Den oppretter en connection (net.Conn) og binder den til serverens connection.
Consumer driven interface of Go
En type implementerer et grensesnitt ved å implementere dets metoder. Det er ingen eksplisitt erklæring av intensjon, ingen "implements"-nøkkelord. Implisitte grensesnitt frakobler definisjonen av et grensesnitt fra dets implementasjon, som da kan dukke opp i enhver pakke uten forhåndsavtale.
Interfaces are implemented implicitly, A Tour of Go
If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.
- Denne grensesnittdesignen inkorporerer godt det som ofte kalles consumer defined interface og structural typing (duck typing) i Go, så jeg ønsker å presentere dette temaet samtidig.
1// nats-go/nats.go
2
3// Options kan brukes til å opprette en tilpasset tilkobling.
4type Options struct {
5 // Url representerer en enkelt NATS-server-URL som klienten
6 // vil koble til. Hvis Servers-opsjonen også er satt, blir den
7 // den første serveren i Servers-arrayet.
8 Url string
9
10 // InProcessServer representerer en NATS-server som kjører innenfor
11 // samme prosess. Hvis dette er satt, vil vi forsøke å koble til
12 // serveren direkte i stedet for å bruke eksterne TCP-tilkoblinger.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- La oss gå tilbake til koden. I nats.go-klienten er
InProcessServer-opsjonens struct-felt definert somInProcessConnProvider-grensesnittet, som kun utførerInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process-tilkobling til serveren,
4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
5// innenfor samme prosess. Dette kan brukes uavhengig av
6// tilstanden til DontListen-opsjonen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Imidlertid er typen som legges inn i den
Serverfra nats-server, som utfører en rekke funksjoner i tillegg tilInProcessConn. - Dette skyldes at klientens interesse i denne situasjonen kun er om
InProcessConn-grensesnittet er levert eller ikke, og andre ting er ikke særlig viktige. - Derfor har nats.go-klienten kun laget og brukt et consumer defined interface kalt
InProcessConnProvider, som kun definerer funksjonalitetenInProcessConn() (net.Conn, error).
Konklusjon
- Jeg har kort behandlet NATs' embedded mode og dens virkemåte, samt Go's consumer defined interface som kan bekreftes gjennom NATs' kode.
- Jeg håper denne informasjonen er nyttig for de som bruker NATS til lignende formål, og med dette avslutter jeg denne artikkelen.