Hogyan kommunikálnak a beágyazott NATS-ok a Go alkalmazással?
Első lépések
A NATS-ról
A szoftveralkalmazásoknak és szolgáltatásoknak adatot kell cserélniük. A NATS egy olyan infrastruktúra, amely lehetővé teszi az ilyen adatcserét, üzenetek formájában szegmentálva. Ezt „üzenetorientált middleware”-nek nevezzük.
A NATS segítségével az alkalmazásfejlesztők a következőket tehetik:
- Könnyedén építhetnek elosztott és skálázható kliens-szerver alkalmazásokat.
- Általános módon tárolhatnak és terjeszthetnek adatokat valós időben. Ez rugalmasan megvalósítható különböző környezetekben, nyelveken, felhőszolgáltatóknál és helyben telepített rendszereken.
- A NATS egy Go nyelven megírt üzenetbróker.
Beágyazott NATS
Ha az alkalmazása Go nyelven íródott, és illeszkedik a felhasználási esetéhez és a telepítési forgatókönyveihez, akár be is ágyazhat egy NATS szervert az alkalmazásába.
NATS beágyazása, NATS dokumentáció
- A NATS-nak van egy különleges tulajdonsága: támogatja az embedded mode-ot a Go nyelven megírt alkalmazásokhoz.
- Ez azt jelenti, hogy a brókert magát be lehet ágyazni (embed) egy Go nyelven írt alkalmazásba, ahelyett, hogy a bróker szerver külön futna, és az alkalmazás kliensekkel kommunikálna vele, ami az üzenetbrókerek szokásos működési módja.
A beágyazott NATS előnyei és felhasználási esetei
- Van egy jól magyarázó Youtube videó, ezért a videó linkjével helyettesítem.
- Még akkor is, ha nincs külön üzenetbróker szerver telepítve, létrehozhatunk egy modular monolith application-t, amely lehetővé teszi a concern-ek szétválasztását, miközben kihasználja a NATS beágyazásának előnyeit. Ezenkívül lehetővé válik a single binary deployment is.
- Nemcsak platform with no network (WASM) környezetben, hanem offline-first application-ökben is hasznosan alkalmazható.
Példa a hivatalos dokumentációból
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 // Új szerver inicializálása opciókkal
16 ns, err := server.NewServer(opts)
17
18 if err != nil {
19 panic(err)
20 }
21
22 // Start the server via goroutine
23 // A szerver indítása goroutine-on keresztül
24 go ns.Start()
25
26 // Wait for server to be ready for connections
27 // Várja meg, amíg a szerver készen áll a kapcsolatokra
28 if !ns.ReadyForConnections(4 * time.Second) {
29 panic("not ready for connection")
30 }
31
32 // Connect to server
33 // Csatlakozás a szerverhez
34 nc, err := nats.Connect(ns.ClientURL())
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 // Feliratkozás a subject-re
44 nc.Subscribe(subject, func(msg *nats.Msg) {
45 // Print message data
46 // Üzenetadatok kiírása
47 data := string(msg.Data)
48 fmt.Println(data)
49
50 // Shutdown the server (optional)
51 // A szerver leállítása (opcionális)
52 ns.Shutdown()
53 })
54
55 // Publish data to the subject
56 // Adatok publikálása a subject-re
57 nc.Publish(subject, []byte("Hello embedded NATS!"))
58
59 // Wait for server shutdown
60 // Várja meg a szerver leállását
61 ns.WaitForShutdown()
62}
- Ez a NATS hivatalos dokumentációjában található példa a beágyazott NATS-ra, de ha ezt a példakódot futtatjuk, a kommunikáció nem embedding mode-ban történik.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- A
watch 'netstat -an | grep 127.0.0.1'
paranccsal ellenőrizhetjük a localhost (127.0.0.1) felé irányuló hálózati forgalmat, és ha ago run .
paranccsal futtatjuk a Go fájlt, láthatjuk, hogy új hálózati kérések adódnak hozzá a NATS alapértelmezett portjáról, a4222
-ről.
Helyes konfigurációk embedding mode-hoz
Ahhoz, hogy a kommunikáció a szándékozott embedded mode-ban történjen, a következő két opcióra van szükség:
- Kliens:
InProcessServer
opciót kell hozzáadni. - Szerver: A
Server.Options
mezőben aDontListen
flag-ettrue
értékre kell állítani.
- Kliens:
Ezek a részek nem voltak hivatalosan dokumentálva, és a funkció kezdetét az alábbi PR segítségével lehet azonosítani.
This PR adds three things:
InProcessConn()
function toServer
which builds anet.Pipe
to get a connection to the NATS server without using TCP socketsDontListen
option which tells the NATS server not to listen on the usual TCP listenerstartupComplete
channel, which is closed right before we startAcceptLoop
, andreadyForConnections
will wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
Működő példa embedded mode-ra
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // a beágyazott NATS szerver konfigurálásához
15 // set DonListen as true
16 // állítsa a DontListen-t igazra
17 DontListen: true,
18 }
19
20 // Initialize new server with options
21 // Új szerver inicializálása opciókkal
22 ns, err := server.NewServer(opts)
23
24 if err != nil {
25 panic(err)
26 }
27
28 // Start the server via goroutine
29 // A szerver indítása goroutine-on keresztül
30 go ns.Start()
31
32 // Wait for server to be ready for connections
33 // Várja meg, amíg a szerver készen áll a kapcsolatokra
34 if !ns.ReadyForConnections(10 * time.Second) {
35 panic("not ready for connection")
36 }
37
38 // Connect to server via in-process connection
39 // Csatlakozás a szerverhez in-process kapcsolaton keresztül
40 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
41
42 if err != nil {
43 panic(err)
44 }
45
46 subject := "my-subject"
47
48 // Subscribe to the subject
49 // Feliratkozás a subject-re
50 nc.Subscribe(subject, func(msg *nats.Msg) {
51 // Print message data
52 // Üzenetadatok kiírása
53 data := string(msg.Data)
54 fmt.Println(data)
55
56 // Shutdown the server (optional)
57 // A szerver leállítása (opcionális)
58 ns.Shutdown()
59 })
60
61 // Publish data to the subject
62 // Adatok publikálása a subject-re
63 nc.Publish(subject, []byte("Hello embedded NATS!"))
64
65 // Wait for server shutdown
66 // Várja meg a szerver leállását
67 ns.WaitForShutdown()
68}
69
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Most látható, hogy nem keletkezik további network hop, ahogyan az szándékozott volt.
A háttérben
TL;DR
- Ez egy sequence diagram, amely bemutatja, hogyan működnek belsőleg a függvények, amikor ezt a kódot a
main.go
fájlban futtatjuk, és a lényeg a következő:- A
DontListen: true
opcióval a szerver kihagyja azAcceptLoop
nevű kliens figyelési fázist. - Ha a kliens
Connect
opciói közül azInProcessServer
aktiválva van, akkor in-memory connection-t hoz létre,net.Pipe
segítségével pipe-ot épít, majd a pipe végétnet.Conn
típusban visszaadja a kliensnek. - A kliens és a szerver ezen a connection-ön keresztül in-process communication-t végez.
- A
Szerver
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4// Várja a klienseket.
5if !opts.DontListen {
6 s.AcceptLoop(clientListenReady)
7}
- Először is, ha a
DontListen
értéke true, akkor azAcceptLoop
nevű kliens figyelési fázis kimarad.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4// Az AcceptLoop exportálva van a könnyebb tesztelés érdekében.
5func (s *Server) AcceptLoop(clr chan struct{}) {
6 // If we were to exit before the listener is setup properly,
7 // make sure we close the channel.
8 // Ha kilépnénk, mielőtt a listener megfelelően be lenne állítva,
9 // győződjön meg róla, hogy bezárja a csatornát.
10 defer func() {
11 if clr != nil {
12 close(clr)
13 }
14 }()
15
16 if s.isShuttingDown() {
17 return
18 }
19
20 // Snapshot server options.
21 // Szerver opciók pillanatképe.
22 opts := s.getOpts()
23
24 // Setup state that can enable shutdown
25 // Leállítást lehetővé tevő állapot beállítása
26 s.mu.Lock()
27 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
28 l, e := natsListen("tcp", hp)
29 s.listenerErr = e
30 if e != nil {
31 s.mu.Unlock()
32 s.Fatalf("Error listening on port: %s, %q", hp, e)
33 return
34 }
35 s.Noticef("Listening for client connections on %s",
36 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
37
38 // Alert of TLS enabled.
39 // TLS engedélyezésének jelzése.
40 if opts.TLSConfig != nil {
41 s.Noticef("TLS required for client connections")
42 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
43 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
44 }
45 }
46
47 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
48 // to 0 at the beginning this function. So we need to get the actual port
49 // Ha a szerver RANDOM_PORT (-1) opcióval indult, az opts.Port értéke 0 lenne
50 // a függvény elején. Ezért meg kell szerezni az aktuális portot.
51 if opts.Port == 0 {
52 // Write resolved port back to options.
53 // Írja vissza a feloldott portot az opciókba.
54 opts.Port = l.Addr().(*net.TCPAddr).Port
55 }
56
57 // Now that port has been set (if it was set to RANDOM), set the
58 // server's info Host/Port with either values from Options or
59 // ClientAdvertise.
60 // Most, hogy a port beállítva van (ha RANDOM-ra volt állítva), állítsa be a
61 // szerver info Host/Port értékét az Options vagy a ClientAdvertise értékeivel.
62 if err := s.setInfoHostPort(); err != nil {
63 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
64 l.Close()
65 s.mu.Unlock()
66 return
67 }
68 // Keep track of client connect URLs. We may need them later.
69 // Tartsa nyilván a kliens csatlakozási URL-eket. Később szükségünk lehet rájuk.
70 s.clientConnectURLs = s.getClientConnectURLs()
71 s.listener = l
72
73 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
74 func(_ error) bool {
75 if s.isLameDuckMode() {
76 // Signal that we are not accepting new clients
77 // Jelzi, hogy nem fogadunk új klienseket
78 s.ldmCh <- true
79 // Now wait for the Shutdown...
80 // Most várja meg a leállítást...
81 <-s.quitCh
82 return true
83 }
84 return false
85 })
86 s.mu.Unlock()
87
88 // Let the caller know that we are ready
89 // Értesítse a hívót, hogy készen állunk
90 close(clr)
91 clr = nil
92}
- Az AcceptLoop függvény a következő folyamatokat hajtja végre. Ezek a részek, mint a
TLS
vagy ahostPort
, a hálózati kommunikációval kapcsolatosak, és elhagyhatók, ha in-process communication-t alkalmazunk, mivel akkor nincs rájuk szükség.
Kliens
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// A Connect megpróbál csatlakozni a NATS rendszerhez.
6// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
7// Az URL tartalmazhat felhasználónév/jelszó szemantikát. pl. nats://derek:pass@localhost:4222
8// Comma separated arrays are also supported, e.g. urlA, urlB.
9// Vesszővel elválasztott tömbök is támogatottak, pl. urlA, urlB.
10// Options start with the defaults but can be overridden.
11// Az opciók az alapértelmezettekkel kezdődnek, de felülírhatók.
12// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
13// A NATS Server websocket portjához való csatlakozáshoz használja a `ws` vagy `wss` sémát, például
14// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
15// `ws://localhost:8080`. Vegye figyelembe, hogy a websocket sémák nem keverhetők másokkal (nats/tls).
16func Connect(url string, options ...Option) (*Conn, error) {
17 opts := GetDefaultOptions()
18 opts.Servers = processUrlString(url)
19 for _, opt := range options {
20 if opt != nil {
21 if err := opt(&opts); err != nil {
22 return nil, err
23 }
24 }
25 }
26 return opts.Connect()
27}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4// Az Options használható egy testreszabott kapcsolat létrehozására.
5type Options struct {
6 // Url represents a single NATS server url to which the client
7 // Az Url egyetlen NATS szerver URL-t jelöl, amelyhez a kliens
8 // will be connecting. If the Servers option is also set, it
9 // csatlakozni fog. Ha a Servers opció is be van állítva, akkor az
10 // then becomes the first server in the Servers array.
11 // lesz az első szerver a Servers tömbben.
12 Url string
13
14 // InProcessServer represents a NATS server running within the
15 // Az InProcessServer egy NATS szervert jelöl, amely ugyanabban a
16 // same process. If this is set then we will attempt to connect
17 // folyamatban fut. Ha ez be van állítva, akkor megpróbálunk csatlakozni
18 // to the server directly rather than using external TCP conns.
19 // közvetlenül a szerverhez, külső TCP kapcsolatok használata nélkül.
20 InProcessServer InProcessConnProvider
21
22 //...
23}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- A
Connect
függvény, amely a NATS szerver és a NATS kliens közötti kapcsolatot hozza létre, beállíthatja a kliens URL-t és a connect Option-t, és az ezen opciókat tartalmazó Options struct-ban található egyInProcessServer
nevű mező, amelyInProcessConnProvider
interface típusú.
1// main.go of example code
2
3// Initialize new server with options
4// Új szerver inicializálása opciókkal
5ns, err := server.NewServer(opts)
6
7//...
8
9// Connect to server via in-process connection
10// Csatlakozás a szerverhez in-process kapcsolaton keresztül
11nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Amikor a nats kliens
Connect
hívást kezdeményez, és anats.InProcessServer(ns)
értéket adja át azInProcessServer
mezőnek, akkor
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// Az InProcessServer egy opció, amely megpróbálja irányt létesíteni egy NATS szerverhez
5// running within the process instead of dialing via TCP.
6// amely a folyamaton belül fut, a TCP-n keresztüli tárcsázás helyett.
7func InProcessServer(server InProcessConnProvider) Option {
8 return func(o *Options) error {
9 o.InProcessServer = server
10 return nil
11 }
12}
- az opció
InProcessServer
része beágyazott NATS szerverrel helyettesítődik, és
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// A createConn csatlakozik a szerverhez, és becsomagolja a megfelelő
5// bufio structures. It will do the right thing when an existing
6// bufio struktúrákat. Helyesen fog működni, ha már létező
7// connection is in place.
8// kapcsolat van érvényben.
9func (nc *Conn) createConn() (err error) {
10 if nc.Opts.Timeout < 0 {
11 return ErrBadTimeout
12 }
13 if _, cur := nc.currentServer(); cur == nil {
14 return ErrNoServers
15 }
16
17 // If we have a reference to an in-process server then establish a
18 // Ha van hivatkozásunk egy in-process szerverre, akkor létesítsünk
19 // connection using that.
20 // kapcsolatot azzal.
21 if nc.Opts.InProcessServer != nil {
22 conn, err := nc.Opts.InProcessServer.InProcessConn()
23 if err != nil {
24 return fmt.Errorf("failed to get in-process connection: %w", err)
25 }
26 nc.conn = conn
27 nc.bindToNewConn()
28 return nil
29 }
30
31 //...
32}
- ez az interface a kapcsolatot létrehozó
createConn
függvényben, ha azInProcessServer
opció nem nil (érvényes), akkor végrehajtja az opcióban találhatóInProcessConn
metódust, és
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// Az InProcessConn egy in-process kapcsolatot ad vissza a szerverhez,
5// avoiding the need to use a TCP listener for local connectivity
6// elkerülve a TCP listener használatának szükségességét a helyi kapcsolódáshoz
7// within the same process. This can be used regardless of the
8// ugyanazon folyamaton belül. Ez használható függetlenül a
9// state of the DontListen option.
10// DontListen opció állapotától.
11func (s *Server) InProcessConn() (net.Conn, error) {
12 pl, pr := net.Pipe()
13 if !s.startGoRoutine(func() {
14 s.createClientInProcess(pl)
15 s.grWG.Done()
16 }) {
17 pl.Close()
18 pr.Close()
19 return nil, fmt.Errorf("failed to create connection")
20 }
21 return pr, nil
22}
- meghívja a szerverben implementált
InProcessConn
függvényt és végrehajtja azt. - Ez a függvény a NATS Go kliensében, a
nats.go
-ban, akkor hívódik meg, ha az nc (nats connection)InProcessServer
mezője nem nil, létrehozza a connection-t (net.Conn
), majd azt a szerver connection-jéhez köti.
Go consumer driven interface-e
A típus az interface-t metódusainak implementálásával valósítja meg. Nincs explicit szándéknyilatkozat, nincs „implements” kulcsszó. Az implicit interface-ek szétválasztják egy interface definícióját az implementációjától, amely aztán bármely csomagban megjelenhet előzetes egyeztetés nélkül.
Az interface-ek implicit módon vannak implementálva, A Tour of Go
Ha egy típus csak egy interface implementálására létezik, és azon túl soha nem lesz exportált metódusa, akkor nincs szükség magának a típusnak az exportálására.
- Ez az interface design jól tükrözi a Go-ban gyakran emlegetett consumer defined interface-t és a structural typing-ot (duck typing), ezért szeretném röviden bemutatni ezt a témát is.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4// Az Options használható egy testreszabott kapcsolat létrehozására.
5type Options struct {
6 // Url represents a single NATS server url to which the client
7 // Az Url egyetlen NATS szerver URL-t jelöl, amelyhez a kliens
8 // will be connecting. If the Servers option is also set, it
9 // csatlakozni fog. Ha a Servers opció is be van állítva, akkor az
10 // then becomes the first server in the Servers array.
11 // lesz az első szerver a Servers tömbben.
12 Url string
13
14 // InProcessServer represents a NATS server running within the
15 // Az InProcessServer egy NATS szervert jelöl, amely ugyanabban a
16 // same process. If this is set then we will attempt to connect
17 // folyamatban fut. Ha ez be van állítva, akkor megpróbálunk csatlakozni
18 // to the server directly rather than using external TCP conns.
19 // közvetlenül a szerverhez, külső TCP kapcsolatok használata nélkül.
20 InProcessServer InProcessConnProvider
21
22 //...
23}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Térjünk vissza a kódra. A nats.go kliensben az
InProcessServer
opció struct mezőjeInProcessConnProvider
interface-ként van definiálva, amely csak azInProcessConn
metódust hajtja végre.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// Az InProcessConn egy in-process kapcsolatot ad vissza a szerverhez,
5// avoiding the need to use a TCP listener for local connectivity
6// elkerülve a TCP listener használatának szükségességét a helyi kapcsolódáshoz
7// within the same process. This can be used regardless of the
8// ugyanazon folyamaton belül. Ez használható függetlenül a
9// state of the DontListen option.
10// DontListen opció állapotától.
11func (s *Server) InProcessConn() (net.Conn, error) {
12 pl, pr := net.Pipe()
13 if !s.startGoRoutine(func() {
14 s.createClientInProcess(pl)
15 s.grWG.Done()
16 }) {
17 pl.Close()
18 pr.Close()
19 return nil, fmt.Errorf("failed to create connection")
20 }
21 return pr, nil
22}
- Azonban a bemeneti típus a nats-server
Server
típusa, amely az InProcessConn mellett számos más funkciót is ellát. - Ez azért van így, mert ebben a helyzetben a kliens számára csak az a fontos, hogy az
InProcessConn
interface biztosított-e vagy sem, minden más irreleváns. - Ezért a nats.go kliens csak egy
InProcessConnProvider
nevű consumer defined interface-t hozott létre és használ, amely kizárólag azInProcessConn() (net.Conn, error)
funkciót definiálja.
Összefoglalás
- Röviden tárgyaltuk a NATS embedded mode-ját és működését, valamint a Go consumer defined interface-ét, amelyet a NATS kódjában is megfigyelhetünk.
- Reméljük, hogy ez az információ hasznos lesz azok számára, akik a NATS-ot a fent említett célokra használják, és ezzel zárjuk ezt a cikket.