GoSuda

¿Cómo se comunican los NATS embebidos con la aplicación Go?

By prravda
views ...

Primeros pasos

Acerca de NATS

Las aplicaciones y servicios de software necesitan intercambiar datos. NATS es una infraestructura que permite dicho intercambio de datos, segmentado en forma de mensajes. A esto lo denominamos "middleware orientado a mensajes".

Con NATS, los desarrolladores de aplicaciones pueden:

  • Construir sin esfuerzo aplicaciones cliente-servidor distribuidas y escalables.
  • Almacenar y distribuir datos en tiempo real de manera general. Esto puede lograrse de forma flexible en diversos entornos, lenguajes, proveedores de nube y sistemas locales.

¿Qué es NATS, NATS docs

  • NATS es un Message Broker compuesto en Go.

NATS embebido

Si su aplicación está en Go, y si se ajusta a su caso de uso y escenarios de despliegue, incluso puede embeber un servidor NATS dentro de su aplicación.

Embebiendo NATS, NATS docs

  • Y hay una característica particular de NATS: para las aplicaciones compuestas en Go, soporta el modo embebido.
  • Es decir, en lugar de la forma común de los Message Brokers, que implica la ejecución de un servidor de broker separado y la comunicación a través del cliente de la aplicación con dicho servidor, es posible integrar (embed) el propio broker directamente en la aplicación desarrollada en Go.

Beneficios y casos de uso de NATS embebido

  • Existe un video de Youtube bien explicado, por lo que me referiré al enlace del video.
  • Incluso sin desplegar un servidor de Message Broker separado, es posible crear una aplicación modular monolítica y lograr la separación de responsabilidades, al mismo tiempo que se aprovecha la ventaja de embeber NATS. Además, se habilita el despliegue de un único binario.
  • Puede ser utilizado de manera útil no solo en plataformas sin red (WASM), sino también en aplicaciones offline-first.

Ejemplo en la documentación oficial

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Este es un ejemplo de NATS embebido proporcionado en la documentación oficial de NATS, pero si se sigue este código de ejemplo, la comunicación no se realiza en modo embebido.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Al ejecutar el archivo go con go run . mientras se verifica la red que se comunica con localhost (127.0.0.1) mediante el comando watch 'netstat -an | grep 127.0.0.1', se puede observar que se añaden nuevas solicitudes de red originadas en el puerto predeterminado de NATS, 4222.

Configuraciones correctas para el modo embebido

  • Para que la comunicación se realice en el modo embebido, tal como se desea, se requieren las dos opciones siguientes:

    • Cliente: Se debe incluir la opción InProcessServer.
    • Servidor: En Server.Options, se debe especificar la bandera DontListen como true.
  • Estas secciones no están documentadas oficialmente, y el origen de esta funcionalidad se puede rastrear a través de este PR.

    Este PR añade tres elementos:

    1. Función InProcessConn() a Server que construye un net.Pipe para obtener una conexión al servidor NATS sin usar sockets TCP.
    2. Opción DontListen que indica al servidor NATS que no escuche en el listener TCP habitual.
    3. Canal startupComplete, que se cierra justo antes de iniciar AcceptLoop, y readyForConnections esperará por él.

    La motivación principal para esto es que tenemos una aplicación que puede ejecutarse tanto en modo monolítico (proceso único) como en modo polilítico (múltiples procesos). Nos gustaría poder usar NATS para ambos modos por simplicidad, pero el modo monolítico debe ser capaz de atender una variedad de plataformas donde abrir conexiones de socket o bien no tiene sentido (móvil) o simplemente no es posible (WASM). Estos cambios nos permitirán usar NATS completamente en el mismo proceso.

    Un PR complementario nats-io/nats.go#774 añade soporte para el lado del cliente.

    Este es mi primer PR a este proyecto, así que disculpas de antemano si he pasado por alto algo obvio en algún lugar.

    /cc @nats-io/core

Ejemplo funcional para el modo embebido

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Ahora se puede observar que no se produce ningún "network hop" adicional, tal como se pretendía.

Bajo el capó

TL;DR

diagram1

  • Este es un diagrama de secuencia que ilustra cómo funcionan internamente las funciones cuando se ejecuta este código en main.go, y en esencia, lo siguiente:
    • A través de DontListen: true, el servidor omite la fase de escucha del cliente llamada AcceptLoop.
    • Si la opción InProcessServer de conexión del cliente está activada, se crea una conexión en memoria y se establece una pipe mediante net.Pipe, devolviendo el extremo de la pipe al cliente como tipo net.Conn.
    • El cliente y el servidor se comunican en proceso a través de esta conexión.

Servidor

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • En primer lugar, si DontListen es true, se omite la fase de escucha del cliente denominada AcceptLoop.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Cabe señalar que la función AcceptLoop realiza los siguientes procesos: las partes relacionadas con la comunicación de red, como TLS o hostPort, son innecesarias en la comunicación en proceso, por lo que pueden omitirse.

Cliente

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La función Connect, que establece la conexión entre el servidor NATS y el cliente NATS, permite configurar la URL del cliente y la opción de conexión. La estructura Options, que agrupa estas opciones, contiene un campo InProcessServer de tipo de interfaz InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Cuando el cliente NATS realiza la conexión, si se pasa nats.InProcessServer(ns) al campo InProcessServer...
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// running within the process instead of dialing via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • La opción InProcessServer es reemplazada por el servidor NATS embebido.
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Esta interfaz, en la función createConn que establece la conexión, si la opción InProcessServer no es nula (es válida), ejecuta InProcessConn de la opción InProcessServer.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Se invoca y ejecuta InProcessConn, implementado en el servidor.
  • Esta función, en el cliente Go de NATS, nats.go, se invoca cuando InProcessServer de nc (conexión NATS) no es nulo, creando una conexión (net.Conn) y vinculándola a la conexión del servidor.

Interfaz dirigida por el consumidor de Go

Un tipo implementa una interfaz al implementar sus métodos. No hay una declaración explícita de intención, ni una palabra clave "implements". Las interfaces implícitas desacoplan la definición de una interfaz de su implementación, la cual podría aparecer en cualquier paquete sin acuerdo previo.

Interfaces are implemented implicitly, A Tour of Go

Si un tipo existe solo para implementar una interfaz y nunca tendrá métodos exportados más allá de esa interfaz, no es necesario exportar el tipo mismo.

Generality, Effective Go

  • Este diseño de interfaz incorpora bien lo que comúnmente se denomina interfaz definida por el consumidor y tipado estructural (duck typing) en Go, por lo que me gustaría presentar este tema también.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Volvamos al código. En el cliente nats.go, el campo de estructura de opciones InProcessServer se definió como la interfaz InProcessConnProvider, que solo realiza InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Sin embargo, el tipo que se introduce en ella es Server de nats-server, que realiza una variedad de funciones además de InProcessConn.
  • Esto se debe a que la preocupación del cliente en esta situación es simplemente si se ha proporcionado o no la interfaz InProcessConn, y otras consideraciones no son de gran importancia.
  • Por lo tanto, el cliente nats.go solo ha creado y utiliza la interfaz InProcessConnProvider definida por el consumidor, que define únicamente la funcionalidad InProcessConn() (net.Conn, error).

Conclusión

  • Se ha abordado brevemente el modo embebido de NATS y su funcionamiento, así como la interfaz definida por el consumidor de Go, que puede confirmarse a través del código de NATS.
  • Espero que esta información sea de utilidad para quienes utilizan NATS con los propósitos mencionados, y con esto concluyo este artículo.