GoSuda

Hogyan kommunikálnak az embedded NATS-ek a go alkalmazással?

By prravda
views ...

Első lépések

A NATS-ról

A szoftveralkalmazásoknak és szolgáltatásoknak adatot kell cserélniük. A NATS egy olyan infrastruktúra, amely lehetővé teszi az ilyen adatcserét, üzenetek formájában szegmentálva. Ezt nevezzük „üzenetorientált middleware”-nek.

A NATS segítségével az alkalmazásfejlesztők a következőket tehetik:

  • Könnyedén építhetnek elosztott és skálázható kliens-szerver alkalmazásokat.
  • Általános módon tárolhatnak és oszthatnak meg adatokat valós időben. Ez rugalmasan megvalósítható különböző környezetekben, nyelveken, felhőszolgáltatóknál és helyszíni rendszereken.

What is NATS, NATS docs

  • A NATS egy Go nyelven írt üzenetbróker.

Beágyazott NATS

Amennyiben az alkalmazása Go nyelven íródott, és illeszkedik a használati esetéhez és a telepítési forgatókönyveihez, akár egy NATS szervert is beágyazhat az alkalmazásába.

Embedding NATS, NATS docs

  • A NATS egyik különlegessége, hogy a Go nyelven írt alkalmazások esetében támogatja az embedded mode-ot.
  • Ez azt jelenti, hogy nem a megszokott üzenetbróker-módszerrel, külön bróker szerver futtatásával és az alkalmazás kliensein keresztül történő kommunikációval működik, hanem maga a bróker beágyazható (embed) a Go nyelven írt alkalmazásba.

A beágyazott NATS előnyei és használati esetei

  • Egy jól magyarázó Youtube videó áll rendelkezésre, ezért annak linkjére hivatkozom.
  • Még külön üzenetbróker szerver telepítése nélkül is létrehozható egy modular monolith alkalmazás, amely eléri a concern-ek szétválasztását, miközben kihasználja a NATS beágyazásának előnyeit. Ezenfelül lehetővé válik a single binary deployment is.
  • Nemcsak a platform with no network (WASM) esetében, hanem offline-first alkalmazásokban is hasznosítható.

Példa a hivatalos dokumentációból

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Ez a NATS hivatalos dokumentációjában található példa a beágyazott NATS-ra, azonban a megadott példakód szerint haladva a kommunikáció nem embedding mode-ban valósul meg.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • A watch 'netstat -an | grep 127.0.0.1' parancs segítségével figyelemmel kísérve a localhost (127.0.0.1) felé irányuló hálózati forgalmat, majd a go run . paranccsal futtatva a Go fájlt, látható, hogy új hálózati kérések adódnak hozzá a NATS alapértelmezett 4222-es portjáról indulva.

Helyes konfigurációk az embedding mode-hoz

  • A kívánt embedded mode-ban történő kommunikációhoz a következő két opcióra van szükség:

    • Kliens: Az InProcessServer opciót kell megadni.
    • Szerver: A Server.Options-ben a DontListen flag-et true értékre kell állítani.
  • Ezek a részek hivatalosan nem voltak dokumentálva, és e funkció kezdete a következő PR segítségével deríthető ki.

    Ez a PR három dolgot ad hozzá:

    1. InProcessConn() függvény a Server-hez, amely net.Pipe-ot épít ki a NATS szerverhez való csatlakozáshoz TCP aljzatok használata nélkül
    2. DontListen opció, amely megmondja a NATS szervernek, hogy ne hallgasson a szokásos TCP figyelőn
    3. startupComplete csatorna, amely közvetlenül az AcceptLoop indítása előtt záródik, és a readyForConnections várni fog rá

    Ennek fő motivációja az, hogy van egy alkalmazásunk, amely futhat monolit (egyfolyamatos) vagy polilit (többfolyamatos) módban. A NATS-ot mindkét módhoz használni szeretnénk az egyszerűség kedvéért, de a monolit módnak képesnek kell lennie különféle platformok kezelésére, ahol a socket kapcsolatok megnyitása vagy nincs értelme (mobil), vagy egyszerűen nem lehetséges (WASM). Ezek a változtatások lehetővé teszik számunkra, hogy a NATS-ot teljes mértékben folyamaton belül használjuk.

    Egy kísérő PR, a nats-io/nats.go#774 hozzáadja az ügyféloldali támogatást.

    Ez az első PR-em ehhez a projekthez, ezért előre is elnézést kérek, ha bármi nyilvánvalót kihagytam volna.

    /cc @nats-io/core

Működő példa az embedded mode-hoz

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Most látható, hogy a kívánt módon nem keletkezik további network hop.

Működés a háttérben

TL;DR

diagram1

  • Amikor ezt a kódot main.go-ból futtatjuk, egy sequence diagram mutatja be, hogy belsőleg milyen függvények és hogyan működnek, a lényeg az alábbiakban foglalható össze:
    • A DontListen: true beállítás révén a szerver kihagyja az AcceptLoop nevű kliens-figyelési fázist.
    • Ha a kliens Connect opciói közül az InProcessServer aktiválódik, akkor egy in-memory kapcsolat jön létre, amely net.Pipe segítségével egy pipe-ot hoz létre, majd a pipe végét net.Conn típusban visszaadja a kliensnek.
    • A kliens és a szerver ezen a kapcsolaton keresztül in-process kommunikációt folytat.

Szerver

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Először is, ha a DontListen értéke true, akkor az AcceptLoop nevű kliens-figyelési fázis kimarad.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Az AcceptLoop függvény egyébként a következő lépéseket hajtja végre: TLS vagy hostPort és hasonló, hálózati kommunikációval kapcsolatos részek, amelyek in-process kommunikáció esetén szükségtelenek, így elhagyhatók.

Kliens

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • A Connect függvény, amely a NATS szerver és a NATS kliens közötti kapcsolatot hozza létre, lehetővé teszi a kliens URL és a connect opciók beállítását. Az opciókat tartalmazó Options struct-ban található egy InProcessServer nevű mező, amely InProcessConnProvider interface típusú.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Amikor a NATS kliens a Connect hívást kezdeményezi, és az InProcessServer mezőbe nats.InProcessServer(ns) értéket adunk át, akkor...
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • ez az interface a createConn függvényben, amely a kapcsolatot hozza létre, amennyiben az InProcessServer opció nem nil (érvényes), akkor végrehajtja az opcióban található InProcessServer InProcessConn metódusát, és
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • meghívja a szerverben implementált InProcessConn függvényt.
  • Ez a funkció a NATS Go kliensében, a nats.go-ban akkor hívódik meg, ha az nc (NATS connection) InProcessServer mezője nem nil, létrehoz egy kapcsolatot (net.Conn), és azt a szerver kapcsolatához köti.

Go Consumer driven interface-e

Egy típus úgy implementál egy interfészt, hogy implementálja annak metódusait. Nincs explicit szándéknyilatkozat, nincs "implements" kulcsszó. Az implicit interfészek szétválasztják az interfész definícióját az implementációjától, amely így bármely csomagban megjelenhet előzetes megállapodás nélkül.

Interfaces are implemented implicitly, A Tour of Go

Ha egy típus kizárólag egy interfész implementálására létezik, és azon túl soha nem lesz exportált metódusa, akkor nincs szükség magának a típusnak az exportálására.

Generality, Effective Go

  • Ez az interfésztervezés jól tükrözi a Go-ban gyakran emlegetett „consumer defined interface” és a „structural typing” (duck typing) fogalmait, ezért szeretném ezt a témát is bemutatni.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Térjünk vissza a kódhoz. A nats.go kliensben az InProcessServer opciós struct mezője egy InProcessConnProvider interfészként van definiálva, amely csak az InProcessConn metódust valósítja meg.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Azonban az ebbe beépülő típus a nats-server Server típusa, amely nemcsak az InProcessConn metódust, hanem számos más funkciót is ellát.
  • Ennek oka, hogy a kliens szempontjából ebben a helyzetben csak az InProcessConn interfész biztosítása a fontos, a többi szempont nem annyira lényeges.
  • Ezért a nats.go kliens csak egy InProcessConnProvider nevű, fogyasztó által definiált interfészt hozott létre, amely kizárólag az InProcessConn() (net.Conn, error) funkciót definiálja.

Összefoglalás

  • Röviden bemutattam a NATS embedded mode-ját és működését, valamint a NATS kódján keresztül megfigyelhető Go consumer defined interface koncepcióját.
  • Remélem, hogy ez az információ hasznos lesz azok számára, akik a NATS-ot a fent említett célokra használják, és ezzel zárom ezt a cikket.