GoSuda

Hvordan kommunikerer indlejrede NATS med go-applikationen?

By prravda
views ...

Kom i gang

Om NATS

Softwareapplikationer og -tjenester skal udveksle data. NATS er en infrastruktur, der muliggør en sådan dataudveksling, opdelt i form af meddelelser. Vi kalder dette en "message oriented middleware".

Med NATS kan applikationsudviklere:

  • Ubesværet bygge distribuerede og skalerbare klient-server-applikationer.
  • Lagre og distribuere data i realtid på en generel måde. Dette kan fleksibelt opnås på tværs af forskellige miljøer, sprog, cloud-udbydere og on-premises systemer.

Hvad er NATS, NATS docs

  • NATS er en meddelelsesmægler, der er konfigureret i Go.

Embedded NATS

Hvis din applikation er i Go, og hvis den passer til dit use case og dine implementeringsscenarier, kan du endda indlejre en NATS-server i din applikation.

Embedding NATS, NATS docs

  • En særlig egenskab ved NATS er, at den understøtter embedded mode for applikationer bygget i Go.
  • Det betyder, at i stedet for den typiske metode for en meddelelsesmægler, hvor en separat mæglerserver kører, og applikationen kommunikerer via en klient til den server, kan mægleren selv indlejres (embed) i en applikation lavet i Go.

Fordele og use cases for embedded NATS

  • Der er en velbeskrevet Youtube-video, så jeg henviser til linket til videoen.
  • Selvom man ikke implementerer en separat meddelelsesmæglerserver, kan man drage fordel af at indlejre NATS for at skabe en modular monolith-applikation og opnå separation of concern. Derudover muliggør det også single binary deployment.
  • Det kan bruges fordelagtigt ikke kun på platforme uden netværk (wasm), men også i offline-first-applikationer.

Eksempel på officiel dokumentation

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialiser ny server med options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start serveren via goroutine
22    go ns.Start()
23
24    // Vent på at serveren er klar til forbindelser
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Forbind til server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonner på emnet
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Udskriv meddelelsesdata
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Luk serveren ned (valgfrit)
45        ns.Shutdown()
46    })
47
48    // Udgiv data til emnet
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Vent på servernedlukning
52    ns.WaitForShutdown()
53}
  • Dette er et eksempel på Embedded NATS fra den officielle NATS-dokumentation, men hvis man følger denne eksempelkode, vil kommunikationen ikke foregå i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Ved at køre kommandoen watch 'netstat -an | grep 127.0.0.1' for at kontrollere netværkstrafikken til og fra localhost (127.0.0.1), og derefter køre den pågældende go-fil med go run ., kan man se, at nye netværksanmodninger, der stammer fra NATS' standardport 4222, tilføjes.

Korrekte konfigurationer for embedding mode

  • For at opnå kommunikation i embedded mode som tilsigtet, kræves følgende to muligheder:

    • Klient: InProcessServer-indstillingen skal tilføjes.
    • Server: DontListen-flaget i Server.Options skal eksplicit angives som true.
  • Disse dele var ikke officielt dokumenteret, og funktionen kan spores tilbage til denne PR.

    Denne PR tilføjer tre ting:

    1. InProcessConn()-funktion til Server, som bygger en net.Pipe for at få en forbindelse til NATS-serveren uden at bruge TCP-sockets
    2. DontListen-indstilling, som fortæller NATS-serveren, at den ikke skal lytte på den sædvanlige TCP-lytter
    3. startupComplete-kanal, som lukkes lige før vi starter AcceptLoop, og readyForConnections vil vente på den

    Hovedmotivationen for dette er, at vi har en applikation, der kan køre enten i en monolit (enkeltproces) tilstand eller en polylit (flerproces) tilstand. Vi vil gerne kunne bruge NATS til begge tilstande for enkelhedens skyld, men monolit-tilstanden skal kunne imødekomme en række platforme, hvor åbning af socket-forbindelser enten ikke giver mening (mobil) eller simpelthen ikke er muligt (WASM). Disse ændringer vil give os mulighed for at bruge NATS udelukkende in-process i stedet.

    En tilhørende PR nats-io/nats.go#774 tilføjer support til klientsiden.

    Dette er min første PR til dette projekt, så undskyld på forhånd, hvis jeg har overset noget åbenlyst et sted.

    /cc @nats-io/core

Fungerende eksempel for embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// til konfiguration af den embedded NATS-server
14		// sæt DonListen til true
15		DontListen: true,
16	}
17
18	// Initialiser ny server med options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start serveren via goroutine
26	go ns.Start()
27
28	// Vent på at serveren er klar til forbindelser
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Forbind til server via in-process forbindelse
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonner på emnet
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Udskriv meddelelsesdata
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Luk serveren ned (valgfrit)
49		ns.Shutdown()
50	})
51
52	// Udgiv data til emnet
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Vent på servernedlukning
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nu kan man se, at der ikke opstår yderligere network hops som tilsigtet.

Under the hood

TL;DR

diagram1

  • Dette er et sekvensdiagram, der viser, hvilke funktioner der kører internt, når koden eksekveres fra main.go, og hovedpunkterne er som følger:
    • Via DontListen: true udelader serveren AcceptLoop, som er klientens lyttefase.
    • Hvis klientens Connect option InProcessServer er aktiveret, oprettes en in-memory connection, og en pipe etableres via net.Pipe, hvorefter enden af pipen returneres til klienten som en net.Conn type.
    • Klienten og serveren udfører in-process communication via denne forbindelse.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Først, hvis DontListen er true, udelades AcceptLoop, som er klientens lyttefase.
 1// nats-server/server/server.go
 2
 3// AcceptLoop eksporteres for lettere test.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Hvis vi skulle afslutte, før lytteren er korrekt opsat,
 6	// skal vi sørge for at lukke kanalen.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot serverindstillinger.
18	opts := s.getOpts()
19
20	// Opsæt tilstand, der kan muliggøre nedlukning
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Fejl under lytning på port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Lytter efter klientforbindelser på %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Advarsel om TLS aktiveret.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS krævet for klientforbindelser")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienter, der ikke bruger \"TLS Handshake First\"-indstillingen, vil mislykkes med at oprette forbindelse")
38		}
39	}
40
41	// Hvis serveren blev startet med RANDOM_PORT (-1), ville opts.Port være lig
42	// med 0 i starten af denne funktion. Så vi skal have den faktiske port.
43	if opts.Port == 0 {
44		// Skriv den løste port tilbage til indstillingerne.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nu hvor porten er blevet indstillet (hvis den blev indstillet til RANDOM), skal
49	// serverens info Host/Port indstilles med enten værdier fra Options eller
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Fejl under indstilling af server INFO med ClientAdvertise-værdi af %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Hold styr på klientforbindelses-URL'er. Vi kan få brug for dem senere.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signalér at vi ikke accepterer nye klienter
65				s.ldmCh <- true
66				// Vent nu på nedlukningen...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Lad kaldende vide, at vi er klar
75	close(clr)
76	clr = nil
77}
  • Bemærk, at AcceptLoop-funktionen udfører følgende trin. Dette er dele relateret til netværkskommunikation, såsom TLS eller hostPort, som kan udelades, da de er unødvendige i in-process communication.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect vil forsøge at oprette forbindelse til NATS-systemet.
 5// URL'en kan indeholde brugernavn/adgangskode-semantik. f.eks. nats://derek:pass@localhost:4222
 6// Kommaadskilte arrays understøttes også, f.eks. urlA, urlB.
 7// Indstillinger starter med standardværdierne, men kan overskrives.
 8// For at oprette forbindelse til en NATS-servers websocket-port, brug `ws` eller `wss` skemaet, f.eks.
 9// `ws://localhost:8080`. Bemærk, at websocket-skemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options kan bruges til at oprette en tilpasset forbindelse.
 4type Options struct {
 5	// Url repræsenterer en enkelt NATS-server-url, som klienten
 6	// vil oprette forbindelse til. Hvis Servers-indstillingen også er angivet,
 7	// bliver den den første server i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer repræsenterer en NATS-server, der kører inden for
11	// samme proces. Hvis dette er indstillet, vil vi forsøge at oprette forbindelse
12	// direkte til serveren i stedet for at bruge eksterne TCP-forbindelser.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Connect-funktionen, der håndterer forbindelsen mellem NATS-serveren og NATS-klienten, kan indstille klient-URL'en og forbindelsesoptionen. I Options-strukturen, der samler disse optioner, findes et felt kaldet InProcessServer af typen InProcessConnProvider interface.
1// main.go af eksempelkode
2
3// Initialiser ny server med options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Forbind til server via in-process forbindelse
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Når nats-klienten opretter forbindelse, og nats.InProcessServer(ns) overføres til InProcessServer-feltet
 1// nats-go/nats.go
 2
 3// InProcessServer er en indstilling, der vil forsøge at etablere en retning til en NATS-server
 4// der kører inden for processen i stedet for at ringe via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • optionens InProcessServer erstattes af den embedded NATS-server, og
 1// nats-go/nats.go
 2
 3// createConn vil oprette forbindelse til serveren og omslutte de relevante
 4// bufio-strukturer. Den vil gøre det rigtige, når en eksisterende
 5// forbindelse er på plads.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Hvis vi har en reference til en in-process server, så etabler en
15	// forbindelse ved hjælp af denne.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • dette interface kalder InProcesConn for InProcessServer i optionen, når InProcessServer-optionen ikke er nil (gyldig), og udfører den.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process forbindelse til serveren,
 4// hvilket undgår behovet for at bruge en TCP-lytter til lokal forbindelse
 5// inden for den samme proces. Dette kan bruges uanset
 6// tilstanden af DontListen-indstillingen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Funktionen kalder den InProcessConn, der er implementeret på serveren.
  • Denne funktion kaldes i nats.go, som er NATS' Go-klient, hvis InProcessServer for nc (nats connection) ikke er nil. Den opretter en forbindelse (net.Conn) og binder den til serverens forbindelse.

Consumer driven interface af Go

En type implementerer et interface ved at implementere dets metoder. Der er ingen eksplicit erklæring om hensigt, intet "implementerer"-nøgleord. Implicitte interfaces afkobler definitionen af et interface fra dets implementering, som derefter kunne optræde i enhver pakke uden forudgående aftale.

Interfaces er implementeret implicit, A Tour of Go

Hvis en type kun eksisterer for at implementere et interface og aldrig vil have eksporterede metoder ud over dette interface, er der ingen grund til at eksportere selve typen.

Generality, Effective Go

  • Dette interfacedesign indkapsler godt det, man ofte kalder consumer defined interface og structural typing (duck typing) i Go, og jeg vil derfor gerne introducere dette emne.
 1// nats-go/nats.go
 2
 3// Options kan bruges til at oprette en tilpasset forbindelse.
 4type Options struct {
 5	// Url repræsenterer en enkelt NATS-server-url, som klienten
 6	// vil oprette forbindelse til. Hvis Servers-indstillingen også er angivet,
 7	// bliver den den første server i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer repræsenterer en NATS-server, der kører inden for
11	// samme proces. Hvis dette er indstillet, vil vi forsøge at oprette forbindelse
12	// direkte til serveren i stedet for at bruge eksterne TCP-forbindelser.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Lad os vende tilbage til koden. I nats.go-klienten er InProcessServer-optionens struct-felt defineret som et InProcessConnProvider-interface, der kun udfører InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process forbindelse til serveren,
 4// hvilket undgår behovet for at bruge en TCP-lytter til lokal forbindelse
 5// inden for den samme proces. Dette kan bruges uanset
 6// tilstanden af DontListen-indstillingen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Typen, der indgår heri, er dog NATS-serverens Server, som udfører en række funktioner udover InProcessConn.
  • Dette skyldes, at klientens interesse i denne situation kun er, hvorvidt InProcessConn-interfacet er tilvejebragt eller ej, og andre aspekter er ikke af stor betydning.
  • Derfor har nats.go-klienten kun oprettet og anvender et consumer defined interface kaldet InProcessConnProvider, der kun definerer funktionen InProcessConn() (net.Conn, error).

Konklusion

  • Jeg har kortfattet behandlet NATS' embedded mode og dens funktionsmåde, samt Gos consumer defined interface, som kan bekræftes gennem NATS' kode.
  • Jeg håber, at disse oplysninger vil være nyttige for dem, der bruger NATS til de nævnte formål, og hermed afslutter jeg denne artikel.