Ako komunikujú vložené NATS s aplikáciou Go?
Začíname
O NATS
Softvérové aplikácie a služby si vyžadujú výmenu dát. NATS je infraštruktúra, ktorá umožňuje takúto výmenu dát, segmentovanú vo forme správ. Nazývame to „správovo orientovaný middleware“.
S NATS môžu vývojári aplikácií:
- Bez námahy vytvárať distribuované a škálovateľné klientsko-serverové aplikácie.
- Ukladať a distribuovať dáta v reálnom čase všeobecným spôsobom. To sa dá flexibilne dosiahnuť naprieč rôznymi prostrediami, jazykmi, poskytovateľmi cloudu a on-premises systémami.
- NATS je Message Broker implementovaný v Go.
Embedded NATS
Ak je vaša aplikácia v Go a ak to vyhovuje vášmu prípadu použitia a scenárom nasadenia, môžete dokonca vložiť server NATS priamo do vašej aplikácie.
- NATS má navyše špecifickú vlastnosť, a to podporu embedded režimu pre aplikácie implementované v Go.
- To znamená, že namiesto typického prístupu Message Brokera, ktorý zahŕňa spustenie samostatného servera Brokera a komunikáciu s ním prostredníctvom klienta aplikácie, je možné samotný Broker vložiť (embedovať) priamo do aplikácie vytvorenej v Go.
Výhody a prípady použitia embedded NATS
- Existuje dobre vysvetľujúce video na YouTube, na ktoré sa odkazujem.
- Aj bez nasadenia samostatného Message Brokera je možné vytvoriť modular monolith application, dosiahnuť separate of concern a využiť výhody embedded NATS. Okrem toho je možná aj single binary deployment.
- Môže byť užitočný nielen pre platform with no network (WASM), ale aj pre offline-first application.
Príklad z oficiálnej dokumentácie
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inicializácia nového servera s možnosťami
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Spustenie servera prostredníctvom goroutine
22 go ns.Start()
23
24 // Čakanie, kým bude server pripravený na pripojenia
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Pripojenie k serveru
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Prihlásenie sa k odberu témy
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Vypísanie dát správy
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Vypnutie servera (voliteľné)
45 ns.Shutdown()
46 })
47
48 // Publikovanie dát do témy
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Čakanie na vypnutie servera
52 ns.WaitForShutdown()
53}
- Toto je príklad embedded NATS uvedený v oficiálnej dokumentácii NATS, avšak ak sa riadite týmto príkladom kódu, komunikácia sa neuskutoční v embedded režime.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Ak vykonáte tento súbor Go pomocou
go run .
a zároveň monitorujete sieťovú komunikáciu na localhost (127.0.0.1) pomocou príkazuwatch 'netstat -an | grep 127.0.0.1'
, uvidíte, že sa pridávajú nové sieťové požiadavky vychádzajúce z predvoleného portu NATS, ktorým je4222
.
Správne konfigurácie pre embedded režim
Na dosiahnutie zamýšľanej komunikácie v embedded režime sú potrebné nasledujúce dve možnosti.
- Klient: Musí byť pridaná možnosť
InProcessServer
. - Server: V
Server.Options
musí byť flagDontListen
explicitne nastavený natrue
.
- Klient: Musí byť pridaná možnosť
Tieto časti neboli oficiálne zdokumentované a začiatok tejto funkcie možno sledovať prostredníctvom tohto PR.
This PR adds three things:
InProcessConn()
funkcia kServer
, ktorá vytváranet.Pipe
na získanie pripojenia k serveru NATS bez použitia TCP socketovDontListen
možnosť, ktorá hovorí serveru NATS, aby nepočúval na obvyklom TCP listeneristartupComplete
kanál, ktorý sa zatvorí tesne predtým, ako spustímeAcceptLoop
, areadyForConnections
naň bude čakať
Hlavnou motiváciou pre toto je, že máme aplikáciu, ktorá môže bežať buď v monolitickom (single-process) režime alebo v polylitickom (multi-process) režime. Radi by sme boli schopní používať NATS pre oba režimy pre jednoduchosť, ale monolitický režim musí byť schopný vyhovieť rôznym platformám, kde otváranie socketových pripojení buď nedáva zmysel (mobilné) alebo jednoducho nie je možné (WASM). Tieto zmeny nám umožnia používať NATS úplne in-process.
Sprievodný PR nats-io/nats.go#774 pridáva podporu na strane klienta.
Toto je môj prvý PR do tohto projektu, takže sa vopred ospravedlňujem, ak som niečo zjavné vynechal.
/cc @nats-io/core
Funkčný príklad pre embedded režim
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // pre konfiguráciu vloženého NATS servera
14 // nastavte DonListen na true
15 DontListen: true,
16 }
17
18 // Inicializácia nového servera s možnosťami
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Spustenie servera prostredníctvom goroutine
26 go ns.Start()
27
28 // Čakanie, kým bude server pripravený na pripojenia
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Pripojenie k serveru prostredníctvom in-process pripojenia
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Prihlásenie sa k odberu témy
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Vypísanie dát správy
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Vypnutie servera (voliteľné)
49 ns.Shutdown()
50 })
51
52 // Publikovanie dát do témy
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Čakanie na vypnutie servera
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...žiadne ďalšie záznamy
- Teraz je zrejmé, že nevzniká žiadny ďalší network hop, čo bolo zamýšľané.
Pod pokrievkou
TL;DR
- Toto je sekvenčný diagram, ktorý ukazuje, ako interné funkcie pracujú, keď je daný kód spustený v
main.go
, a jeho podstata je nasledovná:- Prostredníctvom
DontListen: true
server vynechá fázu počúvania klienta, nazývanúAcceptLoop
. - Ak je aktivovaná možnosť
InProcessServer
v možnostiach pripojenia klienta, vytvorí sa in-memory connection, prostredníctvomnet.Pipe
sa vytvorí pipe a koniec pipe sa vráti klientovi ako typnet.Conn
. - Klient a server komunikujú in-process prostredníctvom tohto pripojenia.
- Prostredníctvom
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Čakanie na klientov.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Po prvé, ak je
DontListen
true, fáza počúvania klienta nazvanáAcceptLoop
sa vynechá.
1// nats-server/server/server.go
2
3// AcceptLoop je exportovaný pre jednoduchšie testovanie.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Ak by sme mali ukončiť pred správnym nastavením listenera,
6 // uistite sa, že kanál zatvoríme.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snímka možností servera.
18 opts := s.getOpts()
19
20 // Nastavenie stavu, ktorý môže umožniť vypnutie
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Chyba pri počúvaní na porte: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Počúvanie na klientske pripojenia na %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Upozornenie na povolené TLS.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS je vyžadované pre klientske pripojenia")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klienti, ktorí nepoužívajú možnosť \"TLS Handshake First\", sa nebudú môcť pripojiť")
38 }
39 }
40
41 // Ak bol server spustený s RANDOM_PORT (-1), opts.Port by bolo na začiatku tejto funkcie
42 // rovné 0. Preto musíme získať skutočný port.
43 if opts.Port == 0 {
44 // Zapíšte rozlíšený port späť do možností.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Teraz, keď bol port nastavený (ak bol nastavený na RANDOM), nastavte
49 // serverové info Host/Port buď s hodnotami z Options alebo
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Chyba pri nastavovaní INFO servera s hodnotou ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Sledujte URL pripojenia klienta. Možno ich budeme potrebovať neskôr.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signalizujte, že neprijímame nových klientov
65 s.ldmCh <- true
66 // Teraz počkajte na vypnutie...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Dajte volajúcemu vedieť, že sme pripravení
75 close(clr)
76 clr = nil
77}
- Mimochodom, funkcia AcceptLoop vykonáva nasledujúce procesy: Ide o časti súvisiace so sieťovou komunikáciou, ako sú
TLS
ahostPort
, ktoré sú zbytočné, ak prebieha in-process komunikácia, a preto ich možno vynechať.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect sa pokúsi pripojiť k systému NATS.
5// URL môže obsahovať sémantiku používateľského mena/hesla. napr. nats://derek:pass@localhost:4222
6// Podporované sú aj zoznamy oddelené čiarkami, napr. urlA, urlB.
7// Možnosti začínajú s predvolenými hodnotami, ale môžu byť prepísané.
8// Pre pripojenie k websocket portu NATS servera použite schému `ws` alebo `wss`, napríklad
9// `ws://localhost:8080`. Všimnite si, že schémy websocketov nemožno miešať s inými (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options je možné použiť na vytvorenie prispôsobeného pripojenia.
4type Options struct {
5 // Url predstavuje jedinú URL servera NATS, ku ktorej sa bude klient
6 // pripájať. Ak je nastavená aj možnosť Servers, potom sa stane
7 // prvým serverom v poli Servers.
8 Url string
9
10 // InProcessServer predstavuje server NATS bežiaci v rámci
11 // rovnakého procesu. Ak je toto nastavené, pokúsime sa pripojiť
12 // k serveru priamo, namiesto použitia externých TCP pripojení.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Funkcia
Connect
, ktorá spája server NATS a klienta NATS, umožňuje nastaviť URL klienta a možnosti pripojenia; štruktúra Options, ktorá zhromažďuje tieto možnosti, obsahuje poleInProcessServer
typuInProcessConnProvider interface
.
1// main.go of example code
2
3// Inicializácia nového servera s možnosťami
4ns, err := server.NewServer(opts)
5
6//...
7
8// Pripojenie k serveru prostredníctvom in-process pripojenia
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Keď klient NATS uskutočňuje pripojenie, ak sa do poľa
InProcessServer
odovzdánats.InProcessServer(ns)
,
1// nats-go/nats.go
2
3// InProcessServer je možnosť, ktorá sa pokúsi nadviazať priame spojenie so serverom NATS
4// bežiacim v rámci procesu namiesto vytáčania cez TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- InProcessServer v možnostiach je nahradený embedded NATS serverom.
1// nats-go/nats.go
2
3// createConn sa pripojí k serveru a obalí príslušné
4// bufio štruktúry. Urobí správnu vec, keď existuje
5// pripojenie.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Ak máme referenciu na in-process server, potom nadviažte
15 // pripojenie pomocou neho.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("nepodarilo sa získať in-process pripojenie: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Toto rozhranie, ak možnosť
InProcessServer
v rámci funkciecreateConn
(ktorá vytvára pripojenie) nie je nil (je platná), vykonáInProcesConn
zInProcessServer
v možnostiach a
1// nats-server/server/server.go
2
3// InProcessConn vracia in-process pripojenie k serveru,
4// čím sa vyhýba potrebe použiť TCP listener pre lokálne pripojenie
5// v rámci rovnakého procesu. Toto sa môže použiť bez ohľadu na
6// stav možnosti DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("nepodarilo sa vytvoriť pripojenie")
16 }
17 return pr, nil
18}
- Volá a vykonáva
InProcessConn
implementovanú na serveri. - Táto funkcia, ak
InProcessServer
v nc (nats connection) v klientovi Go nats (nats.go
) nie je nil, sa volá na vytvorenie pripojenia (net.Conn
) a jeho naviazanie na pripojenie servera.
Consumer driven interface v Go
Typ implementuje rozhranie implementovaním jeho metód. Neexistuje žiadne explicitné deklarovanie úmyslu, žiadne kľúčové slovo "implements". Implicitné rozhrania oddeľujú definíciu rozhrania od jeho implementácie, ktorá sa potom môže objaviť v akomkoľvek balíku bez predchádzajúcej dohody.
Interfaces are implemented implicitly, A Tour of Go
Ak typ existuje len na implementáciu rozhrania a nikdy nebude mať exportované metódy mimo tohto rozhrania, nie je potrebné exportovať samotný typ.
- Tento návrh rozhrania dobre zachytáva takzvané consumer defined interface a structural typing (duck typing) v Go, preto by som rád predstavil aj túto tému.
1// nats-go/nats.go
2
3// Options je možné použiť na vytvorenie prispôsobeného pripojenia.
4type Options struct {
5 // Url predstavuje jedinú URL servera NATS, ku ktorej sa bude klient
6 // pripájať. Ak je nastavená aj možnosť Servers, potom sa stane
7 // prvým serverom v poli Servers.
8 Url string
9
10 // InProcessServer predstavuje server NATS bežiaci v rámci
11 // rovnakého procesu. Ak je toto nastavené, pokúsime sa pripojiť
12 // k serveru priamo, namiesto použitia externých TCP pripojení.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Vráťme sa ku kódu. V klientovi
nats.go
je pole štruktúryInProcessServer
definované ako rozhranieInProcessConnProvider
, ktoré vykonáva ibaInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn vracia in-process pripojenie k serveru,
4// čím sa vyhýba potrebe použiť TCP listener pre lokálne pripojenie
5// v rámci rovnakého procesu. Toto sa môže použiť bez ohľadu na
6// stav možnosti DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("nepodarilo sa vytvoriť pripojenie")
16 }
17 return pr, nil
18}
- Avšak typ, ktorý do nej vstupuje, je
Server
z nats-servera, ktorý okrem InProcessConn vykonáva aj rôzne iné funkcie. - Dôvodom je, že v tejto situácii je pre klienta dôležité len to, či bolo poskytnuté rozhranie
InProcessConn
, zatiaľ čo ostatné veci nie sú až tak podstatné. - Preto klient
nats.go
vytvoril a používa iba consumer defined interface nazvanéInProcessConnProvider
, ktoré definuje iba funkciuInProcessConn() (net.Conn, error)
.
Záver
- Stručne som sa zaoberal embedded režimom NATS, jeho fungovaním a Go consumer defined interface, ktoré možno pozorovať v kóde NATS.
- Dúfam, že tieto informácie budú užitočné pre tých, ktorí používajú NATS na uvedené účely, a týmto sa lúčim s týmto článkom.