GoSuda

Hur kommunicerar inbäddade NATS med Go-applikationer?

By prravda
views ...

Komma igång

Om NATS

Programvaruapplikationer och tjänster behöver utbyta data. NATS är en infrastruktur som möjliggör sådant datautbyte, segmenterat i form av meddelanden. Vi kallar detta en "meddelandeorienterad middleware".

Med NATS kan applikationsutvecklare:

  • Enkelt bygga distribuerade och skalbara klient-server-applikationer.
  • Lagra och distribuera data i realtid på ett generellt sätt. Detta kan flexibelt uppnås över olika miljöer, språk, molnleverantörer och lokala system.

Vad är NATS, NATS docs

  • NATS är en meddelandemäklare som är konstruerad i Go.

Embedded NATS

Om din applikation är skriven i Go, och om det passar ditt användningsfall och dina driftsättningsscenarier, kan du till och med bädda in en NATS-server i din applikation.

Embedding NATS, NATS docs

  • NATS har dessutom en särart: för applikationer konstruerade i Go stöds ett embedded mode.
  • Detta innebär att istället för den vanliga metoden med meddelandemäklare, där en separat mäklarserver körs och kommunikation sker via applikationens klient till denna server, kan mäklaren själv bäddas in i en Go-applikation.

Fördelar och användningsfall med embedded NATS

  • En välgjord Youtube-video finns tillgänglig, och jag hänvisar till länken för ytterligare information.
  • Även utan att distribuera en separat meddelandemäklarserver kan man skapa en modular monolith applictaion och uppnå separate of concern, samtidigt som man drar nytta av att kunna bädda in NATS. Dessutom möjliggörs single binary deployment.
  • Detta kan användas för platform with no network (WASM) samt för offline-first application.

Exempel i officiell dokumentation

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Detta är ett exempel på Embedded NATS som länkats i den officiella NATS-dokumentationen, men om man följer denna exempelkod kommer kommunikationen inte att ske i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Om man övervakar nätverkstrafiken till och från localhost (127.0.0.1) med kommandot watch 'netstat -an | grep 127.0.0.1' medan man kör go-filen med go run ., kan man observera att nya nätverksförfrågningar som härstammar från NATS standardport 4222 läggs till.

Rätt konfigurationer för embedding mode

  • För att kommunicera i embedded mode som avsett, krävs följande två alternativ:

    • Klienten: Alternativet InProcessServer måste inkluderas.
    • Servern: Flaggan DontListen i Server.Options måste uttryckligen anges till true.
  • Dessa aspekter var inte officiellt dokumenterade, men funktionalitetens ursprung kan utläsas via denna PR.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Fungerande exempel för embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nu kan man observera att inga ytterligare network hops uppstår, vilket var avsikten.

Under huven

TL;DR

diagram1

  • Detta är ett sekvensdiagram som illustrerar hur funktionerna internt fungerar när koden körs från main.go, och huvudpunkterna är följande:
    • Genom DontListen: true hoppar servern över AcceptLoop, vilket är klientens lyssningsfas.
    • Om klientens Connect-option InProcessServer aktiveras, skapas en in-memory connection, en pipe skapas via net.Pipe, och en ände av pipen returneras till klienten som en net.Conn-typ.
    • Klienten och servern kommunicerar in-process via denna connection.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Först, om DontListen är true, hoppas AcceptLoop, klientens lyssningsfas, över.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • AcceptLoop-funktionen utför för övrigt följande steg. Dessa delar, såsom TLS och hostPort, är relaterade till nätverkskommunikation och är överflödiga vid in-process kommunikation, varför de kan utelämnas.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funktionen Connect, som hanterar anslutningen mellan NATS-servern och NATS-klienten, tillåter konfiguration av klientens URL och anslutningsalternativ. I Options-structen, som innehåller dessa alternativ, finns ett fält InProcessServer av typen InProcessConnProvider interface.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • När en Connect-operation utförs i nats-klienten, om nats.InProcessServer(ns) skickas som InProcessServer-fältet,
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Detta alternativs InProcessServer ersätts med den inbäddade NATS-servern, och
  • Detta interface anropas i funktionen createConn, som skapar en connection, om InProcessServer-alternativet inte är nil (dvs. giltigt), och exekverar InProcessConn från det aktuella alternativet.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Funktionen InProcessConn, som är implementerad på servern, anropas och exekveras.
  • Denna funktion anropas från NATS Go-klienten nats.go om InProcessServer för nc (NATS connection) inte är nil, skapar en connection (net.Conn), och binder denna till serverns connection.

Konsumentdriven interface av Go

A type implements an interface by implementing its methods. There is no explicit declaration of intent, no "implements" keyword. Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.

Interfaces are implemented implicitly, A Tour of Go

If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.

Generality, Effective Go

  • Denna interface design återspeglar väl vad som ofta kallas consumer defined interface och structural typing (duck typing) i Go, varför jag även vill introducera detta ämne.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Låt oss återgå till koden. Fältet InProcessServer i options-structen i nats.go-klienten definieras som ett InProcessConnProvider-interface, som endast utför InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Dock är den typ som används för detta NATS-serverns Server, vilken utför inte bara InProcessConn utan även en mängd andra funktioner.
  • Anledningen är att klientens intresse i detta sammanhang endast rör huruvida ett interface som tillhandahåller InProcessConn erbjuds, medan andra aspekter är av mindre betydelse.
  • Därför har nats.go-klienten skapat och använder endast ett consumer defined interface kallat InProcessConnProvider, som definierar endast funktionen InProcessConn() (net.Conn, error).

Slutsats

  • Denna text har översiktligt behandlat NATS embedded mode och dess funktionssätt, samt Go:s consumer defined interface, vilket kan observeras i NATS källkod.
  • Jag hoppas att denna information kan vara till nytta för de som använder NATS för liknande ändamål, och därmed avslutar jag denna artikel.