Jak integrované NATS komunikují s go application?
Zahájení práce
O systému NATS
Softwarové aplikace a služby potřebují vyměňovat data. NATS představuje infrastrukturu umožňující takovou výměnu dat, která je segmentována ve formě zpráv. Toto nazýváme "message oriented middleware".
S využitím NATS mohou aplikační vývojáři:
- Bez námahy vytvářet distribuované a škálovatelné klientsko-serverové aplikace.
- Ukládat a distribuovat data v reálném čase obecným způsobem. Toho lze flexibilně dosáhnout napříč různými prostředími, jazyky, poskytovateli cloudu a on-premises systémy.
- NATS je zprostředkovatel zpráv implementovaný v jazyce Go.
Vložený NATS
Pokud je vaše aplikace napsána v jazyce Go, a pokud to odpovídá vašemu případu užití a scénářům nasazení, můžete dokonce vložit server NATS přímo do své aplikace.
- A co je na NATS pozoruhodné, je podpora embedded mode u aplikací napsaných v jazyce Go.
- To znamená, že namísto obvyklého přístupu u message brokerů, kde se spustí samostatný server broker a komunikace probíhá prostřednictvím klienta aplikace s tímto serverem, je možné samotný broker vložit (embed) do aplikace vytvořené v jazyce Go.
Výhody a případy užití vloženého NATS
- Existuje dobře vysvětlené video na Youtube, proto odkaz na video považujeme za postačující.
- Lze dosáhnout výhody implementace NATS v embedded režimu a zároveň zajistit separate of concern vytvořením modular monolith aplikace, aniž by bylo nutné nasadit samostatný server message brokeru. Navíc se stává možným i single binary deployment.
- Lze jej užitečně využít nejen na platformách bez sítě (wasm), ale také v offline-first aplikacích.
Příklad z oficiální dokumentace
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inicializace nového serveru s možnostmi
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Spuštění serveru prostřednictvím goroutiny
22 go ns.Start()
23
24 // Čekání, než bude server připraven k připojení
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Připojení k serveru
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Přihlášení k odběru tématu (subject)
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Tisk dat zprávy
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Ukončení serveru (volitelné)
45 ns.Shutdown()
46 })
47
48 // Publikování dat do tématu
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Čekání na ukončení serveru
52 ns.WaitForShutdown()
53}
- Toto je příklad Embedded NATS odkazovaný v oficiální dokumentaci NATS, avšak při postupu podle daného příkladového kódu nedochází ke komunikaci v embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Pomocí příkazu
watch 'netstat -an | grep 127.0.0.1'
můžeme ověřit síťový provoz směřující na localhost(127.0.0.1) a při spuštění příslušného souboru Go pomocígo run .
je patrné, že přibývají nové síťové požadavky vycházející z defaultního portu NATS, tj.4222
.
Správné konfigurace pro embedding mode
Pro dosažení komunikace v zamýšleném embedded mode jsou zapotřebí následující dvě volby (options):
- Klient: Je nutné přidat volbu
InProcessServer
. - Server: Je nutné ve
Server.Options
explicitně nastavit příznakDontListen
na hodnotutrue
.
- Klient: Je nutné přidat volbu
Tyto aspekty nebyly oficiálně zdokumentovány a počátek této funkcionality lze identifikovat prostřednictvím příslušného PR.
Tento PR přidává tři věci:
- Funkci
InProcessConn()
doServer
, která vytvářínet.Pipe
pro získání spojení k serveru NATS bez použití TCP socketů. - Volbu
DontListen
, která sděluje serveru NATS, aby neposlouchal na obvyklém TCP naslouchacím mechanismu. - Kanál
startupComplete
, který je uzavřen těsně před zahájenímAcceptLoop
, areadyForConnections
na něj bude čekat.
Hlavní motivací je, že disponujeme aplikací, která může běžet buď v monolitickém (jednoprocesovém) režimu, nebo v polylitickém (víceprocesovém) režimu. Z důvodu jednoduchosti bychom rádi mohli NATS využívat pro oba režimy, ale monolitický režim musí být schopen vyhovět různým platformám, kde otevření socketových spojení buď nedává smysl (mobilní zařízení), nebo prostě není možné (WASM). Tyto změny nám umožní využívat NATS namísto toho zcela in-process.
Doprovodný PR nats-io/nats.go#774 přidává podporu na straně klienta.
Toto je můj první PR k tomuto projektu, proto se předem omlouvám, pokud jsem přehlédl cokoli zřejmého.
/cc @nats-io/core
- Funkci
Funkční příklad pro embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // pro konfiguraci vloženého NATS serveru
14 // nastavte DontListen na true
15 DontListen: true,
16 }
17
18 // Inicializace nového serveru s možnostmi
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Spuštění serveru prostřednictvím goroutiny
26 go ns.Start()
27
28 // Čekání, než bude server připraven k připojení
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Připojení k serveru prostřednictvím in-process spojení
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Přihlášení k odběru tématu (subject)
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Tisk dat zprávy
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Ukončení serveru (volitelné)
49 ns.Shutdown()
50 })
51
52 // Publikování dat do tématu
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Čekání na ukončení serveru
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nyní lze pozorovat, že k zamýšlenému jevu, tj. výskytu dodatečných síťových skoků (network hop), nedochází.
Pod kapotou (Under the hood)
Stručné shrnutí (TL;DR)
- Toto je sekvenční diagram znázorňující, jaké funkce se interně spouštějí a jakým způsobem fungují, když je daný kód spuštěn v souboru
main.go
; jeho hlavní myšlenka je následující:- Prostřednictvím nastavení
DontListen: true
server vynechá fázi naslouchání klientům, nazvanouAcceptLoop
. - Pokud je v rámci klientských voleb (Connect option) aktivována volba
InProcessServer
, vytvoří se in-memory spojení a pomocínet.Pipe
se vytvoří roura (pipe), jejíž jeden konec je poté vrácen klientovi ve formátu typunet.Conn
. - Klient a server následně provádějí in-process komunikaci prostřednictvím tohoto spojení.
- Prostřednictvím nastavení
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Čekání na klienty.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Pro informaci, funkce AcceptLoop provádí následující kroky. Jedná se o aspekty související se síťovou komunikací, jako jsou
TLS
čihostPort
, které jsou při in-process communication zbytečné, tudíž jejich vynechání je přípustné.
1// nats-server/server/server.go
2
3// AcceptLoop je exportován pro snazší testování.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Pokud bychom se ukončili před řádným nastavením naslouchacího mechanismu,
6 // zajistěte uzavření kanálu.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Pořiďte snímek možností serveru.
18 opts := s.getOpts()
19
20 // Nastavte stav, který umožní vypnutí
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Chyba při naslouchání na portu: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Naslouchání na klientská připojení na %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Upozornění na povolené TLS.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS požadováno pro klientská připojení")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Klienti, kteří nepoužívají volbu \"TLS Handshake First\", se nebudou moci připojit")
38 }
39 }
40
41 // Pokud byl server spuštěn s volbou RANDOM_PORT (-1), bude hodnota opts.Port
42 // na začátku této funkce rovna 0. Potřebujeme tedy získat skutečný port
43 if opts.Port == 0 {
44 // Zapište vyřešený port zpět do možností.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nyní, když je port nastaven (pokud byl nastaven na RANDOM), nastavte
49 // Host/Port informací serveru buď s hodnotami z Options, nebo
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Chyba při nastavování server INFO s hodnotou ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Sledujte URL pro připojení klientů. Budou se nám později hodit.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signalizujte, že nepřijímáme nové klienty
65 s.ldmCh <- true
66 // Nyní počkejte na ukončení...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Dejte volajícímu vědět, že jsme připraveni
75 close(clr)
76 clr = nil
77}
- Pro informaci, funkce AcceptLoop provádí následující kroky. Jedná se o aspekty související se síťovou komunikací, jako jsou
TLS
čihostPort
, které jsou při in-process communication zbytečné, tudíž jejich vynechání je přípustné.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect se pokusí připojit k systému NATS.
5// URL může obsahovat sémantiku uživatelského jména/hesla. Např. nats://derek:pass@localhost:4222
6// Jsou podporovány i pole oddělená čárkami, např. urlA, urlB.
7// Možnosti začínají výchozími hodnotami, ale mohou být přepsány.
8// Pro připojení k websocketovému portu serveru NATS použijte schéma `ws` nebo `wss`, například
9// `ws://localhost:8080`. Mějte na paměti, že schémata websocket nelze míchat s jinými (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options lze použít k vytvoření přizpůsobeného spojení.
4type Options struct {
5 // Url reprezentuje jednu URL serveru NATS, ke kterému se klient
6 // bude připojovat. Pokud je nastavena i volba Servers, stane se tato URL
7 // prvním serverem v poli Servers.
8 Url string
9
10 // InProcessServer reprezentuje server NATS běžící v rámci
11 // stejného procesu. Pokud je tato volba nastavena, pokusíme se připojit
12 // k serveru přímo namísto použití externích TCP spojení.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Nyní se vraťme ke kódu. V klientovi nats.go je pole structu volby
InProcessServer
definováno jako interfaceInProcessConnProvider
, který realizuje pouze funkciInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn vrátí in-process spojení k serveru,
4// čímž se vyhne nutnosti používat TCP listener pro lokální konektivitu
5// v rámci stejného procesu. To lze použít bez ohledu na
6// stav volby DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("nepodařilo se vytvořit spojení")
16 }
17 return pr, nil
18}
- Avšak typ, který se do něj vkládá, je
Server
ze serveru NATS, který vykonává různé funkce kromě InProcessConn. - Důvodem je, že v dané situaci je zájmem klienta NATS pouze to, zda je poskytnuta funkce
InProcessConn()
definovaná interfaceem, nikoli ostatní aspekty. - Proto nats.go klient vytvořil a používá pouze consumer defined interface nazvaný
InProcessConnProvider
, který definuje pouze funkciInProcessConn() (net.Conn, error)
.
Consumer driven interface of Go
A type implements an interface by implementing its methods. There is no explicit declaration of intent, no "implements" keyword. Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.
Interfaces are implemented implicitly, A Tour of Go
If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.
- Tento návrh interface je ukázkou toho, co je v jazyce Go běžně označováno jako consumer defined interface a structural typing (duck typing), proto je záměrem představit i tuto problematiku.
1// nats-go/nats.go
2
3// Options lze použít k vytvoření přizpůsobeného spojení.
4type Options struct {
5 // Url reprezentuje jednu URL serveru NATS, ke kterému se klient
6 // bude připojovat. Pokud je nastavena i volba Servers, stane se tato URL
7 // prvním serverem v poli Servers.
8 Url string
9
10 // InProcessServer reprezentuje server NATS běžící v rámci
11 // stejného procesu. Pokud je tato volba nastavena, pokusíme se připojit
12 // k serveru přímo namísto použití externích TCP spojení.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Vraťme se zpět ke kódu. V klientovi nats.go je pole structu volby
InProcessServer
definováno jako interfaceInProcessConnProvider
, který realizuje pouze funkciInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn vrátí in-process spojení k serveru,
4// čímž se vyhne nutnosti používat TCP listener pro lokální konektivitu
5// v rámci stejného procesu. To lze použít bez ohledu na
6// stav volby DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("nepodařilo se vytvořit spojení")
16 }
17 return pr, nil
18}
- Avšak typ, který se do něj vkládá, je
Server
ze serveru NATS, který vykonává různé funkce kromě InProcessConn. - Důvodem je, že v dané situaci je zájmem klienta NATS pouze to, zda je poskytnuta funkce
InProcessConn()
definovaná interfaceem, nikoli ostatní aspekty. - Proto nats.go klient vytvořil a používá pouze consumer defined interface nazvaný
InProcessConnProvider
, který definuje pouze funkciInProcessConn() (net.Conn, error)
.
Závěr
- Stručně jsme se zabývali embedded mode systému NATS, jeho mechanismem fungování a ukázkou consumer defined interface v jazyce Go, kterou lze ověřit v kódu NATS.
- Doufáme, že tyto informace budou užitečné pro ty, kteří NATS využívají k výše uvedeným účelům, a tímto článek uzavíráme.