GoSuda

W jaki sposób wbudowany NATS komunikuje się z aplikacją Go?

By prravda
views ...

Rozpoczęcie pracy

Informacje o NATS

Aplikacje i usługi oprogramowania muszą wymieniać dane. NATS to infrastruktura, która umożliwia taką wymianę danych, segmentowaną w formie wiadomości. Nazywamy to „oprogramowaniem pośredniczącym zorientowanym na wiadomości”.

Dzięki NATS, twórcy aplikacji mogą:

  • Bez wysiłku budować rozproszone i skalowalne aplikacje klient-serwer.
  • Przechowywać i rozpowszechniać dane w czasie rzeczywistym w sposób ogólny. Można to elastycznie osiągnąć w różnych środowiskach, językach, u dostawców chmury i w systemach lokalnych.

Co to jest NATS, dokumentacja NATS

  • NATS to broker wiadomości zbudowany w Go.

Wbudowany NATS

Jeśli Twoja aplikacja jest napisana w Go i pasuje do Twojego przypadku użycia oraz scenariuszy wdrożenia, możesz nawet osadzić serwer NATS wewnątrz swojej aplikacji.

Osadzanie NATS, dokumentacja NATS

  • NATS ma tę szczególną cechę, że w przypadku aplikacji zbudowanych w Go, obsługuje tryb embedded.
  • Oznacza to, że zamiast typowego dla brokera wiadomości podejścia, w którym oddzielny serwer brokera jest uruchamiany, a komunikacja odbywa się za pośrednictwem klienta aplikacji, sam broker może być osadzony (embed) w aplikacji napisanej w Go.

Korzyści i przypadki użycia wbudowanego NATS

  • Istnieje dobrze wyjaśniający film na YouTube, więc podaję link do tego filmu.
  • Nawet bez wdrażania oddzielnego serwera brokera wiadomości, możliwe jest stworzenie modularnej aplikacji monolitycznej i osiągnięcie separation of concerns, a jednocześnie wykorzystanie zalet osadzenia NATS w trybie embedded. Dodatkowo, umożliwia to wdrożenie jako pojedynczy plik binarny.
  • Może być użyteczny nie tylko dla platform bez sieci (WASM), ale także w aplikacjach offline-first.

Przykład z oficjalnej dokumentacji

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Jest to przykład Embedded NATS zamieszczony w oficjalnej dokumentacji NATS, jednakże postępując zgodnie z tym kodem, komunikacja w trybie embedding nie nastąpi.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Używając komendy watch 'netstat -an | grep 127.0.0.1' do monitorowania ruchu sieciowego do i z localhosta (127.0.0.1), a następnie uruchamiając plik Go za pomocą go run ., można zaobserwować dodawanie nowych żądań sieciowych wychodzących z domyślnego portu NATS, czyli 4222.

Prawidłowe konfiguracje dla trybu osadzania

  • Aby komunikacja w trybie embedded działała zgodnie z zamierzeniami, wymagane są dwie poniższe opcje:

    • Klient: Należy dodać opcję InProcessServer.
    • Serwer: W Server.Options należy ustawić flagę DontListen na true.
  • Te szczegóły nie były oficjalnie udokumentowane, a początek tej funkcji można prześledzić poprzez ten PR.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Działający przykład dla trybu embedded

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Teraz można zaobserwować, że nie występują żadne dodatkowe przeskoki sieciowe, zgodnie z zamierzeniami.

Pod maską

TL;DR

diagram1

  • Jest to diagram sekwencji, który ilustruje, które funkcje i w jaki sposób działają wewnętrznie po uruchomieniu tego kodu w main.go, a jego główne założenia są następujące:
    • Poprzez DontListen: true, serwer pomija fazę nasłuchiwania klienta, zwaną AcceptLoop.
    • Jeśli opcja InProcessServer klienta zostanie aktywowana, zostanie utworzone połączenie in-memory, a poprzez net.Pipe zostanie utworzony potok, którego koniec zostanie zwrócony klientowi jako typ net.Conn.
    • Klient i serwer komunikują się wewnątrzprocesowo za pośrednictwem tego połączenia.

Serwer

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Po pierwsze, jeśli DontListen jest ustawione na true, faza nasłuchiwania klienta o nazwie AcceptLoop jest pomijana.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Należy zauważyć, że funkcja AcceptLoop wykonuje następujące kroki. Są to części związane z komunikacją sieciową, takie jak TLS czy hostPort, które można pominąć, jeśli komunikacja odbywa się wewnątrz procesu.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Funkcja Connect, która obsługuje połączenie między serwerem NATS a klientem NATS, umożliwia ustawienie adresu URL klienta i opcji połączenia. W strukturze Options, która zawiera te opcje, znajduje się pole InProcessServer typu interfejsu InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Kiedy klient NATS próbuje nawiązać połączenie, przekazując nats.InProcessServer(ns) do pola InProcessServer,
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// running within the process instead of dialing via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • opcja InProcessServer zostaje zastąpiona przez osadzony serwer NATS, a następnie
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • ten interfejs wywołuje InProcesConn w InProcessServer z opcji, jeśli opcja InProcessServer nie jest nil (jest prawidłowa), w funkcji createConn, która tworzy połączenie.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • wywołuje i wykonuje InProcessConn zaimplementowaną w serwerze.
  • Ta funkcja jest wywoływana w kliencie Go NATS (nats.go), jeśli InProcessServer w nc (połączeniu NATS) nie jest nullem. Tworzy ona połączenie (net.Conn) i wiąże je z połączeniem serwera.

Interfejs sterowany przez konsumenta w Go

Typ implementuje interfejs poprzez implementację jego metod. Nie ma jawnej deklaracji intencji, żadnego słowa kluczowego „implements”. Interfejsy niejawne oddzielają definicję interfejsu od jego implementacji, która może następnie pojawić się w dowolnym pakiecie bez wcześniejszych ustaleń.

Interfejsy są implementowane niejawnie, A Tour of Go

If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.

Ogólność, Effective Go

  • Ten projekt interfejsu dobrze odzwierciedla często wspominane w Go interfejsy zdefiniowane przez konsumenta i structural typing (duck typing), dlatego chciałbym poruszyć również ten temat.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Wróćmy do kodu. Pole struktury opcji InProcessServer w kliencie nats.go zostało zdefiniowane jako interfejs InProcessConnProvider, który wykonuje tylko InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Jednakże, typem, który się w nim znajduje, jest Server z nats-server, który wykonuje różnorodne funkcje, nie tylko InProcessConn.
  • Dzieje się tak, ponieważ w tej sytuacji klienta obchodzi tylko to, czy dostarczono interfejs InProcessConn, a inne rzeczy nie są tak ważne.
  • Dlatego klient nats.go stworzył i używa tylko interfejsu InProcessConnProvider zdefiniowanego przez konsumenta, który definiuje tylko funkcję InProcessConn() (net.Conn, error).

Wnioski

  • Krótko omówiono tryb embedded NATS, jego działanie oraz interfejs zdefiniowany przez konsumenta w Go, który można znaleźć w kodzie NATS.
  • Mam nadzieję, że te informacje okażą się pomocne dla osób korzystających z NATS w podobnych celach.