GoSuda

Hogyan kommunikálnak a beágyazott NATS-ok a Go alkalmazással?

By prravda
views ...

Első lépések

A NATS-ról

A szoftveralkalmazásoknak és szolgáltatásoknak adatot kell cserélniük. A NATS egy olyan infrastruktúra, amely lehetővé teszi az ilyen adatcserét, üzenetek formájában szegmentálva. Ezt „üzenetorientált middleware”-nek nevezzük.

A NATS segítségével az alkalmazásfejlesztők a következőket tehetik:

  • Könnyedén építhetnek elosztott és skálázható kliens-szerver alkalmazásokat.
  • Általános módon tárolhatnak és terjeszthetnek adatokat valós időben. Ez rugalmasan megvalósítható különböző környezetekben, nyelveken, felhőszolgáltatóknál és helyben telepített rendszereken.

Mi a NATS, NATS dokumentáció

  • A NATS egy Go nyelven megírt üzenetbróker.

Beágyazott NATS

Ha az alkalmazása Go nyelven íródott, és illeszkedik a felhasználási esetéhez és a telepítési forgatókönyveihez, akár be is ágyazhat egy NATS szervert az alkalmazásába.

NATS beágyazása, NATS dokumentáció

  • A NATS-nak van egy különleges tulajdonsága: támogatja az embedded mode-ot a Go nyelven megírt alkalmazásokhoz.
  • Ez azt jelenti, hogy a brókert magát be lehet ágyazni (embed) egy Go nyelven írt alkalmazásba, ahelyett, hogy a bróker szerver külön futna, és az alkalmazás kliensekkel kommunikálna vele, ami az üzenetbrókerek szokásos működési módja.

A beágyazott NATS előnyei és felhasználási esetei

  • Van egy jól magyarázó Youtube videó, ezért a videó linkjével helyettesítem.
  • Még akkor is, ha nincs külön üzenetbróker szerver telepítve, létrehozhatunk egy modular monolith application-t, amely lehetővé teszi a concern-ek szétválasztását, miközben kihasználja a NATS beágyazásának előnyeit. Ezenkívül lehetővé válik a single binary deployment is.
  • Nemcsak platform with no network (WASM) környezetben, hanem offline-first application-ökben is hasznosan alkalmazható.

Példa a hivatalos dokumentációból

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    // Új szerver inicializálása opciókkal
16    ns, err := server.NewServer(opts)
17
18    if err != nil {
19        panic(err)
20    }
21
22    // Start the server via goroutine
23    // A szerver indítása goroutine-on keresztül
24    go ns.Start()
25
26    // Wait for server to be ready for connections
27    // Várja meg, amíg a szerver készen áll a kapcsolatokra
28    if !ns.ReadyForConnections(4 * time.Second) {
29        panic("not ready for connection")
30    }
31
32    // Connect to server
33    // Csatlakozás a szerverhez
34    nc, err := nats.Connect(ns.ClientURL())
35
36    if err != nil {
37        panic(err)
38    }
39
40    subject := "my-subject"
41
42    // Subscribe to the subject
43    // Feliratkozás a subject-re
44    nc.Subscribe(subject, func(msg *nats.Msg) {
45        // Print message data
46        // Üzenetadatok kiírása
47        data := string(msg.Data)
48        fmt.Println(data)
49
50        // Shutdown the server (optional)
51        // A szerver leállítása (opcionális)
52        ns.Shutdown()
53    })
54
55    // Publish data to the subject
56    // Adatok publikálása a subject-re
57    nc.Publish(subject, []byte("Hello embedded NATS!"))
58
59    // Wait for server shutdown
60    // Várja meg a szerver leállását
61    ns.WaitForShutdown()
62}
  • Ez a NATS hivatalos dokumentációjában található példa a beágyazott NATS-ra, de ha ezt a példakódot futtatjuk, a kommunikáció nem embedding mode-ban történik.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • A watch 'netstat -an | grep 127.0.0.1' paranccsal ellenőrizhetjük a localhost (127.0.0.1) felé irányuló hálózati forgalmat, és ha a go run . paranccsal futtatjuk a Go fájlt, láthatjuk, hogy új hálózati kérések adódnak hozzá a NATS alapértelmezett portjáról, a 4222-ről.

Helyes konfigurációk embedding mode-hoz

  • Ahhoz, hogy a kommunikáció a szándékozott embedded mode-ban történjen, a következő két opcióra van szükség:

    • Kliens: InProcessServer opciót kell hozzáadni.
    • Szerver: A Server.Options mezőben a DontListen flag-et true értékre kell állítani.
  • Ezek a részek nem voltak hivatalosan dokumentálva, és a funkció kezdetét az alábbi PR segítségével lehet azonosítani.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Működő példa embedded mode-ra

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// a beágyazott NATS szerver konfigurálásához
15		// set DonListen as true
16		// állítsa a DontListen-t igazra
17		DontListen: true,
18	}
19
20	// Initialize new server with options
21	// Új szerver inicializálása opciókkal
22	ns, err := server.NewServer(opts)
23
24	if err != nil {
25		panic(err)
26	}
27
28	// Start the server via goroutine
29	// A szerver indítása goroutine-on keresztül
30	go ns.Start()
31
32	// Wait for server to be ready for connections
33	// Várja meg, amíg a szerver készen áll a kapcsolatokra
34	if !ns.ReadyForConnections(10 * time.Second) {
35		panic("not ready for connection")
36	}
37
38	// Connect to server via in-process connection
39	// Csatlakozás a szerverhez in-process kapcsolaton keresztül
40	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
41
42	if err != nil {
43		panic(err)
44	}
45
46	subject := "my-subject"
47
48	// Subscribe to the subject
49	// Feliratkozás a subject-re
50	nc.Subscribe(subject, func(msg *nats.Msg) {
51		// Print message data
52		// Üzenetadatok kiírása
53		data := string(msg.Data)
54		fmt.Println(data)
55
56		// Shutdown the server (optional)
57		// A szerver leállítása (opcionális)
58		ns.Shutdown()
59	})
60
61	// Publish data to the subject
62	// Adatok publikálása a subject-re
63	nc.Publish(subject, []byte("Hello embedded NATS!"))
64
65	// Wait for server shutdown
66	// Várja meg a szerver leállását
67	ns.WaitForShutdown()
68}
69
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Most látható, hogy nem keletkezik további network hop, ahogyan az szándékozott volt.

A háttérben

TL;DR

diagram1

  • Ez egy sequence diagram, amely bemutatja, hogyan működnek belsőleg a függvények, amikor ezt a kódot a main.go fájlban futtatjuk, és a lényeg a következő:
    • A DontListen: true opcióval a szerver kihagyja az AcceptLoop nevű kliens figyelési fázist.
    • Ha a kliens Connect opciói közül az InProcessServer aktiválva van, akkor in-memory connection-t hoz létre, net.Pipe segítségével pipe-ot épít, majd a pipe végét net.Conn típusban visszaadja a kliensnek.
    • A kliens és a szerver ezen a connection-ön keresztül in-process communication-t végez.

Szerver

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4// Várja a klienseket.
5if !opts.DontListen {
6	s.AcceptLoop(clientListenReady)
7}
  • Először is, ha a DontListen értéke true, akkor az AcceptLoop nevű kliens figyelési fázis kimarad.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4// Az AcceptLoop exportálva van a könnyebb tesztelés érdekében.
 5func (s *Server) AcceptLoop(clr chan struct{}) {
 6	// If we were to exit before the listener is setup properly,
 7	// make sure we close the channel.
 8	// Ha kilépnénk, mielőtt a listener megfelelően be lenne állítva,
 9	// győződjön meg róla, hogy bezárja a csatornát.
10	defer func() {
11		if clr != nil {
12			close(clr)
13		}
14	}()
15
16	if s.isShuttingDown() {
17		return
18	}
19
20	// Snapshot server options.
21	// Szerver opciók pillanatképe.
22	opts := s.getOpts()
23
24	// Setup state that can enable shutdown
25	// Leállítást lehetővé tevő állapot beállítása
26	s.mu.Lock()
27	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
28	l, e := natsListen("tcp", hp)
29	s.listenerErr = e
30	if e != nil {
31		s.mu.Unlock()
32		s.Fatalf("Error listening on port: %s, %q", hp, e)
33		return
34	}
35	s.Noticef("Listening for client connections on %s",
36		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
37
38	// Alert of TLS enabled.
39	// TLS engedélyezésének jelzése.
40	if opts.TLSConfig != nil {
41		s.Noticef("TLS required for client connections")
42		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
43			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
44		}
45	}
46
47	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
48	// to 0 at the beginning this function. So we need to get the actual port
49	// Ha a szerver RANDOM_PORT (-1) opcióval indult, az opts.Port értéke 0 lenne
50	// a függvény elején. Ezért meg kell szerezni az aktuális portot.
51	if opts.Port == 0 {
52		// Write resolved port back to options.
53		// Írja vissza a feloldott portot az opciókba.
54		opts.Port = l.Addr().(*net.TCPAddr).Port
55	}
56
57	// Now that port has been set (if it was set to RANDOM), set the
58	// server's info Host/Port with either values from Options or
59	// ClientAdvertise.
60	// Most, hogy a port beállítva van (ha RANDOM-ra volt állítva), állítsa be a
61	// szerver info Host/Port értékét az Options vagy a ClientAdvertise értékeivel.
62	if err := s.setInfoHostPort(); err != nil {
63		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
64		l.Close()
65		s.mu.Unlock()
66		return
67	}
68	// Keep track of client connect URLs. We may need them later.
69	// Tartsa nyilván a kliens csatlakozási URL-eket. Később szükségünk lehet rájuk.
70	s.clientConnectURLs = s.getClientConnectURLs()
71	s.listener = l
72
73	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
74		func(_ error) bool {
75			if s.isLameDuckMode() {
76				// Signal that we are not accepting new clients
77				// Jelzi, hogy nem fogadunk új klienseket
78				s.ldmCh <- true
79				// Now wait for the Shutdown...
80				// Most várja meg a leállítást...
81				<-s.quitCh
82				return true
83			}
84			return false
85		})
86	s.mu.Unlock()
87
88	// Let the caller know that we are ready
89	// Értesítse a hívót, hogy készen állunk
90	close(clr)
91	clr = nil
92}
  • Az AcceptLoop függvény a következő folyamatokat hajtja végre. Ezek a részek, mint a TLS vagy a hostPort, a hálózati kommunikációval kapcsolatosak, és elhagyhatók, ha in-process communication-t alkalmazunk, mivel akkor nincs rájuk szükség.

Kliens

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// A Connect megpróbál csatlakozni a NATS rendszerhez.
 6// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 7// Az URL tartalmazhat felhasználónév/jelszó szemantikát. pl. nats://derek:pass@localhost:4222
 8// Comma separated arrays are also supported, e.g. urlA, urlB.
 9// Vesszővel elválasztott tömbök is támogatottak, pl. urlA, urlB.
10// Options start with the defaults but can be overridden.
11// Az opciók az alapértelmezettekkel kezdődnek, de felülírhatók.
12// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
13// A NATS Server websocket portjához való csatlakozáshoz használja a `ws` vagy `wss` sémát, például
14// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
15// `ws://localhost:8080`. Vegye figyelembe, hogy a websocket sémák nem keverhetők másokkal (nats/tls).
16func Connect(url string, options ...Option) (*Conn, error) {
17	opts := GetDefaultOptions()
18	opts.Servers = processUrlString(url)
19	for _, opt := range options {
20		if opt != nil {
21			if err := opt(&opts); err != nil {
22				return nil, err
23			}
24		}
25	}
26	return opts.Connect()
27}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4// Az Options használható egy testreszabott kapcsolat létrehozására.
 5type Options struct {
 6	// Url represents a single NATS server url to which the client
 7	// Az Url egyetlen NATS szerver URL-t jelöl, amelyhez a kliens
 8	// will be connecting. If the Servers option is also set, it
 9	// csatlakozni fog. Ha a Servers opció is be van állítva, akkor az
10	// then becomes the first server in the Servers array.
11	// lesz az első szerver a Servers tömbben.
12	Url string
13
14	// InProcessServer represents a NATS server running within the
15	// Az InProcessServer egy NATS szervert jelöl, amely ugyanabban a
16	// same process. If this is set then we will attempt to connect
17	// folyamatban fut. Ha ez be van állítva, akkor megpróbálunk csatlakozni
18	// to the server directly rather than using external TCP conns.
19	// közvetlenül a szerverhez, külső TCP kapcsolatok használata nélkül.
20	InProcessServer InProcessConnProvider
21	
22	//...
23}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • A Connect függvény, amely a NATS szerver és a NATS kliens közötti kapcsolatot hozza létre, beállíthatja a kliens URL-t és a connect Option-t, és az ezen opciókat tartalmazó Options struct-ban található egy InProcessServer nevű mező, amely InProcessConnProvider interface típusú.
 1// main.go of example code
 2
 3// Initialize new server with options
 4// Új szerver inicializálása opciókkal
 5ns, err := server.NewServer(opts)
 6
 7//...
 8
 9// Connect to server via in-process connection
10// Csatlakozás a szerverhez in-process kapcsolaton keresztül
11nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Amikor a nats kliens Connect hívást kezdeményez, és a nats.InProcessServer(ns) értéket adja át az InProcessServer mezőnek, akkor
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// Az InProcessServer egy opció, amely megpróbálja irányt létesíteni egy NATS szerverhez
 5// running within the process instead of dialing via TCP.
 6// amely a folyamaton belül fut, a TCP-n keresztüli tárcsázás helyett.
 7func InProcessServer(server InProcessConnProvider) Option {
 8	return func(o *Options) error {
 9		o.InProcessServer = server
10		return nil
11	}
12}
  • az opció InProcessServer része beágyazott NATS szerverrel helyettesítődik, és
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// A createConn csatlakozik a szerverhez, és becsomagolja a megfelelő
 5// bufio structures. It will do the right thing when an existing
 6// bufio struktúrákat. Helyesen fog működni, ha már létező
 7// connection is in place.
 8// kapcsolat van érvényben.
 9func (nc *Conn) createConn() (err error) {
10	if nc.Opts.Timeout < 0 {
11		return ErrBadTimeout
12	}
13	if _, cur := nc.currentServer(); cur == nil {
14		return ErrNoServers
15	}
16
17	// If we have a reference to an in-process server then establish a
18	// Ha van hivatkozásunk egy in-process szerverre, akkor létesítsünk
19	// connection using that.
20	// kapcsolatot azzal.
21	if nc.Opts.InProcessServer != nil {
22		conn, err := nc.Opts.InProcessServer.InProcessConn()
23		if err != nil {
24			return fmt.Errorf("failed to get in-process connection: %w", err)
25		}
26		nc.conn = conn
27		nc.bindToNewConn()
28		return nil
29	}
30	
31	//...
32}
  • ez az interface a kapcsolatot létrehozó createConn függvényben, ha az InProcessServer opció nem nil (érvényes), akkor végrehajtja az opcióban található InProcessConn metódust, és
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// Az InProcessConn egy in-process kapcsolatot ad vissza a szerverhez,
 5// avoiding the need to use a TCP listener for local connectivity
 6// elkerülve a TCP listener használatának szükségességét a helyi kapcsolódáshoz
 7// within the same process. This can be used regardless of the
 8// ugyanazon folyamaton belül. Ez használható függetlenül a
 9// state of the DontListen option.
10// DontListen opció állapotától.
11func (s *Server) InProcessConn() (net.Conn, error) {
12	pl, pr := net.Pipe()
13	if !s.startGoRoutine(func() {
14		s.createClientInProcess(pl)
15		s.grWG.Done()
16	}) {
17		pl.Close()
18		pr.Close()
19		return nil, fmt.Errorf("failed to create connection")
20	}
21	return pr, nil
22}
  • meghívja a szerverben implementált InProcessConn függvényt és végrehajtja azt.
  • Ez a függvény a NATS Go kliensében, a nats.go-ban, akkor hívódik meg, ha az nc (nats connection) InProcessServer mezője nem nil, létrehozza a connection-t (net.Conn), majd azt a szerver connection-jéhez köti.

Go consumer driven interface-e

A típus az interface-t metódusainak implementálásával valósítja meg. Nincs explicit szándéknyilatkozat, nincs „implements” kulcsszó. Az implicit interface-ek szétválasztják egy interface definícióját az implementációjától, amely aztán bármely csomagban megjelenhet előzetes egyeztetés nélkül.

Az interface-ek implicit módon vannak implementálva, A Tour of Go

Ha egy típus csak egy interface implementálására létezik, és azon túl soha nem lesz exportált metódusa, akkor nincs szükség magának a típusnak az exportálására.

Általánosság, Effective Go

  • Ez az interface design jól tükrözi a Go-ban gyakran emlegetett consumer defined interface-t és a structural typing-ot (duck typing), ezért szeretném röviden bemutatni ezt a témát is.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4// Az Options használható egy testreszabott kapcsolat létrehozására.
 5type Options struct {
 6	// Url represents a single NATS server url to which the client
 7	// Az Url egyetlen NATS szerver URL-t jelöl, amelyhez a kliens
 8	// will be connecting. If the Servers option is also set, it
 9	// csatlakozni fog. Ha a Servers opció is be van állítva, akkor az
10	// then becomes the first server in the Servers array.
11	// lesz az első szerver a Servers tömbben.
12	Url string
13
14	// InProcessServer represents a NATS server running within the
15	// Az InProcessServer egy NATS szervert jelöl, amely ugyanabban a
16	// same process. If this is set then we will attempt to connect
17	// folyamatban fut. Ha ez be van állítva, akkor megpróbálunk csatlakozni
18	// to the server directly rather than using external TCP conns.
19	// közvetlenül a szerverhez, külső TCP kapcsolatok használata nélkül.
20	InProcessServer InProcessConnProvider
21	
22	//...
23}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Térjünk vissza a kódra. A nats.go kliensben az InProcessServer opció struct mezője InProcessConnProvider interface-ként van definiálva, amely csak az InProcessConn metódust hajtja végre.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// Az InProcessConn egy in-process kapcsolatot ad vissza a szerverhez,
 5// avoiding the need to use a TCP listener for local connectivity
 6// elkerülve a TCP listener használatának szükségességét a helyi kapcsolódáshoz
 7// within the same process. This can be used regardless of the
 8// ugyanazon folyamaton belül. Ez használható függetlenül a
 9// state of the DontListen option.
10// DontListen opció állapotától.
11func (s *Server) InProcessConn() (net.Conn, error) {
12	pl, pr := net.Pipe()
13	if !s.startGoRoutine(func() {
14		s.createClientInProcess(pl)
15		s.grWG.Done()
16	}) {
17		pl.Close()
18		pr.Close()
19		return nil, fmt.Errorf("failed to create connection")
20	}
21	return pr, nil
22}
  • Azonban a bemeneti típus a nats-server Server típusa, amely az InProcessConn mellett számos más funkciót is ellát.
  • Ez azért van így, mert ebben a helyzetben a kliens számára csak az a fontos, hogy az InProcessConn interface biztosított-e vagy sem, minden más irreleváns.
  • Ezért a nats.go kliens csak egy InProcessConnProvider nevű consumer defined interface-t hozott létre és használ, amely kizárólag az InProcessConn() (net.Conn, error) funkciót definiálja.

Összefoglalás

  • Röviden tárgyaltuk a NATS embedded mode-ját és működését, valamint a Go consumer defined interface-ét, amelyet a NATS kódjában is megfigyelhetünk.
  • Reméljük, hogy ez az információ hasznos lesz azok számára, akik a NATS-ot a fent említett célokra használják, és ezzel zárjuk ezt a cikket.