GoSuda

Hur kommunicerar inbäddade NATS med Go-applikationer?

By prravda
views ...

Komma igång

Om NATS

Mjukvaruapplikationer och tjänster behöver utbyta data. NATS är en infrastruktur som möjliggör ett sådant datautbyte, segmenterat i form av meddelanden. Vi kallar detta en "meddelandeorienterad middleware".

Med NATS kan applikationsutvecklare:

  • Bygga distribuerade och skalbara klient-server-applikationer utan ansträngning.
  • Lagra och distribuera data i realtid på ett generellt sätt. Detta kan flexibelt uppnås över olika miljöer, språk, molnleverantörer och lokala system.

Vad är NATS, NATS docs

  • NATS är en meddelandemäklare som är konstruerad i Go.

Inbäddad NATS

Om din applikation är i Go, och om den passar dina användningsfall och driftsättningsscenarier, kan du till och med bädda in en NATS-server i din applikation.

Bädda in NATS, NATS docs

  • Och en särdrag hos NATS är att den stöder embedded mode för applikationer konstruerade i Go.
  • Det betyder att istället för den vanliga metoden för meddelandemäklare, där en separat mäklarserver startas och kommunikation sker via applikationens klienter till den servern, kan mäklaren själv bäddas in i en Go-byggd applikation.

Fördelar och användningsfall med inbäddad NATS

  • Det finns en välförklarande Youtube-video, så jag hänvisar till videolänken.
  • Även utan att distribuera en separat meddelandemäklare kan man skapa en modular monolith applictaion för att uppnå separate of concern, och samtidigt dra nytta av fördelen att NATS kan bäddas in. Dessutom blir single binary deployment möjligt.
  • Det kan användas användbart inte bara på plattformar utan nätverk (WASM), utan även i offline-first application.

Exempel på officiella dokument

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initiera ny server med alternativ
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Starta servern via goroutine
22    go ns.Start()
23
24    // Vänta tills servern är redo för anslutningar
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Anslut till servern
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Prenumerera på ämnet
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Skriv ut meddelandedata
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Stäng av servern (valfritt)
45        ns.Shutdown()
46    })
47
48    // Publicera data till ämnet
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Vänta på serveravstängning
52    ns.WaitForShutdown()
53}
  • Detta är ett exempel på inbäddad NATS som länkats i NATS officiella dokumentation, men om man följer denna exempelkod kommer kommunikationen inte att ske i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Om du kör go run . med kommandot watch 'netstat -an | grep 127.0.0.1' för att kontrollera nätverkstrafiken som går till och från localhost (127.0.0.1), kan du se att nya nätverksförfrågningar som utgår från NATS standardport 4222 läggs till.

Rätt konfigurationer för inbäddat läge

  • För att kommunikationen ska ske i embedded mode som avsett, krävs följande två alternativ:

    • Client: InProcessServer-alternativet måste inkluderas.
    • Server: DontListen-flaggan i Server.Options måste explicit sättas till true.
  • Dessa delar var inte officiellt dokumenterade, och funktionen kan spåras till denna PR.

    Denna PR lägger till tre saker:

    1. InProcessConn() funktion till Server som bygger en net.Pipe för att få en anslutning till NATS-servern utan att använda TCP-sockets
    2. DontListen alternativ som säger åt NATS-servern att inte lyssna på den vanliga TCP-lyssnaren
    3. startupComplete kanal, som stängs precis innan vi startar AcceptLoop, och readyForConnections kommer att vänta på den

    Huvudmotiveringen för detta är att vi har en applikation som kan köras antingen i ett monolitläge (single-process) eller ett polylitläge (multi-process). Vi vill kunna använda NATS för båda lägena för enkelhetens skull, men monolitläget måste kunna tillgodose en mängd olika plattformar där öppnande av socketanslutningar antingen inte är meningsfullt (mobil) eller helt enkelt inte är möjligt (WASM). Dessa ändringar kommer att göra det möjligt för oss att använda NATS helt in-process istället.

    En medföljande PR nats-io/nats.go#774 lägger till stöd för klientsidan.

    Detta är min första PR till detta projekt så ursäkta i förväg om jag har missat något uppenbart någonstans.

    /cc @nats-io/core

Fungerande exempel för inbäddat läge

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// för att konfigurera den inbäddade NATS-servern
14		// sätt DonListen till true
15		DontListen: true,
16	}
17
18	// Initiera ny server med alternativ
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Starta servern via goroutine
26	go ns.Start()
27
28	// Vänta tills servern är redo för anslutningar
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Anslut till servern via in-process anslutning
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Prenumerera på ämnet
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Skriv ut meddelandedata
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Stäng av servern (valfritt)
49		ns.Shutdown()
50	})
51
52	// Publicera data till ämnet
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Vänta på serveravstängning
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nu kan vi se att inga ytterligare network hops uppstår som avsett.

Under huven

TL;DR

diagram1

  • Detta är ett sequence diagram som visar vilka funktioner som körs internt när koden körs från main.go, och huvudpoängen är följande:
    • Genom DontListen: true hoppar servern över AcceptLoop, som är en client listening phase.
    • Om klientens Connect option InProcessServer är aktiverad, skapas en in-memory connection, ett pipe skapas via net.Pipe, och pipe-änden returneras till klienten som en net.Conn-typ.
    • Klienten och servern kommunicerar in-process via denna connection.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Vänta på klienter.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Först, om DontListen är true, hoppar servern över AcceptLoop, som är en client listening phase.
 1// nats-server/server/server.go
 2
 3// AcceptLoop är exporterad för enklare testning.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Om vi skulle avsluta innan lyssnaren är korrekt inställd,
 6	// se till att vi stänger kanalen.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Ögonblicksbild av serveralternativ.
18	opts := s.getOpts()
19
20	// Ställ in tillstånd som kan möjliggöra avstängning
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Fel vid lyssning på port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Lyssnar efter klientanslutningar på %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Meddela om TLS är aktiverat.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS krävs för klientanslutningar")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienter som inte använder \"TLS Handshake First\"-alternativet kommer inte att kunna ansluta")
38		}
39	}
40
41	// Om servern startades med RANDOM_PORT (-1), skulle opts.Port vara lika med
42	// 0 i början av denna funktion. Så vi måste få den faktiska porten
43	if opts.Port == 0 {
44		// Skriv den lösta porten tillbaka till alternativen.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nu när porten har ställts in (om den var inställd på RANDOM), ställ in
49	// serverns info Host/Port med antingen värden från Options eller
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Fel vid inställning av serverINFO med ClientAdvertise-värde %s, fel=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Håll reda på klientanslutnings-URL:er. Vi kan behöva dem senare.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signalera att vi inte accepterar nya klienter
65				s.ldmCh <- true
66				// Vänta nu på avstängningen...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Låt anroparen veta att vi är redo
75	close(clr)
76	clr = nil
77}
  • För övrigt utför AcceptLoop-funktionen följande processer. Delar relaterade till nätverkskommunikation, såsom TLS eller hostPort, är onödiga vid in-process communication och kan därför utelämnas.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect kommer att försöka ansluta till NATS-systemet.
 5// URL:en kan innehålla användarnamn/lösenord-semantik. t.ex. nats://derek:pass@localhost:4222
 6// Kommaavgränsade arrayer stöds också, t.ex. urlA, urlB.
 7// Alternativen börjar med standardvärden men kan åsidosättas.
 8// För att ansluta till en NATS Servers websocket-port, använd `ws` eller `wss` schemat, t.ex.
 9// `ws://localhost:8080`. Observera att websocket-scheman inte kan blandas med andra (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options kan användas för att skapa en anpassad anslutning.
 4type Options struct {
 5	// Url representerar en enskild NATS-server-URL som klienten
 6	// kommer att ansluta till. Om Servers-alternativet också är satt,
 7	// blir det den första servern i Servers-arrayen.
 8	Url string
 9
10	// InProcessServer representerar en NATS-server som körs inom
11	// samma process. Om detta är satt kommer vi att försöka ansluta
12	// direkt till servern istället för att använda externa TCP-anslutningar.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Connect-funktionen, som hanterar anslutningen mellan nats server och nats client, kan konfigurera client URL och connect Option. I Options-strukturen, som samlar dessa alternativ, finns ett fält InProcessServer av typen InProcessConnProvider interface.
1// main.go of example code
2
3// Initiera ny server med alternativ
4ns, err := server.NewServer(opts)
5
6//...
7
8// Anslut till servern via in-process anslutning
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • När nats client initierar en Connect-operation och skickar nats.InProcessServer(ns) till InProcessServer-fältet,
 1// nats-go/nats.go
 2
 3// createConn kommer att ansluta till servern och omsluta lämpliga
 4// bufio-strukturer. Den kommer att agera korrekt när en befintlig
 5// anslutning finns på plats.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Om vi har en referens till en in-process server, upprätta en
15	// anslutning med den.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • kommer alternativets InProcessServer att ersättas med den inbäddade nats servern, och
  • Detta gränssnitt kör InProcessConn för InProcessServer-alternativet, om det inte är nil (giltigt), i createConn-funktionen som skapar anslutningen.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerar en in-process anslutning till servern,
 4// vilket undviker behovet av att använda en TCP-lyssnare för lokal anslutning
 5// inom samma process. Detta kan användas oavsett
 6// DontListen-alternativets tillstånd.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • funktionen InProcessConn som är implementerad i servern anropas.
  • Denna funktion anropas när InProcessServer för nc (nats connection) i nats Go-klienten nats.go inte är nil. Den skapar en connection (net.Conn) och binder den till serverns connection.

Konsumentdriven Go-gränssnitt

En typ implementerar ett gränssnitt genom att implementera dess metoder. Det finns ingen explicit deklaration av avsikt, inget "implements"-nyckelord. Implicita gränssnitt kopplar bort definitionen av ett gränssnitt från dess implementering, som sedan kan dyka upp i vilket paket som helst utan förhandsarrangemang.

Gränssnitt implementeras implicit, A Tour of Go

Om en typ endast existerar för att implementera ett gränssnitt och aldrig kommer att ha exporterade metoder utöver det gränssnittet, finns det ingen anledning att exportera själva typen.

Allmänhet, Effektiv Go

  • Denna gränssnittsdesign exemplifierar väl vad man i Go ofta kallar consumer defined interface och structural typing (duck typing), och jag tänkte kortfattat presentera detta ämne.
 1// nats-go/nats.go
 2
 3// Options kan användas för att skapa en anpassad anslutning.
 4type Options struct {
 5	// Url representerar en enskild NATS-server-URL som klienten
 6	// kommer att ansluta till. Om Servers-alternativet också är satt,
 7	// blir det den första servern i Servers-arrayen.
 8	Url string
 9
10	// InProcessServer representerar en NATS-server som körs inom
11	// samma process. Om detta är satt kommer vi att försöka ansluta
12	// direkt till servern istället för att använda externa TCP-anslutningar.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Låt oss återgå till koden. I nats.go-klienten är fältet InProcessServer i option-strukturen definierat som ett InProcessConnProvider-gränssnitt som endast utför InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerar en in-process anslutning till servern,
 4// vilket undviker behovet av att använda en TCP-lyssnare för lokal anslutning
 5// inom samma process. Detta kan användas oavsett
 6// DontListen-alternativets tillstånd.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Men typen som används är nats-serverns Server, som utför en mängd olika funktioner utöver endast InProcessConn.
  • Detta beror på att klientens intresse i denna situation enbart ligger i om gränssnittet InProcessConn har tillhandahållits eller inte, medan andra aspekter inte är av stor betydelse.
  • Därför har nats.go-klienten endast skapat och använder ett consumer defined interface kallat InProcessConnProvider, som endast definierar funktionen InProcessConn() (net.Conn, error).

Slutsats

  • Jag har kortfattat behandlat NATS embedded mode och dess funktionssätt, samt Go:s consumer defined interface som kan verifieras genom NATS-koden.
  • Jag hoppas att denna information är till hjälp för dem som använder NATS för liknande ändamål, och härmed avslutar jag denna artikel.