GoSuda

Ako komunikujú vložené NATS s aplikáciou go?

By prravda
views ...

Začíname

O NATS

Softvérové aplikácie a služby si musia vymieňať dáta. NATS je infraštruktúra, ktorá umožňuje takúto výmenu dát, segmentovanú vo forme správ. Nazývame to „správovo orientovaný middleware“.

S NATS môžu vývojári aplikácií:

  • Bez námahy vytvárať distribuované a škálovateľné klientsko-serverové aplikácie.
  • Ukladať a distribuovať dáta v reálnom čase všeobecným spôsobom. To sa dá flexibilne dosiahnuť naprieč rôznymi prostrediami, jazykmi, poskytovateľmi cloudu a lokálnymi systémami.

Čo je NATS, NATS docs

  • NATS je message broker napísaný v Go.

Embedded NATS

Ak je vaša aplikácia v Go a ak to vyhovuje vášmu prípadu použitia a scenárom nasadenia, môžete dokonca vložiť server NATS do svojej aplikácie.

Vkladanie NATS, NATS docs

  • NATS má špecifickú vlastnosť, a to, že podporuje embedded mode pre aplikácie napísané v Go.
  • To znamená, že namiesto typického prístupu message brokera, kde sa spúšťa samostatný broker server a komunikácia prebieha prostredníctvom klienta aplikácie, môže byť samotný broker vložený (embed) priamo do aplikácie vytvorenej v Go.

Výhody a prípady použitia embedded NATS

  • Existuje dobre vysvetľujúce video na Youtube, takže odkaz na video postačí.
  • Bez nasadenia samostatného message broker servera je možné vytvoriť modular monolith application a dosiahnuť separate of concern, pričom sa využijú výhody embedded NATS. Okrem toho je možné aj single binary deployment.
  • Je to užitočné nielen pre platformy bez siete (WASM), ale aj pre offline-first application.

Príklad z oficiálnej dokumentácie

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inicializácia nového servera s možnosťami
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Spustenie servera pomocou goroutine
22    go ns.Start()
23
24    // Čakanie na pripravenosť servera na pripojenia
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("nie je pripravený na pripojenie")
27    }
28
29    // Pripojenie k serveru
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Prihlásenie sa k téme
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Vypísanie dát správy
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Vypnutie servera (voliteľné)
45        ns.Shutdown()
46    })
47
48    // Zverejnenie dát do témy
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Čakanie na vypnutie servera
52    ns.WaitForShutdown()
53}
  • Toto je príklad Embedded NATS uvedený v oficiálnej dokumentácii NATS, ale ak sa postupuje podľa tohto príkladového kódu, komunikácia sa neuskutoční v embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Ak spustíte tento súbor go pomocou príkazu go run . a zároveň sledujete sieťovú prevádzku na localhost (127.0.0.1) pomocou príkazu watch 'netstat -an | grep 127.0.0.1', uvidíte nové sieťové požiadavky prichádzajúce z predvoleného portu NATS, 4222.

Správne konfigurácie pre embedding mode

  • Na komunikáciu v embedded mode podľa zámeru sú potrebné nasledujúce dve možnosti.

    • Klient: Je potrebné pridať možnosť InProcessServer.
    • Server: V Server.Options je potrebné nastaviť príznak DontListen na true.
  • Tieto časti neboli oficiálne zdokumentované a začiatok tejto funkcie možno zistiť prostredníctvom tohto PR.

    Tento PR pridáva tri veci:

    1. Funkciu InProcessConn() do Server, ktorá vytvára net.Pipe na získanie pripojenia k serveru NATS bez použitia TCP socketov.
    2. Možnosť DontListen, ktorá hovorí serveru NATS, aby nepočúval na bežnom TCP listeneri.
    3. Kanál startupComplete, ktorý sa uzavrie tesne predtým, než spustíme AcceptLoop, a readyForConnections naň bude čakať.

    Hlavnou motiváciou pre toto je, že máme aplikáciu, ktorá môže bežať buď v režime monolith (jednoprocesový) alebo polylith (viacprocesový). Chceli by sme byť schopní používať NATS pre oba režimy pre jednoduchosť, ale režim monolith musí byť schopný vyhovieť rôznym platformám, kde otváranie socketových pripojení buď nedáva zmysel (mobilné) alebo jednoducho nie je možné (WASM). Tieto zmeny nám umožnia namiesto toho používať NATS úplne in-process.

    Súčasný PR nats-io/nats.go#774 pridáva podporu na strane klienta.

    Toto je môj prvý PR do tohto projektu, takže sa vopred ospravedlňujem, ak som niečo zjavné prehliadol.

    /cc @nats-io/core

Pracovný príklad pre embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// pre konfiguráciu vloženého NATS servera
14		// nastavte DontListen ako true
15		DontListen: true,
16	}
17
18	// Inicializácia nového servera s možnosťami
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Spustenie servera pomocou goroutine
26	go ns.Start()
27
28	// Čakanie na pripravenosť servera na pripojenia
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("nie je pripravený na pripojenie")
31	}
32
33	// Pripojenie k serveru prostredníctvom in-process pripojenia
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Prihlásenie sa k téme
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Vypísanie dát správy
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Vypnutie servera (voliteľné)
49		ns.Shutdown()
50	})
51
52	// Zverejnenie dát do témy
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Čakanie na vypnutie servera
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Teraz je možné vidieť, že nedochádza k žiadnemu ďalšiemu sieťovému „hopu“, ako bolo zamýšľané.

Pod pokrievkou

TL;DR

diagram1

  • Toto je sekvenčný diagram, ktorý ukazuje, aké funkcie a ako fungujú interne, keď sa tento kód spustí z main.go, a podstata je nasledovná.
    • Prostredníctvom DontListen: true server preskočí fázu počúvania klienta nazvanú AcceptLoop.
    • Ak je aktivovaná možnosť InProcessServer pre pripojenie klienta, vytvorí sa in-memory connection a net.Pipe sa použije na vytvorenie pipe, pričom koniec pipe sa vráti klientovi ako typ net.Conn.
    • Klient a server komunikujú in-process prostredníctvom tohto pripojenia.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Čakajte na klientov.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Najprv, ak je DontListen nastavené na true, fáza počúvania klienta AcceptLoop sa preskočí.
 1// nats-server/server/server.go
 2
 3// AcceptLoop je exportovaný pre jednoduchšie testovanie.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Ak by sme mali ukončiť pred správnym nastavením listenera,
 6	// uistite sa, že sme kanál uzavreli.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snímka možností servera.
18	opts := s.getOpts()
19
20	// Nastavenie stavu, ktorý môže umožniť vypnutie
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Chyba pri počúvaní na porte: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Počúvanie na klientske pripojenia na %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Upozornenie na povolené TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS je vyžadované pre klientske pripojenia")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienti, ktorí nepoužívajú možnosť \"TLS Handshake First\", sa nepripoja")
38		}
39	}
40
41	// Ak bol server spustený s RANDOM_PORT (-1), opts.Port by bolo na začiatku tejto funkcie
42	// rovné 0. Takže musíme získať skutočný port.
43	if opts.Port == 0 {
44		// Zapíšte vyriešený port späť do možností.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Teraz, keď bol port nastavený (ak bol nastavený na RANDOM), nastavte
49	// Host/Port informácií servera buď z hodnôt Options alebo
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Chyba pri nastavovaní informácií servera s hodnotou ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Sledujte URL pripojenia klienta. Možno ich budeme potrebovať neskôr.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signalizujte, že neprijímame nových klientov
65				s.ldmCh <- true
66				// Teraz počkajte na vypnutie...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Dajte volajúcemu vedieť, že sme pripravení
75	close(clr)
76	clr = nil
77}
  • Funkcia AcceptLoop prechádza nasledujúcimi procesmi. Ide o časti súvisiace so sieťovou komunikáciou, ako sú TLS a hostPort, ktoré sú irelevantné pre in-process communication, takže ich možno vynechať.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect sa pokúsi pripojiť k systému NATS.
 5// URL môže obsahovať sémantiku používateľského mena/hesla. napr. nats://derek:pass@localhost:4222
 6// Podporované sú aj zoznamy oddelené čiarkami, napr. urlA, urlB.
 7// Možnosti začínajú predvolenými, ale môžu byť prepísané.
 8// Pre pripojenie k websocket portu NATS servera použite schému `ws` alebo `wss`, napríklad
 9// `ws://localhost:8080`. Všimnite si, že websocket schémy nemožno miešať s inými (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Možnosti možno použiť na vytvorenie prispôsobeného pripojenia.
 4type Options struct {
 5	// Url predstavuje jednu URL servera NATS, ku ktorej sa klient
 6	// bude pripájať. Ak je nastavená aj možnosť Servers,
 7	// potom sa stane prvým serverom v poli Servers.
 8	Url string
 9
10	// InProcessServer predstavuje NATS server bežiaci v rámci
11	// rovnakého procesu. Ak je toto nastavené, pokúsime sa pripojiť
12	// k serveru priamo, namiesto použitia externých TCP pripojení.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funkcia Connect, ktorá spravuje pripojenie medzi NATS serverom a NATS klientom, umožňuje nastaviť URL klienta a možnosti pripojenia. Štruktúra Options, ktorá zhromažďuje tieto možnosti, obsahuje pole InProcessServer typu InProcessConnProvider interface.
1// main.go of example code
2
3// Inicializácia nového servera s možnosťami
4ns, err := server.NewServer(opts)
5
6//...
7
8// Pripojenie k serveru prostredníctvom in-process pripojenia
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Keď klient NATS vykoná Connect a odovzdá nats.InProcessServer(ns) ako pole InProcessServer,
 1// nats-go/nats.go
 2
 3// InProcessServer je možnosť, ktorá sa pokúsi nadviazať priame spojenie s NATS serverom
 4// bežiacim v rámci procesu namiesto vytáčania cez TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • možnosť InProcessServer je nahradená embedded NATS serverom, a
 1// nats-go/nats.go
 2
 3// createConn sa pripojí k serveru a obalí príslušné
 4// štruktúry bufio. Urobí správnu vec, keď existuje
 5// pripojenie.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Ak máme referenciu na in-process server, potom nadviažeme
15	// pripojenie pomocou neho.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("nepodarilo sa získať in-process pripojenie: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • tento interface, keď možnosť InProcessServer vo funkcii createConn nie je nil (je validná), vykoná InProcesConn možnosti InProcessServer, a
 1// nats-server/server/server.go
 2
 3// InProcessConn vracia in-process pripojenie k serveru,
 4// čím sa vyhýba potrebe používať TCP listener pre lokálne pripojenie
 5// v rámci rovnakého procesu. Toto sa môže použiť bez ohľadu na
 6// stav možnosti DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nepodarilo sa vytvoriť pripojenie")
16	}
17	return pr, nil
18}
  • volá a vykonáva InProcessConn implementovaný na serveri.
  • Táto funkcia sa volá, ak InProcessServer v nc (nats connection) v Go klientovi nats.go nie je nil, vytvorí sa connection (net.Conn) a táto sa potom naviaže na connection servera.

Rozhranie orientované na spotrebiteľa v Go

Typ implementuje rozhranie implementáciou jeho metód. Neexistuje žiadne explicitné vyhlásenie zámeru, žiadne kľúčové slovo „implements“. Implicitné rozhrania oddeľujú definíciu rozhrania od jeho implementácie, ktorá sa potom môže objaviť v akomkoľvek balíku bez predchádzajúcej dohody.

Rozhrania sú implementované implicitne, A Tour of Go

Ak typ existuje len na implementáciu rozhrania a nikdy nebude mať exportované metódy nad rámec tohto rozhrania, nie je potrebné exportovať samotný typ.

Všeobecnosť, Effective Go

  • Tento návrh rozhrania dobre zachytáva takzvané consumer defined interface a structural typing (duck typing) v Go, a preto by som chcel predstaviť aj túto tému.
 1// nats-go/nats.go
 2
 3// Možnosti možno použiť na vytvorenie prispôsobeného pripojenia.
 4type Options struct {
 5	// Url predstavuje jednu URL servera NATS, ku ktorej sa klient
 6	// bude pripájať. Ak je nastavená aj možnosť Servers,
 7	// potom sa stane prvým serverom v poli Servers.
 8	Url string
 9
10	// InProcessServer predstavuje NATS server bežiaci v rámci
11	// rovnakého procesu. Ak je toto nastavené, pokúsime sa pripojiť
12	// k serveru priamo, namiesto použitia externých TCP pripojení.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Vráťme sa ku kódu. Pole štruktúry InProcessServer v klientovi nats.go je definované ako rozhranie InProcessConnProvider, ktoré vykonáva iba InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn vracia in-process pripojenie k serveru,
 4// čím sa vyhýba potrebe používať TCP listener pre lokálne pripojenie
 5// v rámci rovnakého procesu. Toto sa môže použiť bez ohľadu na
 6// stav možnosti DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nepodarilo sa vytvoriť pripojenie")
16	}
17	return pr, nil
18}
  • Avšak, typ, ktorý do neho vstupuje, je Server z nats-server, ktorý vykonáva nielen InProcessConn, ale aj rôzne iné funkcie.
  • Dôvodom je, že v tejto situácii je pre klienta dôležité len to, či bolo poskytnuté rozhranie InProcessConn, zatiaľ čo iné aspekty sú menej podstatné.
  • Preto klient nats.go vytvára a používa iba InProcessConnProvider – consumer defined interface, ktoré definuje len funkciu InProcessConn() (net.Conn, error).

Záver

  • Stručne som sa venoval embedded mode NATS, jeho fungovaniu a consumer defined interface v Go, ktoré možno overiť prostredníctvom kódu NATS.
  • Dúfam, že tieto informácie pomôžu tým, ktorí používajú NATS na uvedené účely, a týmto by som chcel tento článok ukončiť.