GoSuda

Jak vestavěné NATS komunikuje s aplikací go?

By prravda
views ...

Začínáme

O NATS

Softwarové aplikace a služby si potřebují vyměňovat data. NATS je infrastruktura, která umožňuje takovou výměnu dat, segmentovanou ve formě zpráv. Označujeme to jako „message oriented middleware“.

S NATS mohou vývojáři aplikací:

  • Bez námahy vytvářet distribuované a škálovatelné klientsko-serverové aplikace.
  • Ukládat a distribuovat data v reálném čase obecným způsobem. Toho lze flexibilně dosáhnout napříč různými prostředími, jazyky, poskytovateli cloudu a on-premise systémy.

Co je NATS, NATS docs

  • NATS je message broker napsaný v Go.

Embedded NATS

Pokud je vaše aplikace v Go a pokud vyhovuje vašemu případu použití a scénářům nasazení, můžete dokonce vložit NATS server do své aplikace.

Vložení NATS, NATS docs

  • A je tu zvláštnost NATS: pro aplikace napsané v Go podporuje embedded mode.
  • To znamená, že namísto typického způsobu message brokeru, kdy se po spuštění samostatného broker serveru komunikuje s tímto serverem prostřednictvím klienta aplikace, lze samotný broker vložit (embed) do aplikace napsané v Go.

Výhody a případy použití embedded NATS

  • Existuje Youtube video, které to dobře vysvětluje, takže odkážu na něj.
  • I bez nasazení samostatného message broker serveru lze vytvořit modular monolith application a dosáhnout separate of concern, přičemž se využije výhoda embedded NATS. Navíc je možné single binary deployment.
  • Lze jej užitečně použít nejen pro platform with no network (wasm), ale také v offline-first applications.

Příklad v oficiální dokumentaci

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Toto je příklad embedded NATS z oficiální dokumentace NATS, ale pokud se tento příkladový kód spustí, komunikace neproběhne v embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Pokud spustíte soubor Go s go run . a sledujete síťový provoz na localhost (127.0.0.1) pomocí příkazu watch 'netstat -an | grep 127.0.0.1', uvidíte nové síťové požadavky vycházející z výchozího portu NATS 4222.

Správné konfigurace pro embedding mode

  • Pro komunikaci v embedded mode tak, jak je zamýšleno, jsou zapotřebí následující dvě možnosti:

    • Klient: Musí být přidána volba InProcessServer.
    • Server: V Server.Options musí být flag DontListen explicitně nastaven na true.
  • Tyto části nebyly oficiálně zdokumentovány a počátek této funkce lze vysledovat prostřednictvím tohoto PR.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Funkční příklad pro embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nyní je vidět, že nedochází k žádnému dodatečnému network hop, jak bylo zamýšleno.

Pod pokličkou

TL;DR

diagram1

  • Toto je sekvenční diagram, který ukazuje, jaké funkce a jak interně fungují, když se tento kód spustí v main.go, a hlavní body jsou následující:
    • Pomocí DontListen: true server přeskočí fázi naslouchání klientům nazvanou AcceptLoop.
    • Pokud je aktivována volba InProcessServer v Connect option klienta, vytvoří se in-memory connection a pomocí net.Pipe se vytvoří pipe, jejíž konec se vrátí klientovi jako typ net.Conn.
    • Klient a server komunikují in-process prostřednictvím tohoto connection.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Nejprve, pokud je DontListen nastaveno na true, přeskočí se fáze naslouchání klientům nazvaná AcceptLoop.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Pro informaci, funkce AcceptLoop provádí následující procesy. Jelikož se jedná o části související se síťovou komunikací, jako je TLS nebo hostPort, a jsou zbytečné při in-process communication, lze je vynechat.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funkce Connect, která provádí připojení mezi NATS serverem a NATS klientem, umožňuje nastavit client URL a connect Option, a ve struktuře Options, která sdružuje tyto Option, existuje pole InProcessServer typu interface InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Když nats klient provede Connect a předá nats.InProcessServer(ns) do pole InProcessServer, pak
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • se InProcessServer option nahradí embedded nats serverem a
  • toto rozhraní se spustí ve funkci createConn, která vytváří connection, když option InProcessServer není nil (je platná), a to spuštěním InProcesConn z InProcessServer v option, čímž se
  • zavolá a spustí InProcessConn implementovaný na serveru.
  • Tato funkce je volána v nats.go, klientovi Go pro NATS, pokud InProcessServer nc (NATS connection) není nil, a vytvoří connection (net.Conn), které se pak naváže na connection serveru.

Consumer driven interface of Go

A type implements an interface by implementing its methods. There is no explicit declaration of intent, no "implements" keyword. Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.

Interfaces are implemented implicitly, A Tour of Go

If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.

Generality, Effective Go

  • Tento návrh rozhraní dobře vystihuje to, co se v Go běžně označuje jako consumer defined interface a structural typing (duck typing), a proto bych rád představil i toto téma.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Vraťme se zpět ke kódu. Pole struktury option InProcessServer v klientovi nats.go je definováno jako rozhraní InProcessConnProvider, které provádí pouze InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Nicméně typ, který do něj vstupuje, je Server z nats-serveru, který vykonává různé funkce, nejen InProcessConn.
  • Důvodem je, že v této situaci je zájmem klienta pouze to, zda bylo poskytnuto rozhraní InProcessConn, a jiné věci nejsou příliš důležité.
  • Proto klient nats.go vytvořil a používá pouze InProcessConnProvider, consumer defined interface, které definuje pouze funkci InProcessConn() (net.Conn, error).

Závěr

  • Stručně jsme se zabývali embedded mode NATS a jeho fungováním, a také consumer defined interface v Go, které lze ověřit prostřednictvím kódu NATS.
  • Doufám, že tyto informace budou užitečné pro ty, kteří používají NATS k výše uvedeným účelům, a tímto svůj článek uzavírám.