Hvordan kommunikerer indlejrede NATS med go-applikationer?
Kom i gang
Om NATS
Softwareapplikationer og -tjenester skal udveksle data. NATS er en infrastruktur, der muliggør en sådan dataudveksling, segmenteret i form af beskeder. Vi kalder dette for en " meddelelsesorienteret middleware ".
Med NATS kan applikationsudviklere:
- Ubesværet bygge distribuerede og skalerbare klient-server-applikationer.
- Gemme og distribuere data i realtid på en generel måde. Dette kan fleksibelt opnås på tværs af forskellige miljøer, sprog, cloud-udbydere og on-premises systemer.
- NATS er en meddelelsesmægler bygget i Go.
Embedded NATS
Hvis din applikation er i Go, og hvis den passer til dit brugsscenarie og dine implementeringsscenarier, kan du endda indlejre en NATS-server i din applikation.
- En særlig egenskab ved NATS er, at den understøtter embedded mode for applikationer bygget i Go.
- Det betyder, at i stedet for den sædvanlige metode for meddelelsesmæglere, hvor en separat brokenserver køres og kommunikation foregår via klienter i applikationen, kan selve mægleren indlejres i en applikation lavet med Go.
Fordele og brugsscenarier for embedded NATS
- Der er en velforklaret Youtube-video, som jeg vil henvise til via linket.
- Selvom der ikke er behov for at implementere en separat meddelelsesmægler-server, kan man opnå fordelene ved at indlejre NATS i en modular monolith-applikation, hvilket muliggør "separation of concern". Derudover bliver single binary deployment også muligt.
- Den kan bruges ikke kun på platforme uden netværk (WASM), men også i offline-first applikationer.
Eksempel på officiel dokumentation
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialiser ny server med indstillinger
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start serveren via goroutine
22 go ns.Start()
23
24 // Vent på, at serveren er klar til forbindelser
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("ikke klar til forbindelse")
27 }
28
29 // Opret forbindelse til serveren
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Abonner på emnet
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Udskriv meddelelsesdata
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Luk serveren ned (valgfrit)
45 ns.Shutdown()
46 })
47
48 // Udgiv data til emnet
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Vent på servernedlukning
52 ns.WaitForShutdown()
53}
- Dette er et eksempel på Embedded NATS fra den officielle NATS-dokumentation, men hvis du følger denne eksempelkode, vil kommunikationen ikke foregå i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Når du kører Go-filen med
go run .
og overvåger netværket til og fra localhost (127.0.0.1) med kommandoenwatch 'netstat -an | grep 127.0.0.1'
, kan du se, at nye netværksanmodninger fra NATS' standardport4222
tilføjes.
Korrekte konfigurationer for embedding mode
For at opnå den tilsigtede embedded mode-kommunikation kræves følgende to muligheder:
- Klient:
InProcessServer
-optionen skal tilføjes. - Server: Flaget
DontListen
skal eksplicit sættes tiltrue
iServer.Options
.
- Klient:
Disse dele er ikke officielt dokumenteret, men funktionaliteten kan spores via denne PR.
Denne PR tilføjer tre ting:
InProcessConn()
-funktion tilServer
, som bygger etnet.Pipe
for at få en forbindelse til NATS-serveren uden at bruge TCP-socketsDontListen
-option, som fortæller NATS-serveren ikke at lytte på den sædvanlige TCP-lytterstartupComplete
-kanal, som lukkes lige før vi starterAcceptLoop
, ogreadyForConnections
vil vente på den
Hovedmotivationen for dette er, at vi har en applikation, der kan køre enten i en monolit (single-process) tilstand eller en polylit (multi-process) tilstand. Vi vil gerne kunne bruge NATS til begge tilstande for enkelhedens skyld, men monolit-tilstanden skal kunne imødekomme en række platforme, hvor åbning af socket-forbindelser enten ikke giver mening (mobil) eller simpelthen ikke er muligt (WASM). Disse ændringer vil give os mulighed for at bruge NATS fuldstændigt in-process i stedet.
En medfølgende PR nats-io/nats.go#774 tilføjer support til klientsiden.
Dette er min første PR til dette projekt, så undskyld på forhånd, hvis jeg har overset noget åbenlyst et sted.
/cc @nats-io/core
Fungerende eksempel for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // til konfiguration af den indlejrede NATS-server
14 // sæt DontListen til true
15 DontListen: true,
16 }
17
18 // Initialiser ny server med indstillinger
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start serveren via goroutine
26 go ns.Start()
27
28 // Vent på, at serveren er klar til forbindelser
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("ikke klar til forbindelse")
31 }
32
33 // Opret forbindelse til serveren via in-process forbindelse
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Abonner på emnet
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Udskriv meddelelsesdata
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Luk serveren ned (valgfrit)
49 ns.Shutdown()
50 })
51
52 // Udgiv data til emnet
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Vent på servernedlukning
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nu kan man se, at der ikke opstår yderligere netværks-hop, som tilsigtet.
Under the hood
TL;DR
- Dette er et sekvensdiagram, der viser, hvordan interne funktioner arbejder, når koden køres i
main.go
, og kernen kan forklares som følger:- Serveren springer klientens lyttende fase,
AcceptLoop
, over viaDontListen: true
. - Hvis
InProcessServer
aktiveres blandt klientens Connect options, oprettes en in-memory forbindelse, og der dannes en pipe vianet.Pipe
. Derefter returneres enden af pipen til klienten som ennet.Conn
type. - Klienten og serveren udfører in-process kommunikation via denne forbindelse.
- Serveren springer klientens lyttende fase,
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- For det første, hvis
DontListen
er sandt, springes klientens lyttefase,AcceptLoop
, over.
1// nats-server/server/server.go
2
3// AcceptLoop eksporteres for lettere test.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Hvis vi skulle afslutte, før lytteren er korrekt sat op,
6 // sørg for at lukke kanalen.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot serverindstillinger.
18 opts := s.getOpts()
19
20 // Opsæt tilstand, der kan aktivere nedlukning
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Fejl ved lytning på port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Lytter efter klientforbindelser på %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Advarsel om TLS aktiveret.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS krævet for klientforbindelser")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klienter, der ikke bruger \"TLS Handshake First\"-optionen, vil ikke kunne oprette forbindelse")
38 }
39 }
40
41 // Hvis serveren blev startet med RANDOM_PORT (-1), ville opts.Port være lig med
42 // 0 i begyndelsen af denne funktion. Så vi skal hente den faktiske port
43 if opts.Port == 0 {
44 // Skriv den løste port tilbage til indstillingerne.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nu hvor porten er blevet indstillet (hvis den blev indstillet til RANDOM), indstil
49 // serverens info Host/Port med enten værdier fra Options eller
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Fejl ved indstilling af server INFO med ClientAdvertise-værdi af %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Hold styr på klientforbindelses-URL'er. Vi får måske brug for dem senere.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal, at vi ikke accepterer nye klienter
65 s.ldmCh <- true
66 // Vent nu på nedlukningen...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Lad kaldende vide, at vi er klar
75 close(clr)
76 clr = nil
77}
- For reference udfører funktionen AcceptLoop følgende processer: Dette er dele, der er relateret til netværkskommunikation, såsom
TLS
oghostPort
, og som kan udelades, da de er unødvendige ved in-process kommunikation.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect vil forsøge at oprette forbindelse til NATS-systemet.
5// URL'en kan indeholde brugernavn/adgangskode-semantik. f.eks. nats://derek:pass@localhost:4222
6// Komma-separerede arrays understøttes også, f.eks. urlA, urlB.
7// Indstillinger starter med standardværdierne, men kan tilsidesættes.
8// For at oprette forbindelse til en NATS-servers websocket-port, brug `ws` eller `wss` skemaet, f.eks.
9// `ws://localhost:8080`. Bemærk, at websocket-skemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options kan bruges til at oprette en tilpasset forbindelse.
4type Options struct {
5 // Url repræsenterer en enkelt NATS-server-url, som klienten
6 // vil oprette forbindelse til. Hvis Servers-optionen også er sat,
7 // bliver den den første server i Servers-arrayet.
8 Url string
9
10 // InProcessServer repræsenterer en NATS-server, der kører inden for
11 // samme proces. Hvis dette er sat, vil vi forsøge at oprette forbindelse
12 // direkte til serveren i stedet for at bruge eksterne TCP-forbindelser.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Funktionen
Connect
, der håndterer forbindelsen mellem NATS-serveren og NATS-klienten, giver mulighed for at konfigurere klient-URL'en og forbindelsesoptionen. I Options-strukturen, der samler disse options, findes et feltInProcessServer
af typenInProcessConnProvider
interface.
1// main.go of example code
2
3// Initialiser ny server med indstillinger
4ns, err := server.NewServer(opts)
5
6//...
7
8// Opret forbindelse til serveren via in-process forbindelse
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Når nats-klienten opretter forbindelse, og
nats.InProcessServer(ns)
sendes tilInProcessServer
-feltet, så
1// nats-go/nats.go
2
3// createConn vil oprette forbindelse til serveren og pakke de relevante
4// bufio-strukturer. Den vil gøre det rigtige, når en eksisterende
5// forbindelse er på plads.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Hvis vi har en reference til en in-process server, etableres en
15 // forbindelse ved hjælp af denne.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("kunne ikke opnå in-process forbindelse: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- dette interface udfører
InProcesConn
for InProcessServer-optionen, når den ikke er nul (gyldig) i funktionencreateConn
, der opretter forbindelsen, og
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process forbindelse til serveren,
4// hvilket undgår behovet for at bruge en TCP-lytter til lokal forbindelse
5// inden for den samme proces. Dette kan bruges uanset
6// tilstanden af DontListen-optionen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("kunne ikke oprette forbindelse")
16 }
17 return pr, nil
18}
- Den kalder og udfører
InProcessConn
, der er implementeret på serveren. - Denne funktion kaldes i
nats.go
, nats' Go-klient, når nc (nats connection) 'sInProcessServer
ikke er nul. Den opretter en forbindelse (net.Conn
) og binder den til serverens forbindelse.
Forbrugerdrevet grænseflade af Go
En type implementerer en grænseflade ved at implementere dens metoder. Der er ingen eksplicit erklæring om hensigt, intet "implementerer" nøgleord. Implicitte grænseflader afkobler definitionen af en grænseflade fra dens implementering, som derefter kan optræde i enhver pakke uden forudgående aftale.
Grænseflader implementeres implicit, A Tour of Go
Hvis en type kun eksisterer for at implementere en grænseflade og aldrig vil have eksporterede metoder ud over den grænseflade, er der ingen grund til at eksportere selve typen.
- Dette grænsefladedesign indkapsler godt det, der ofte omtales som forbrugerdefinerede grænseflader og strukturel typning (duck typing) i Go, så jeg vil gerne introducere dette emne også.
1// nats-go/nats.go
2
3// Options kan bruges til at oprette en tilpasset forbindelse.
4type Options struct {
5 // Url repræsenterer en enkelt NATS-server-url, som klienten
6 // vil oprette forbindelse til. Hvis Servers-optionen også er sat,
7 // bliver den den første server i Servers-arrayet.
8 Url string
9
10 // InProcessServer repræsenterer en NATS-server, der kører inden for
11 // samme proces. Hvis dette er sat, vil vi forsøge at oprette forbindelse
12 // direkte til serveren i stedet for at bruge eksterne TCP-forbindelser.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Lad os vende tilbage til koden. I nats.go-klienten er
InProcessServer
-optionens strukturfelt defineret somInProcessConnProvider
-interfacet, som kun udførerInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn returnerer en in-process forbindelse til serveren,
4// hvilket undgår behovet for at bruge en TCP-lytter til lokal forbindelse
5// inden for den samme proces. Dette kan bruges uanset
6// tilstanden af DontListen-optionen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("kunne ikke oprette forbindelse")
16 }
17 return pr, nil
18}
- Men den type, der indgår i dette, er
Server
fra nats-serveren, som udfører en række funktioner ud overInProcessConn
. - Årsagen er, at klientens interesse i denne situation kun er, om
InProcessConn
-interfacet er blevet leveret eller ej; andre ting er ikke afgørende. - Derfor har nats.go-klienten kun oprettet og bruger
InProcessConnProvider
, et forbrugerdefineret interface, der kun definerer funktionenInProcessConn() (net.Conn, error)
.
Konklusion
- Jeg har kort behandlet NATS' embedded mode og dens funktionsmåde, samt Go's forbrugerdefinerede interface, som kan bekræftes via NATS' kode.
- Jeg håber, at denne information er nyttig for dem, der bruger NATS til de nævnte formål, og hermed afslutter jeg denne artikel.