GoSuda

Akým spôsobom komunikuje embedded NATS s go application?

By prravda
views ...

Začíname

O systéme NATS

Softvérové aplikácie a služby potrebujú vymieňať dáta. NATS predstavuje infraštruktúru, ktorá umožňuje takúto výmenu dát, segmentovanú vo forme správ. Tento systém nazývame "message oriented middleware".

S nástrojom NATS môžu vývojári aplikácií:

  • Bez námahy vytvárať distribuované a škálovateľné klientsko-serverové aplikácie.
  • Ukladať a distribuovať dáta v reálnom čase všeobecným spôsobom. To je možné flexibilne dosiahnuť naprieč rôznymi prostrediami, jazykmi, poskytovateľmi cloudu a systémami on-premises.

What is NATS, NATS docs

  • NATS je message broker zložený v jazyku Go.

Embedded NATS

Ak je vaša aplikácia v jazyku Go a ak sa hodí pre váš prípad použitia a scenáre nasadenia, môžete dokonca vložiť (embed) NATS server priamo do svojej aplikácie.

Embedding NATS, NATS docs

  • A špecifikom NATS je, že podporuje režim embedded pre aplikácie vyvinuté v jazyku Go.
  • To znamená, že namiesto bežného prístupu message brokera, kde sa spustí samostatný server brokera a komunikuje sa s ním prostredníctvom klienta aplikácie, je možné samotný broker vbudovať (embed) do aplikácie vytvorenej v jazyku Go.

Výhody a prípady použitia embedded NATS

  • Keďže existuje dobre vysvetlené Youtube video, odkazujem sa na odkaz na toto video.
  • Je možné dosiahnuť separate of concern vytvorením modular monolith application bez nutnosti nasadiť samostatný server message brokera, pričom sa využívajú výhody vloženia NATS v režime embedded. Navyše sa tým umožní aj single binary deployment.
  • Je to užitočné nielen pre platformy bez siete (platform with no network - wasm), ale aj pre offline-first application.

Príklad z oficiálnej dokumentácie

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Príklad Embedded NATS z oficiálnej dokumentácie NATS však neumožňuje komunikáciu v režime embedding, ak sa vykoná presne podľa uvedeného príkladového kódu.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Po spustení daného súboru .go pomocou príkazu go run . a sledovaní sieťovej prevádzky medzi localhost (127.0.0.1) pomocou príkazu watch 'netstat -an | grep 127.0.0.1', je možné pozorovať, že pribúdajú nové sieťové požiadavky vychádzajúce z predvoleného portu NATS, ktorým je 4222.

Správne konfigurácie pre režim embedded

  • Aby sa komunikácia uskutočňovala v zamýšľanom režime embedded, sú potrebné nasledujúce dve voľby:

    • Klient: Je nutné pridať voľbu InProcessServer.
    • Server: Je nutné v Server.Options explicitne uviesť príznak DontListen ako true.
  • Tieto aspekty nie sú oficiálne dokumentované a pôvod tejto funkcie je možné zistiť prostredníctvom predmetného PR.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Funkčný príklad pre režim embedded

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Teraz je možné pozorovať, že nedochádza k žiadnym dodatočným sieťovým prechodom (network hop), ako bolo zamýšľané.

Pod kapotou (Interná implementácia)

Zhrnutie (TL;DR)

diagram1

  • Toto je sekvenčný diagram znázorňujúci, aké funkcie a ako interne pracujú pri spustení tohto kódu v súbore main.go, pričom jeho podstata je nasledovná:
    • Keď je nastavené DontListen: true, server vynechá fázu počúvania klienta nazvanú AcceptLoop.
    • Ak je voľba InProcessServer klienta aktivovaná, vytvorí sa in-memory spojenie, a následne sa pomocou net.Pipe vytvorí rúra (pipe), ktorej jeden koniec je klientovi vrátený ako spojenie typu net.Conn.
    • Klient a server realizujú komunikáciu in-process prostredníctvom tohto spojenia.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Predovšetkým, ak je DontListen nastavené na hodnotu true, preskočí sa fáza počúvania klienta nazvaná AcceptLoop.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Funkcia AcceptLoop, mimochodom, vykonáva nasledujúce kroky. Ide o časti súvisiace so sieťovou komunikáciou, ako sú TLS alebo hostPort, ktoré sú pri komunikácii in-process zbytočné, a preto je možné ich vynechať.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funkcia Connect v nats client, ktorá zaisťuje spojenie medzi nats server a nats client, umožňuje nastaviť URL klienta a voľby pripojenia (connect Option). Štruktúra Options, ktorá zhromažďuje tieto voľby, obsahuje pole s názvom InProcessServer typu rozhrania (interface) InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Pri inicializácii pripojenia v nats kliente, ak je ako pole InProcessServer poskytnutý argument nats.InProcessServer(ns),
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// running within the process instead of dialing via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • pole InProcessServer vo voľbách (options) je nahradené vloženým (embedded) nats serverom a
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • toto rozhranie je volané vo funkcii createConn, ktorá vytvára spojenie, a to v prípade, že voľba InProcessServer nie je nil (je platná), čím sa vykoná metóda InProcesConn z voľby a vytvorí sa spojenie (net.Conn), ktoré sa následne viaže k spojeniu servera.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Vykonáva sa implementácia InProcessConn na serveri NATS.
  • Táto funkcia je volaná v prípade, že InProcessServer v nats.go kliente nie je nil, čím sa vytvorí spojenie (net.Conn) a viaže sa k spojeniu servera.

Rozhranie riadené spotrebiteľom (Consumer driven interface) v jazyku Go

Typ implementuje rozhranie implementáciou jeho metód. Neexistuje žiadne explicitné vyhlásenie zámeru, žiadne kľúčové slovo „implements“. Implicitné rozhrania oddeľujú definíciu rozhrania od jeho implementácie, ktorá sa následne môže objaviť v akomkoľvek balíku bez predchádzajúcej dohody.

Interfaces are implemented implicitly, A Tour of Go

Ak typ existuje len na implementáciu rozhrania a nikdy nebude mať exportované metódy nad rámec tohto rozhrania, nie je potrebné exportovať samotný typ.

Generality, Effective Go

  • Tento návrh rozhrania (interface design) dobre vystihuje to, čo sa v jazyku Go často označuje ako consumer defined interface a structural typing (duck typing), preto som sa rozhodol predstaviť aj túto tému.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Vráťme sa späť ku kódu. V klientskej knižnici nats.go je pole štruktúry (struct field) voľby InProcessServer definované ako rozhranie InProcessConnProvider, ktoré vykonáva výhradne funkciu InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Avšak typ, ktorý sa doň vkladá, je Server z nats-server, ktorý okrem InProcessConn vykonáva aj rôzne iné funkcie.
  • Dôvodom je, že v danom kontexte klienta je podstatné iba to, či poskytuje funkciu (interface) InProcessConn() (net.Conn, error), zatiaľ čo ostatné aspekty sú menej významné.
  • Preto nats.go klient vytvoril a používa len consumer defined interface nazvaný InProcessConnProvider, ktorý definuje výlučne funkciu InProcessConn() (net.Conn, error).

Záver

  • Stručne sme sa zamerali na režim embedded NATS, jeho spôsob fungovania a na Go consumer defined interface, ktorý je možné pozorovať v kóde NATS.
  • Ukončujem tento text s vierou, že tieto informácie budú užitočné pre osoby využívajúce NATS na vyššie uvedené účely.