GoSuda

Hvordan kommunikerer indlejrede NATS med go-applikationer?

By prravda
views ...

Kom i gang

Om NATS

Softwareapplikationer og -tjenester skal udveksle data. NATS er en infrastruktur, der muliggør en sådan dataudveksling, segmenteret i form af beskeder. Vi kalder dette for en " meddelelsesorienteret middleware ".

Med NATS kan applikationsudviklere:

  • Ubesværet bygge distribuerede og skalerbare klient-server-applikationer.
  • Gemme og distribuere data i realtid på en generel måde. Dette kan fleksibelt opnås på tværs af forskellige miljøer, sprog, cloud-udbydere og on-premises systemer.

Hvad er NATS, NATS docs

  • NATS er en meddelelsesmægler bygget i Go.

Embedded NATS

Hvis din applikation er i Go, og hvis den passer til dit brugsscenarie og dine implementeringsscenarier, kan du endda indlejre en NATS-server i din applikation.

Embedding NATS, NATS docs

  • En særlig egenskab ved NATS er, at den understøtter embedded mode for applikationer bygget i Go.
  • Det betyder, at i stedet for den sædvanlige metode for meddelelsesmæglere, hvor en separat brokenserver køres og kommunikation foregår via klienter i applikationen, kan selve mægleren indlejres i en applikation lavet med Go.

Fordele og brugsscenarier for embedded NATS

  • Der er en velforklaret Youtube-video, som jeg vil henvise til via linket.
  • Selvom der ikke er behov for at implementere en separat meddelelsesmægler-server, kan man opnå fordelene ved at indlejre NATS i en modular monolith-applikation, hvilket muliggør "separation of concern". Derudover bliver single binary deployment også muligt.
  • Den kan bruges ikke kun på platforme uden netværk (WASM), men også i offline-first applikationer.

Eksempel på officiel dokumentation

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialiser ny server med indstillinger
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start serveren via goroutine
22    go ns.Start()
23
24    // Vent på, at serveren er klar til forbindelser
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("ikke klar til forbindelse")
27    }
28
29    // Opret forbindelse til serveren
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonner på emnet
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Udskriv meddelelsesdata
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Luk serveren ned (valgfrit)
45        ns.Shutdown()
46    })
47
48    // Udgiv data til emnet
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Vent på servernedlukning
52    ns.WaitForShutdown()
53}
  • Dette er et eksempel på Embedded NATS fra den officielle NATS-dokumentation, men hvis du følger denne eksempelkode, vil kommunikationen ikke foregå i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Når du kører Go-filen med go run . og overvåger netværket til og fra localhost (127.0.0.1) med kommandoen watch 'netstat -an | grep 127.0.0.1', kan du se, at nye netværksanmodninger fra NATS' standardport 4222 tilføjes.

Korrekte konfigurationer for embedding mode

  • For at opnå den tilsigtede embedded mode-kommunikation kræves følgende to muligheder:

    • Klient: InProcessServer-optionen skal tilføjes.
    • Server: Flaget DontListen skal eksplicit sættes til true i Server.Options.
  • Disse dele er ikke officielt dokumenteret, men funktionaliteten kan spores via denne PR.

    Denne PR tilføjer tre ting:

    1. InProcessConn()-funktion til Server, som bygger et net.Pipe for at få en forbindelse til NATS-serveren uden at bruge TCP-sockets
    2. DontListen-option, som fortæller NATS-serveren ikke at lytte på den sædvanlige TCP-lytter
    3. startupComplete-kanal, som lukkes lige før vi starter AcceptLoop, og readyForConnections vil vente på den

    Hovedmotivationen for dette er, at vi har en applikation, der kan køre enten i en monolit (single-process) tilstand eller en polylit (multi-process) tilstand. Vi vil gerne kunne bruge NATS til begge tilstande for enkelhedens skyld, men monolit-tilstanden skal kunne imødekomme en række platforme, hvor åbning af socket-forbindelser enten ikke giver mening (mobil) eller simpelthen ikke er muligt (WASM). Disse ændringer vil give os mulighed for at bruge NATS fuldstændigt in-process i stedet.

    En medfølgende PR nats-io/nats.go#774 tilføjer support til klientsiden.

    Dette er min første PR til dette projekt, så undskyld på forhånd, hvis jeg har overset noget åbenlyst et sted.

    /cc @nats-io/core

Fungerende eksempel for embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// til konfiguration af den indlejrede NATS-server
14		// sæt DontListen til true
15		DontListen: true,
16	}
17
18	// Initialiser ny server med indstillinger
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start serveren via goroutine
26	go ns.Start()
27
28	// Vent på, at serveren er klar til forbindelser
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("ikke klar til forbindelse")
31	}
32
33	// Opret forbindelse til serveren via in-process forbindelse
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonner på emnet
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Udskriv meddelelsesdata
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Luk serveren ned (valgfrit)
49		ns.Shutdown()
50	})
51
52	// Udgiv data til emnet
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Vent på servernedlukning
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nu kan man se, at der ikke opstår yderligere netværks-hop, som tilsigtet.

Under the hood

TL;DR

diagram1

  • Dette er et sekvensdiagram, der viser, hvordan interne funktioner arbejder, når koden køres i main.go, og kernen kan forklares som følger:
    • Serveren springer klientens lyttende fase, AcceptLoop, over via DontListen: true.
    • Hvis InProcessServer aktiveres blandt klientens Connect options, oprettes en in-memory forbindelse, og der dannes en pipe via net.Pipe. Derefter returneres enden af pipen til klienten som en net.Conn type.
    • Klienten og serveren udfører in-process kommunikation via denne forbindelse.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • For det første, hvis DontListen er sandt, springes klientens lyttefase, AcceptLoop, over.
 1// nats-server/server/server.go
 2
 3// AcceptLoop eksporteres for lettere test.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Hvis vi skulle afslutte, før lytteren er korrekt sat op,
 6	// sørg for at lukke kanalen.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot serverindstillinger.
18	opts := s.getOpts()
19
20	// Opsæt tilstand, der kan aktivere nedlukning
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Fejl ved lytning på port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Lytter efter klientforbindelser på %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Advarsel om TLS aktiveret.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS krævet for klientforbindelser")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienter, der ikke bruger \"TLS Handshake First\"-optionen, vil ikke kunne oprette forbindelse")
38		}
39	}
40
41	// Hvis serveren blev startet med RANDOM_PORT (-1), ville opts.Port være lig med
42	// 0 i begyndelsen af denne funktion. Så vi skal hente den faktiske port
43	if opts.Port == 0 {
44		// Skriv den løste port tilbage til indstillingerne.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nu hvor porten er blevet indstillet (hvis den blev indstillet til RANDOM), indstil
49	// serverens info Host/Port med enten værdier fra Options eller
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Fejl ved indstilling af server INFO med ClientAdvertise-værdi af %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Hold styr på klientforbindelses-URL'er. Vi får måske brug for dem senere.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal, at vi ikke accepterer nye klienter
65				s.ldmCh <- true
66				// Vent nu på nedlukningen...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Lad kaldende vide, at vi er klar
75	close(clr)
76	clr = nil
77}
  • For reference udfører funktionen AcceptLoop følgende processer: Dette er dele, der er relateret til netværkskommunikation, såsom TLS og hostPort, og som kan udelades, da de er unødvendige ved in-process kommunikation.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect vil forsøge at oprette forbindelse til NATS-systemet.
 5// URL'en kan indeholde brugernavn/adgangskode-semantik. f.eks. nats://derek:pass@localhost:4222
 6// Komma-separerede arrays understøttes også, f.eks. urlA, urlB.
 7// Indstillinger starter med standardværdierne, men kan tilsidesættes.
 8// For at oprette forbindelse til en NATS-servers websocket-port, brug `ws` eller `wss` skemaet, f.eks.
 9// `ws://localhost:8080`. Bemærk, at websocket-skemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options kan bruges til at oprette en tilpasset forbindelse.
 4type Options struct {
 5	// Url repræsenterer en enkelt NATS-server-url, som klienten
 6	// vil oprette forbindelse til. Hvis Servers-optionen også er sat,
 7	// bliver den den første server i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer repræsenterer en NATS-server, der kører inden for
11	// samme proces. Hvis dette er sat, vil vi forsøge at oprette forbindelse
12	// direkte til serveren i stedet for at bruge eksterne TCP-forbindelser.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funktionen Connect, der håndterer forbindelsen mellem NATS-serveren og NATS-klienten, giver mulighed for at konfigurere klient-URL'en og forbindelsesoptionen. I Options-strukturen, der samler disse options, findes et felt InProcessServer af typen InProcessConnProvider interface.
1// main.go of example code
2
3// Initialiser ny server med indstillinger
4ns, err := server.NewServer(opts)
5
6//...
7
8// Opret forbindelse til serveren via in-process forbindelse
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Når nats-klienten opretter forbindelse, og nats.InProcessServer(ns) sendes til InProcessServer-feltet, så
 1// nats-go/nats.go
 2
 3// createConn vil oprette forbindelse til serveren og pakke de relevante
 4// bufio-strukturer. Den vil gøre det rigtige, når en eksisterende
 5// forbindelse er på plads.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Hvis vi har en reference til en in-process server, etableres en
15	// forbindelse ved hjælp af denne.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("kunne ikke opnå in-process forbindelse: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • dette interface udfører InProcesConn for InProcessServer-optionen, når den ikke er nul (gyldig) i funktionen createConn, der opretter forbindelsen, og
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process forbindelse til serveren,
 4// hvilket undgår behovet for at bruge en TCP-lytter til lokal forbindelse
 5// inden for den samme proces. Dette kan bruges uanset
 6// tilstanden af DontListen-optionen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("kunne ikke oprette forbindelse")
16	}
17	return pr, nil
18}
  • Den kalder og udfører InProcessConn, der er implementeret på serveren.
  • Denne funktion kaldes i nats.go, nats' Go-klient, når nc (nats connection) 's InProcessServer ikke er nul. Den opretter en forbindelse (net.Conn) og binder den til serverens forbindelse.

Forbrugerdrevet grænseflade af Go

En type implementerer en grænseflade ved at implementere dens metoder. Der er ingen eksplicit erklæring om hensigt, intet "implementerer" nøgleord. Implicitte grænseflader afkobler definitionen af en grænseflade fra dens implementering, som derefter kan optræde i enhver pakke uden forudgående aftale.

Grænseflader implementeres implicit, A Tour of Go

Hvis en type kun eksisterer for at implementere en grænseflade og aldrig vil have eksporterede metoder ud over den grænseflade, er der ingen grund til at eksportere selve typen.

Generality, Effective Go

  • Dette grænsefladedesign indkapsler godt det, der ofte omtales som forbrugerdefinerede grænseflader og strukturel typning (duck typing) i Go, så jeg vil gerne introducere dette emne også.
 1// nats-go/nats.go
 2
 3// Options kan bruges til at oprette en tilpasset forbindelse.
 4type Options struct {
 5	// Url repræsenterer en enkelt NATS-server-url, som klienten
 6	// vil oprette forbindelse til. Hvis Servers-optionen også er sat,
 7	// bliver den den første server i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer repræsenterer en NATS-server, der kører inden for
11	// samme proces. Hvis dette er sat, vil vi forsøge at oprette forbindelse
12	// direkte til serveren i stedet for at bruge eksterne TCP-forbindelser.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Lad os vende tilbage til koden. I nats.go-klienten er InProcessServer-optionens strukturfelt defineret som InProcessConnProvider-interfacet, som kun udfører InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process forbindelse til serveren,
 4// hvilket undgår behovet for at bruge en TCP-lytter til lokal forbindelse
 5// inden for den samme proces. Dette kan bruges uanset
 6// tilstanden af DontListen-optionen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("kunne ikke oprette forbindelse")
16	}
17	return pr, nil
18}
  • Men den type, der indgår i dette, er Server fra nats-serveren, som udfører en række funktioner ud over InProcessConn.
  • Årsagen er, at klientens interesse i denne situation kun er, om InProcessConn-interfacet er blevet leveret eller ej; andre ting er ikke afgørende.
  • Derfor har nats.go-klienten kun oprettet og bruger InProcessConnProvider, et forbrugerdefineret interface, der kun definerer funktionen InProcessConn() (net.Conn, error).

Konklusion

  • Jeg har kort behandlet NATS' embedded mode og dens funktionsmåde, samt Go's forbrugerdefinerede interface, som kan bekræftes via NATS' kode.
  • Jeg håber, at denne information er nyttig for dem, der bruger NATS til de nævnte formål, og hermed afslutter jeg denne artikel.