GoSuda

¿Cómo se comunican los NATS embebidos con la aplicación go?

By prravda
views ...

Primeros pasos

Acerca de NATS

Las aplicaciones y servicios de software necesitan intercambiar datos. NATS es una infraestructura que permite dicho intercambio de datos, segmentado en forma de mensajes. A esto lo llamamos un "middleware orientado a mensajes".

Con NATS, los desarrolladores de aplicaciones pueden:

  • Construir sin esfuerzo aplicaciones cliente-servidor distribuidas y escalables.
  • Almacenar y distribuir datos en tiempo real de manera general. Esto se puede lograr de forma flexible en varios entornos, lenguajes, proveedores de la nube y sistemas locales.

What is NATS, NATS docs

  • NATS es un message broker configurado en Go.

NATS embebido

Si su aplicación está en Go, y si se ajusta a su caso de uso y escenarios de despliegue, incluso puede embeber un servidor NATS dentro de su aplicación.

Embedding NATS, NATS docs

  • Una particularidad de NATS es que, para aplicaciones desarrolladas en Go, soporta el modo embedded.
  • Es decir, en lugar de la forma habitual de un message broker, que implica iniciar un servidor broker separado y comunicarse con él a través de clientes de la aplicación, es posible embeber (embed) el propio broker en una aplicación creada con Go.

Beneficios y casos de uso de NATS embebido

  • Existe un video de Youtube bien explicado, por lo que me remitiré a él.
  • Incluso sin desplegar un servidor de message broker separado, es posible crear una modular monolith application y lograr la separation of concern, aprovechando la ventaja de poder integrar NATS de forma embebida. Además, esto permite el single binary deployment.
  • Es útil no solo para platform with no network (wasm), sino también para offline-first application.

Ejemplo en la documentación oficial

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inicializar nuevo servidor con opciones
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Iniciar el servidor a través de goroutine
22    go ns.Start()
23
24    // Esperar a que el servidor esté listo para las conexiones
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Conectarse al servidor
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Suscribirse al subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Imprimir datos del mensaje
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Apagar el servidor (opcional)
45        ns.Shutdown()
46    })
47
48    // Publicar datos en el subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Esperar el apagado del servidor
52    ns.WaitForShutdown()
53}
  • Este es un ejemplo de NATS embebido proporcionado en la documentación oficial de NATS, pero si se sigue este código de ejemplo, la comunicación no se realiza en modo embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Al ejecutar el archivo go con go run . mientras se monitorea la red que pasa a través de localhost (127.0.0.1) con el comando watch 'netstat -an | grep 127.0.0.1', se puede observar que se añaden nuevas solicitudes de red originadas en el puerto predeterminado de NATS, 4222.

Configuraciones correctas para el modo embebido

  • Para que la comunicación se realice en modo embedded como se desea, se requieren las siguientes dos opciones:

    • Cliente: Se debe incluir la opción InProcessServer.
    • Servidor: Se debe especificar la flag DontListen como true en Server.Options.
  • Estas partes no estaban documentadas oficialmente, y el origen de esta funcionalidad se puede rastrear a través de este PR.

    Este PR añade tres cosas:

    1. Función InProcessConn() a Server que construye un net.Pipe para obtener una conexión al servidor NATS sin usar sockets TCP.
    2. Opción DontListen que le dice al servidor NATS que no escuche en el listener TCP habitual.
    3. Canal startupComplete, que se cierra justo antes de que iniciemos AcceptLoop, y readyForConnections esperará por él.

    La motivación principal para esto es que tenemos una aplicación que puede ejecutarse en modo monolítico (single-process) o polilítico (multi-process). Nos gustaría poder usar NATS para ambos modos por simplicidad, pero el modo monolítico tiene que ser capaz de atender una variedad de plataformas donde abrir conexiones de socket no tiene sentido (móvil) o simplemente no es posible (WASM). Estos cambios nos permitirán usar NATS completamente in-process en su lugar.

    Un PR adjunto nats-io/nats.go#774 añade soporte al lado del cliente.

    Este es mi primer PR a este proyecto, así que disculpas de antemano si he pasado por alto algo obvio en algún lugar.

    /cc @nats-io/core

Ejemplo funcional para el modo embebido

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// para configurar el servidor NATS embebido
14		// establecer DonListen como verdadero
15		DontListen: true,
16	}
17
18	// Inicializar nuevo servidor con opciones
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Iniciar el servidor a través de goroutine
26	go ns.Start()
27
28	// Esperar a que el servidor esté listo para las conexiones
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Conectarse al servidor a través de una conexión en proceso
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Suscribirse al subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Imprimir datos del mensaje
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Apagar el servidor (opcional)
49		ns.Shutdown()
50	})
51
52	// Publicar datos en el subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Esperar el apagado del servidor
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Ahora se puede observar que no se produce ningún salto de red adicional, tal como se pretendía.

Bajo el capó

TL;DR

diagram1

  • El diagrama de secuencia muestra cómo funcionan las funciones internamente cuando se ejecuta este código en main.go, y el punto principal es el siguiente:
    • A través de DontListen: true, el servidor omite la fase de escucha del cliente llamada AcceptLoop.
    • Si la opción InProcessServer de la conexión del cliente está activada, se crea una in-memory connection y se establece un pipe a través de net.Pipe, devolviendo el end of pipe al cliente como un tipo net.Conn.
    • El cliente y el servidor realizan una in-process communication a través de dicha conexión.

Servidor

AcceptLoop

1// nats-server/server/server.go
2
3// Esperar clientes.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Primero, si DontListen es true, se omite la fase de escucha del cliente llamada AcceptLoop.
 1// nats-server/server/server.go
 2
 3// AcceptLoop se exporta para facilitar las pruebas.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Si salimos antes de que el listener esté configurado correctamente,
 6	// nos aseguramos de cerrar el canal.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Instantánea de las opciones del servidor.
18	opts := s.getOpts()
19
20	// Configurar el estado que puede habilitar el apagado
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error escuchando en el puerto: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Escuchando conexiones de clientes en %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alerta de TLS habilitado.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS requerido para conexiones de clientes")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Los clientes que no utilicen la opción \"TLS Handshake First\" no podrán conectarse")
38		}
39	}
40
41	// Si el servidor se inició con RANDOM_PORT (-1), opts.Port sería igual
42	// a 0 al principio de esta función. Por lo tanto, necesitamos obtener el puerto real
43	if opts.Port == 0 {
44		// Escribir el puerto resuelto de nuevo en las opciones.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Ahora que el puerto ha sido configurado (si fue configurado como RANDOM), establecer
49	// la información del servidor Host/Port con los valores de Options o
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error al establecer la información del servidor con el valor ClientAdvertise de %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Mantener un registro de las URLs de conexión del cliente. Podríamos necesitarlas más tarde.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Señal de que no estamos aceptando nuevos clientes
65				s.ldmCh <- true
66				// Ahora esperar el apagado...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Avisar al llamador de que estamos listos
75	close(clr)
76	clr = nil
77}
  • Cabe señalar que la función AcceptLoop realiza los siguientes procesos. Son partes relacionadas con la comunicación de red, como TLS o hostPort, que pueden omitirse si se realiza una in-process communication, ya que no son necesarias.

Cliente

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect intentará conectarse al sistema NATS.
 5// La URL puede contener semántica de nombre de usuario/contraseña. Por ejemplo: nats://derek:pass@localhost:4222
 6// También se admiten arrays separados por comas, por ejemplo: urlA, urlB.
 7// Las opciones comienzan con los valores predeterminados, pero se pueden sobrescribir.
 8// Para conectarse al puerto websocket de un servidor NATS, use el esquema `ws` o `wss`, como
 9// `ws://localhost:8080`. Tenga en cuenta que los esquemas websocket no se pueden mezclar con otros (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Las opciones se pueden usar para crear una conexión personalizada.
 4type Options struct {
 5	// Url representa una única url de servidor NATS a la que el cliente
 6	// se conectará. Si la opción Servers también está configurada,
 7	// entonces se convierte en el primer servidor en el array Servers.
 8	Url string
 9
10	// InProcessServer representa un servidor NATS ejecutándose dentro del
11	// mismo proceso. Si esto está configurado, intentaremos conectarnos
12	// directamente al servidor en lugar de usar conexiones TCP externas.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La función Connect, que establece la conexión entre el servidor NATS y el cliente NATS, permite configurar la URL del cliente y la opción de conexión. La estructura Options, que agrupa estas opciones, contiene un campo InProcessServer de tipo de interfaz InProcessConnProvider.
1// main.go del código de ejemplo
2
3// Inicializar nuevo servidor con opciones
4ns, err := server.NewServer(opts)
5
6//...
7
8// Conectarse al servidor a través de una conexión en proceso
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Cuando el cliente NATS procede con la conexión, si se pasa nats.InProcessServer(ns) como el campo InProcessServer...
 1// nats-go/nats.go
 2
 3// InProcessServer es una opción que intentará establecer una dirección a un servidor NATS
 4// que se ejecuta dentro del proceso en lugar de marcar a través de TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • la opción InProcessServer se reemplaza por el servidor NATS embebido, y...
 1// nats-go/nats.go
 2
 3// createConn se conectará al servidor y envolverá las estructuras
 4// bufio apropiadas. Hará lo correcto cuando ya exista una
 5// conexión.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Si tenemos una referencia a un servidor en proceso, establecer una
15	// conexión usando eso.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • si la opción InProcessServer de la interfaz no es nil (es válida), se ejecuta InProcessConn de InProcessServer en la función createConn que crea la conexión, y...
 1// nats-server/server/server.go
 2
 3// InProcessConn devuelve una conexión en proceso al servidor,
 4// evitando la necesidad de usar un listener TCP para la conectividad local
 5// dentro del mismo proceso. Esto se puede usar independientemente del
 6// estado de la opción DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • se invoca y ejecuta InProcessConn implementada en el servidor.
  • Esta función, cuando se invoca si InProcessServer de nc (conexión nats) en el cliente Go de NATS (nats.go) no es nil, crea una conexión (net.Conn) y la enlaza a la conexión del servidor.

Interfaz dirigida por el consumidor de Go

Un tipo implementa una interfaz implementando sus métodos. No hay una declaración explícita de intención, ninguna palabra clave "implements". Las interfaces implícitas desvinculan la definición de una interfaz de su implementación, que luego podría aparecer en cualquier paquete sin previo acuerdo.

Interfaces are implemented implicitly, A Tour of Go

Si un tipo existe solo para implementar una interfaz y nunca tendrá métodos exportados más allá de esa interfaz, no es necesario exportar el tipo en sí.

Generality, Effective Go

  • Este diseño de interfaz incorpora bien lo que comúnmente se denomina consumer defined interface y structural typing (duck typing) en Go, por lo que me gustaría presentar este tema también.
 1// nats-go/nats.go
 2
 3// Las opciones se pueden usar para crear una conexión personalizada.
 4type Options struct {
 5	// Url representa una única url de servidor NATS a la que el cliente
 6	// se conectará. Si la opción Servers también está configurada,
 7	// entonces se convierte en el primer servidor en el array Servers.
 8	Url string
 9
10	// InProcessServer representa un servidor NATS ejecutándose dentro del
11	// mismo proceso. Si esto está configurado, intentaremos conectarnos
12	// directamente al servidor en lugar de usar conexiones TCP externas.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Volvamos al código. En el cliente nats.go, el campo de estructura de opciones InProcessServer se definió como la interfaz InProcessConnProvider que solo ejecuta InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn devuelve una conexión en proceso al servidor,
 4// evitando la necesidad de usar un listener TCP para la conectividad local
 5// dentro del mismo proceso. Esto se puede usar independientemente del
 6// estado de la opción DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Sin embargo, el tipo que se inserta es el Server de nats-server, que realiza diversas funciones además de InProcessConn.
  • Esto se debe a que, en esta situación, la preocupación del cliente es únicamente si se ha proporcionado o no la interfaz InProcessConn, y otras consideraciones no son de gran importancia.
  • Por lo tanto, el cliente nats.go solo crea y utiliza una consumer defined interface llamada InProcessConnProvider que define únicamente la función InProcessConn() (net.Conn, error).

Conclusión

  • Se ha abordado brevemente el modo embebido de NATS, su funcionamiento y la interfaz definida por el consumidor de Go, que se puede verificar a través del código de NATS.
  • Espero que esta información sea útil para quienes utilicen NATS con los propósitos mencionados, y con esto concluyo este artículo.