GoSuda

Ako komunikujú vložené NATS s aplikáciou Go?

By prravda
views ...

Začíname

O NATS

Softvérové aplikácie a služby si vyžadujú výmenu dát. NATS je infraštruktúra, ktorá umožňuje takúto výmenu dát, segmentovanú vo forme správ. Nazývame to „správovo orientovaný middleware“.

S NATS môžu vývojári aplikácií:

  • Bez námahy vytvárať distribuované a škálovateľné klientsko-serverové aplikácie.
  • Ukladať a distribuovať dáta v reálnom čase všeobecným spôsobom. To sa dá flexibilne dosiahnuť naprieč rôznymi prostrediami, jazykmi, poskytovateľmi cloudu a on-premises systémami.

What is NATS, NATS docs

  • NATS je Message Broker implementovaný v Go.

Embedded NATS

Ak je vaša aplikácia v Go a ak to vyhovuje vášmu prípadu použitia a scenárom nasadenia, môžete dokonca vložiť server NATS priamo do vašej aplikácie.

Embedding NATS, NATS docs

  • NATS má navyše špecifickú vlastnosť, a to podporu embedded režimu pre aplikácie implementované v Go.
  • To znamená, že namiesto typického prístupu Message Brokera, ktorý zahŕňa spustenie samostatného servera Brokera a komunikáciu s ním prostredníctvom klienta aplikácie, je možné samotný Broker vložiť (embedovať) priamo do aplikácie vytvorenej v Go.

Výhody a prípady použitia embedded NATS

  • Existuje dobre vysvetľujúce video na YouTube, na ktoré sa odkazujem.
  • Aj bez nasadenia samostatného Message Brokera je možné vytvoriť modular monolith application, dosiahnuť separate of concern a využiť výhody embedded NATS. Okrem toho je možná aj single binary deployment.
  • Môže byť užitočný nielen pre platform with no network (WASM), ale aj pre offline-first application.

Príklad z oficiálnej dokumentácie

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inicializácia nového servera s možnosťami
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Spustenie servera prostredníctvom goroutine
22    go ns.Start()
23
24    // Čakanie, kým bude server pripravený na pripojenia
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Pripojenie k serveru
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Prihlásenie sa k odberu témy
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Vypísanie dát správy
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Vypnutie servera (voliteľné)
45        ns.Shutdown()
46    })
47
48    // Publikovanie dát do témy
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Čakanie na vypnutie servera
52    ns.WaitForShutdown()
53}
  • Toto je príklad embedded NATS uvedený v oficiálnej dokumentácii NATS, avšak ak sa riadite týmto príkladom kódu, komunikácia sa neuskutoční v embedded režime.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Ak vykonáte tento súbor Go pomocou go run . a zároveň monitorujete sieťovú komunikáciu na localhost (127.0.0.1) pomocou príkazu watch 'netstat -an | grep 127.0.0.1', uvidíte, že sa pridávajú nové sieťové požiadavky vychádzajúce z predvoleného portu NATS, ktorým je 4222.

Správne konfigurácie pre embedded režim

  • Na dosiahnutie zamýšľanej komunikácie v embedded režime sú potrebné nasledujúce dve možnosti.

    • Klient: Musí byť pridaná možnosť InProcessServer.
    • Server: V Server.Options musí byť flag DontListen explicitne nastavený na true.
  • Tieto časti neboli oficiálne zdokumentované a začiatok tejto funkcie možno sledovať prostredníctvom tohto PR.

    This PR adds three things:

    1. InProcessConn() funkcia k Server, ktorá vytvára net.Pipe na získanie pripojenia k serveru NATS bez použitia TCP socketov
    2. DontListen možnosť, ktorá hovorí serveru NATS, aby nepočúval na obvyklom TCP listeneri
    3. startupComplete kanál, ktorý sa zatvorí tesne predtým, ako spustíme AcceptLoop, a readyForConnections naň bude čakať

    Hlavnou motiváciou pre toto je, že máme aplikáciu, ktorá môže bežať buď v monolitickom (single-process) režime alebo v polylitickom (multi-process) režime. Radi by sme boli schopní používať NATS pre oba režimy pre jednoduchosť, ale monolitický režim musí byť schopný vyhovieť rôznym platformám, kde otváranie socketových pripojení buď nedáva zmysel (mobilné) alebo jednoducho nie je možné (WASM). Tieto zmeny nám umožnia používať NATS úplne in-process.

    Sprievodný PR nats-io/nats.go#774 pridáva podporu na strane klienta.

    Toto je môj prvý PR do tohto projektu, takže sa vopred ospravedlňujem, ak som niečo zjavné vynechal.

    /cc @nats-io/core

Funkčný príklad pre embedded režim

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// pre konfiguráciu vloženého NATS servera
14		// nastavte DonListen na true
15		DontListen: true,
16	}
17
18	// Inicializácia nového servera s možnosťami
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Spustenie servera prostredníctvom goroutine
26	go ns.Start()
27
28	// Čakanie, kým bude server pripravený na pripojenia
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Pripojenie k serveru prostredníctvom in-process pripojenia
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Prihlásenie sa k odberu témy
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Vypísanie dát správy
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Vypnutie servera (voliteľné)
49		ns.Shutdown()
50	})
51
52	// Publikovanie dát do témy
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Čakanie na vypnutie servera
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...žiadne ďalšie záznamy
  • Teraz je zrejmé, že nevzniká žiadny ďalší network hop, čo bolo zamýšľané.

Pod pokrievkou

TL;DR

diagram1

  • Toto je sekvenčný diagram, ktorý ukazuje, ako interné funkcie pracujú, keď je daný kód spustený v main.go, a jeho podstata je nasledovná:
    • Prostredníctvom DontListen: true server vynechá fázu počúvania klienta, nazývanú AcceptLoop.
    • Ak je aktivovaná možnosť InProcessServer v možnostiach pripojenia klienta, vytvorí sa in-memory connection, prostredníctvom net.Pipe sa vytvorí pipe a koniec pipe sa vráti klientovi ako typ net.Conn.
    • Klient a server komunikujú in-process prostredníctvom tohto pripojenia.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Čakanie na klientov.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Po prvé, ak je DontListen true, fáza počúvania klienta nazvaná AcceptLoop sa vynechá.
 1// nats-server/server/server.go
 2
 3// AcceptLoop je exportovaný pre jednoduchšie testovanie.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Ak by sme mali ukončiť pred správnym nastavením listenera,
 6	// uistite sa, že kanál zatvoríme.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snímka možností servera.
18	opts := s.getOpts()
19
20	// Nastavenie stavu, ktorý môže umožniť vypnutie
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Chyba pri počúvaní na porte: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Počúvanie na klientske pripojenia na %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Upozornenie na povolené TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS je vyžadované pre klientske pripojenia")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienti, ktorí nepoužívajú možnosť \"TLS Handshake First\", sa nebudú môcť pripojiť")
38		}
39	}
40
41	// Ak bol server spustený s RANDOM_PORT (-1), opts.Port by bolo na začiatku tejto funkcie
42	// rovné 0. Preto musíme získať skutočný port.
43	if opts.Port == 0 {
44		// Zapíšte rozlíšený port späť do možností.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Teraz, keď bol port nastavený (ak bol nastavený na RANDOM), nastavte
49	// serverové info Host/Port buď s hodnotami z Options alebo
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Chyba pri nastavovaní INFO servera s hodnotou ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Sledujte URL pripojenia klienta. Možno ich budeme potrebovať neskôr.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signalizujte, že neprijímame nových klientov
65				s.ldmCh <- true
66				// Teraz počkajte na vypnutie...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Dajte volajúcemu vedieť, že sme pripravení
75	close(clr)
76	clr = nil
77}
  • Mimochodom, funkcia AcceptLoop vykonáva nasledujúce procesy: Ide o časti súvisiace so sieťovou komunikáciou, ako sú TLS a hostPort, ktoré sú zbytočné, ak prebieha in-process komunikácia, a preto ich možno vynechať.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect sa pokúsi pripojiť k systému NATS.
 5// URL môže obsahovať sémantiku používateľského mena/hesla. napr. nats://derek:pass@localhost:4222
 6// Podporované sú aj zoznamy oddelené čiarkami, napr. urlA, urlB.
 7// Možnosti začínajú s predvolenými hodnotami, ale môžu byť prepísané.
 8// Pre pripojenie k websocket portu NATS servera použite schému `ws` alebo `wss`, napríklad
 9// `ws://localhost:8080`. Všimnite si, že schémy websocketov nemožno miešať s inými (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options je možné použiť na vytvorenie prispôsobeného pripojenia.
 4type Options struct {
 5	// Url predstavuje jedinú URL servera NATS, ku ktorej sa bude klient
 6	// pripájať. Ak je nastavená aj možnosť Servers, potom sa stane
 7	// prvým serverom v poli Servers.
 8	Url string
 9
10	// InProcessServer predstavuje server NATS bežiaci v rámci
11	// rovnakého procesu. Ak je toto nastavené, pokúsime sa pripojiť
12	// k serveru priamo, namiesto použitia externých TCP pripojení.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funkcia Connect, ktorá spája server NATS a klienta NATS, umožňuje nastaviť URL klienta a možnosti pripojenia; štruktúra Options, ktorá zhromažďuje tieto možnosti, obsahuje pole InProcessServer typu InProcessConnProvider interface.
1// main.go of example code
2
3// Inicializácia nového servera s možnosťami
4ns, err := server.NewServer(opts)
5
6//...
7
8// Pripojenie k serveru prostredníctvom in-process pripojenia
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Keď klient NATS uskutočňuje pripojenie, ak sa do poľa InProcessServer odovzdá nats.InProcessServer(ns),
 1// nats-go/nats.go
 2
 3// InProcessServer je možnosť, ktorá sa pokúsi nadviazať priame spojenie so serverom NATS
 4// bežiacim v rámci procesu namiesto vytáčania cez TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • InProcessServer v možnostiach je nahradený embedded NATS serverom.
 1// nats-go/nats.go
 2
 3// createConn sa pripojí k serveru a obalí príslušné
 4// bufio štruktúry. Urobí správnu vec, keď existuje
 5// pripojenie.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Ak máme referenciu na in-process server, potom nadviažte
15	// pripojenie pomocou neho.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("nepodarilo sa získať in-process pripojenie: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Toto rozhranie, ak možnosť InProcessServer v rámci funkcie createConn (ktorá vytvára pripojenie) nie je nil (je platná), vykoná InProcesConn z InProcessServer v možnostiach a
 1// nats-server/server/server.go
 2
 3// InProcessConn vracia in-process pripojenie k serveru,
 4// čím sa vyhýba potrebe použiť TCP listener pre lokálne pripojenie
 5// v rámci rovnakého procesu. Toto sa môže použiť bez ohľadu na
 6// stav možnosti DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nepodarilo sa vytvoriť pripojenie")
16	}
17	return pr, nil
18}
  • Volá a vykonáva InProcessConn implementovanú na serveri.
  • Táto funkcia, ak InProcessServer v nc (nats connection) v klientovi Go nats (nats.go) nie je nil, sa volá na vytvorenie pripojenia (net.Conn) a jeho naviazanie na pripojenie servera.

Consumer driven interface v Go

Typ implementuje rozhranie implementovaním jeho metód. Neexistuje žiadne explicitné deklarovanie úmyslu, žiadne kľúčové slovo "implements". Implicitné rozhrania oddeľujú definíciu rozhrania od jeho implementácie, ktorá sa potom môže objaviť v akomkoľvek balíku bez predchádzajúcej dohody.

Interfaces are implemented implicitly, A Tour of Go

Ak typ existuje len na implementáciu rozhrania a nikdy nebude mať exportované metódy mimo tohto rozhrania, nie je potrebné exportovať samotný typ.

Generality, Effective Go

  • Tento návrh rozhrania dobre zachytáva takzvané consumer defined interface a structural typing (duck typing) v Go, preto by som rád predstavil aj túto tému.
 1// nats-go/nats.go
 2
 3// Options je možné použiť na vytvorenie prispôsobeného pripojenia.
 4type Options struct {
 5	// Url predstavuje jedinú URL servera NATS, ku ktorej sa bude klient
 6	// pripájať. Ak je nastavená aj možnosť Servers, potom sa stane
 7	// prvým serverom v poli Servers.
 8	Url string
 9
10	// InProcessServer predstavuje server NATS bežiaci v rámci
11	// rovnakého procesu. Ak je toto nastavené, pokúsime sa pripojiť
12	// k serveru priamo, namiesto použitia externých TCP pripojení.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Vráťme sa ku kódu. V klientovi nats.go je pole štruktúry InProcessServer definované ako rozhranie InProcessConnProvider, ktoré vykonáva iba InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn vracia in-process pripojenie k serveru,
 4// čím sa vyhýba potrebe použiť TCP listener pre lokálne pripojenie
 5// v rámci rovnakého procesu. Toto sa môže použiť bez ohľadu na
 6// stav možnosti DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nepodarilo sa vytvoriť pripojenie")
16	}
17	return pr, nil
18}
  • Avšak typ, ktorý do nej vstupuje, je Server z nats-servera, ktorý okrem InProcessConn vykonáva aj rôzne iné funkcie.
  • Dôvodom je, že v tejto situácii je pre klienta dôležité len to, či bolo poskytnuté rozhranie InProcessConn, zatiaľ čo ostatné veci nie sú až tak podstatné.
  • Preto klient nats.go vytvoril a používa iba consumer defined interface nazvané InProcessConnProvider, ktoré definuje iba funkciu InProcessConn() (net.Conn, error).

Záver

  • Stručne som sa zaoberal embedded režimom NATS, jeho fungovaním a Go consumer defined interface, ktoré možno pozorovať v kóde NATS.
  • Dúfam, že tieto informácie budú užitočné pre tých, ktorí používajú NATS na uvedené účely, a týmto sa lúčim s týmto článkom.