GoSuda

Jak integrované NATS komunikují s go application?

By prravda
views ...

Zahájení práce

O systému NATS

Softwarové aplikace a služby potřebují vyměňovat data. NATS představuje infrastrukturu umožňující takovou výměnu dat, která je segmentována ve formě zpráv. Toto nazýváme "message oriented middleware".

S využitím NATS mohou aplikační vývojáři:

  • Bez námahy vytvářet distribuované a škálovatelné klientsko-serverové aplikace.
  • Ukládat a distribuovat data v reálném čase obecným způsobem. Toho lze flexibilně dosáhnout napříč různými prostředími, jazyky, poskytovateli cloudu a on-premises systémy.

What is NATS, NATS docs

  • NATS je zprostředkovatel zpráv implementovaný v jazyce Go.

Vložený NATS

Pokud je vaše aplikace napsána v jazyce Go, a pokud to odpovídá vašemu případu užití a scénářům nasazení, můžete dokonce vložit server NATS přímo do své aplikace.

Embedding NATS, NATS docs

  • A co je na NATS pozoruhodné, je podpora embedded mode u aplikací napsaných v jazyce Go.
  • To znamená, že namísto obvyklého přístupu u message brokerů, kde se spustí samostatný server broker a komunikace probíhá prostřednictvím klienta aplikace s tímto serverem, je možné samotný broker vložit (embed) do aplikace vytvořené v jazyce Go.

Výhody a případy užití vloženého NATS

  • Existuje dobře vysvětlené video na Youtube, proto odkaz na video považujeme za postačující.
  • Lze dosáhnout výhody implementace NATS v embedded režimu a zároveň zajistit separate of concern vytvořením modular monolith aplikace, aniž by bylo nutné nasadit samostatný server message brokeru. Navíc se stává možným i single binary deployment.
  • Lze jej užitečně využít nejen na platformách bez sítě (wasm), ale také v offline-first aplikacích.

Příklad z oficiální dokumentace

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inicializace nového serveru s možnostmi
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Spuštění serveru prostřednictvím goroutiny
22    go ns.Start()
23
24    // Čekání, než bude server připraven k připojení
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Připojení k serveru
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Přihlášení k odběru tématu (subject)
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Tisk dat zprávy
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Ukončení serveru (volitelné)
45        ns.Shutdown()
46    })
47
48    // Publikování dat do tématu
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Čekání na ukončení serveru
52    ns.WaitForShutdown()
53}
  • Toto je příklad Embedded NATS odkazovaný v oficiální dokumentaci NATS, avšak při postupu podle daného příkladového kódu nedochází ke komunikaci v embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Pomocí příkazu watch 'netstat -an | grep 127.0.0.1' můžeme ověřit síťový provoz směřující na localhost(127.0.0.1) a při spuštění příslušného souboru Go pomocí go run . je patrné, že přibývají nové síťové požadavky vycházející z defaultního portu NATS, tj. 4222.

Správné konfigurace pro embedding mode

  • Pro dosažení komunikace v zamýšleném embedded mode jsou zapotřebí následující dvě volby (options):

    • Klient: Je nutné přidat volbu InProcessServer.
    • Server: Je nutné ve Server.Options explicitně nastavit příznak DontListen na hodnotu true.
  • Tyto aspekty nebyly oficiálně zdokumentovány a počátek této funkcionality lze identifikovat prostřednictvím příslušného PR.

    Tento PR přidává tři věci:

    1. Funkci InProcessConn() do Server, která vytváří net.Pipe pro získání spojení k serveru NATS bez použití TCP socketů.
    2. Volbu DontListen, která sděluje serveru NATS, aby neposlouchal na obvyklém TCP naslouchacím mechanismu.
    3. Kanál startupComplete, který je uzavřen těsně před zahájením AcceptLoop, a readyForConnections na něj bude čekat.

    Hlavní motivací je, že disponujeme aplikací, která může běžet buď v monolitickém (jednoprocesovém) režimu, nebo v polylitickém (víceprocesovém) režimu. Z důvodu jednoduchosti bychom rádi mohli NATS využívat pro oba režimy, ale monolitický režim musí být schopen vyhovět různým platformám, kde otevření socketových spojení buď nedává smysl (mobilní zařízení), nebo prostě není možné (WASM). Tyto změny nám umožní využívat NATS namísto toho zcela in-process.

    Doprovodný PR nats-io/nats.go#774 přidává podporu na straně klienta.

    Toto je můj první PR k tomuto projektu, proto se předem omlouvám, pokud jsem přehlédl cokoli zřejmého.

    /cc @nats-io/core

Funkční příklad pro embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// pro konfiguraci vloženého NATS serveru
14		// nastavte DontListen na true
15		DontListen: true,
16	}
17
18	// Inicializace nového serveru s možnostmi
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Spuštění serveru prostřednictvím goroutiny
26	go ns.Start()
27
28	// Čekání, než bude server připraven k připojení
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Připojení k serveru prostřednictvím in-process spojení
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Přihlášení k odběru tématu (subject)
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Tisk dat zprávy
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Ukončení serveru (volitelné)
49		ns.Shutdown()
50	})
51
52	// Publikování dat do tématu
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Čekání na ukončení serveru
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nyní lze pozorovat, že k zamýšlenému jevu, tj. výskytu dodatečných síťových skoků (network hop), nedochází.

Pod kapotou (Under the hood)

Stručné shrnutí (TL;DR)

diagram1

  • Toto je sekvenční diagram znázorňující, jaké funkce se interně spouštějí a jakým způsobem fungují, když je daný kód spuštěn v souboru main.go; jeho hlavní myšlenka je následující:
    • Prostřednictvím nastavení DontListen: true server vynechá fázi naslouchání klientům, nazvanou AcceptLoop.
    • Pokud je v rámci klientských voleb (Connect option) aktivována volba InProcessServer, vytvoří se in-memory spojení a pomocí net.Pipe se vytvoří roura (pipe), jejíž jeden konec je poté vrácen klientovi ve formátu typu net.Conn.
    • Klient a server následně provádějí in-process komunikaci prostřednictvím tohoto spojení.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Čekání na klienty.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Pro informaci, funkce AcceptLoop provádí následující kroky. Jedná se o aspekty související se síťovou komunikací, jako jsou TLS či hostPort, které jsou při in-process communication zbytečné, tudíž jejich vynechání je přípustné.
 1// nats-server/server/server.go
 2
 3// AcceptLoop je exportován pro snazší testování.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Pokud bychom se ukončili před řádným nastavením naslouchacího mechanismu,
 6	// zajistěte uzavření kanálu.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Pořiďte snímek možností serveru.
18	opts := s.getOpts()
19
20	// Nastavte stav, který umožní vypnutí
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Chyba při naslouchání na portu: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Naslouchání na klientská připojení na %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Upozornění na povolené TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS požadováno pro klientská připojení")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienti, kteří nepoužívají volbu \"TLS Handshake First\", se nebudou moci připojit")
38		}
39	}
40
41	// Pokud byl server spuštěn s volbou RANDOM_PORT (-1), bude hodnota opts.Port
42	// na začátku této funkce rovna 0. Potřebujeme tedy získat skutečný port
43	if opts.Port == 0 {
44		// Zapište vyřešený port zpět do možností.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nyní, když je port nastaven (pokud byl nastaven na RANDOM), nastavte
49	// Host/Port informací serveru buď s hodnotami z Options, nebo
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Chyba při nastavování server INFO s hodnotou ClientAdvertise %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Sledujte URL pro připojení klientů. Budou se nám později hodit.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signalizujte, že nepřijímáme nové klienty
65				s.ldmCh <- true
66				// Nyní počkejte na ukončení...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Dejte volajícímu vědět, že jsme připraveni
75	close(clr)
76	clr = nil
77}
  • Pro informaci, funkce AcceptLoop provádí následující kroky. Jedná se o aspekty související se síťovou komunikací, jako jsou TLS či hostPort, které jsou při in-process communication zbytečné, tudíž jejich vynechání je přípustné.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect se pokusí připojit k systému NATS.
 5// URL může obsahovat sémantiku uživatelského jména/hesla. Např. nats://derek:pass@localhost:4222
 6// Jsou podporovány i pole oddělená čárkami, např. urlA, urlB.
 7// Možnosti začínají výchozími hodnotami, ale mohou být přepsány.
 8// Pro připojení k websocketovému portu serveru NATS použijte schéma `ws` nebo `wss`, například
 9// `ws://localhost:8080`. Mějte na paměti, že schémata websocket nelze míchat s jinými (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options lze použít k vytvoření přizpůsobeného spojení.
 4type Options struct {
 5	// Url reprezentuje jednu URL serveru NATS, ke kterému se klient
 6	// bude připojovat. Pokud je nastavena i volba Servers, stane se tato URL
 7	// prvním serverem v poli Servers.
 8	Url string
 9
10	// InProcessServer reprezentuje server NATS běžící v rámci
11	// stejného procesu. Pokud je tato volba nastavena, pokusíme se připojit
12	// k serveru přímo namísto použití externích TCP spojení.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Nyní se vraťme ke kódu. V klientovi nats.go je pole structu volby InProcessServer definováno jako interface InProcessConnProvider, který realizuje pouze funkci InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn vrátí in-process spojení k serveru,
 4// čímž se vyhne nutnosti používat TCP listener pro lokální konektivitu
 5// v rámci stejného procesu. To lze použít bez ohledu na
 6// stav volby DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nepodařilo se vytvořit spojení")
16	}
17	return pr, nil
18}
  • Avšak typ, který se do něj vkládá, je Server ze serveru NATS, který vykonává různé funkce kromě InProcessConn.
  • Důvodem je, že v dané situaci je zájmem klienta NATS pouze to, zda je poskytnuta funkce InProcessConn() definovaná interfaceem, nikoli ostatní aspekty.
  • Proto nats.go klient vytvořil a používá pouze consumer defined interface nazvaný InProcessConnProvider, který definuje pouze funkci InProcessConn() (net.Conn, error).

Consumer driven interface of Go

A type implements an interface by implementing its methods. There is no explicit declaration of intent, no "implements" keyword. Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.

Interfaces are implemented implicitly, A Tour of Go

If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.

Generality, Effective Go

  • Tento návrh interface je ukázkou toho, co je v jazyce Go běžně označováno jako consumer defined interface a structural typing (duck typing), proto je záměrem představit i tuto problematiku.
 1// nats-go/nats.go
 2
 3// Options lze použít k vytvoření přizpůsobeného spojení.
 4type Options struct {
 5	// Url reprezentuje jednu URL serveru NATS, ke kterému se klient
 6	// bude připojovat. Pokud je nastavena i volba Servers, stane se tato URL
 7	// prvním serverem v poli Servers.
 8	Url string
 9
10	// InProcessServer reprezentuje server NATS běžící v rámci
11	// stejného procesu. Pokud je tato volba nastavena, pokusíme se připojit
12	// k serveru přímo namísto použití externích TCP spojení.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Vraťme se zpět ke kódu. V klientovi nats.go je pole structu volby InProcessServer definováno jako interface InProcessConnProvider, který realizuje pouze funkci InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn vrátí in-process spojení k serveru,
 4// čímž se vyhne nutnosti používat TCP listener pro lokální konektivitu
 5// v rámci stejného procesu. To lze použít bez ohledu na
 6// stav volby DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("nepodařilo se vytvořit spojení")
16	}
17	return pr, nil
18}
  • Avšak typ, který se do něj vkládá, je Server ze serveru NATS, který vykonává různé funkce kromě InProcessConn.
  • Důvodem je, že v dané situaci je zájmem klienta NATS pouze to, zda je poskytnuta funkce InProcessConn() definovaná interfaceem, nikoli ostatní aspekty.
  • Proto nats.go klient vytvořil a používá pouze consumer defined interface nazvaný InProcessConnProvider, který definuje pouze funkci InProcessConn() (net.Conn, error).

Závěr

  • Stručně jsme se zabývali embedded mode systému NATS, jeho mechanismem fungování a ukázkou consumer defined interface v jazyce Go, kterou lze ověřit v kódu NATS.
  • Doufáme, že tyto informace budou užitečné pro ty, kteří NATS využívají k výše uvedeným účelům, a tímto článek uzavíráme.