GoSuda

Como o NATS incorporado se comunica com o aplicativo Go?

By prravda
views ...

Introdução

Sobre o NATS

Aplicações e serviços de software necessitam trocar dados. NATS é uma infraestrutura que permite tal troca de dados, segmentada na forma de mensagens. A isso denominamos "middleware orientado a mensagens".

Com o NATS, desenvolvedores de aplicações podem:

  • Construir sem esforço aplicações cliente-servidor distribuídas e escaláveis.
  • Armazenar e distribuir dados em tempo real de forma geral. Isso pode ser alcançado de forma flexível em diversos ambientes, linguagens, provedores de nuvem e sistemas on-premises.

O que é NATS, documentação NATS

  • NATS é um message broker construído em Go.

NATS Embutido

Se sua aplicação está em Go, e se ela se adequa ao seu caso de uso e cenários de implantação, você pode até mesmo embutir um servidor NATS dentro de sua aplicação.

Embutindo NATS, documentação NATS

  • E há uma particularidade do NATS: ele suporta o modo embedded para aplicações construídas em Go.
  • Ou seja, em vez da abordagem comum de message brokers, que envolve a execução de um servidor de broker separado e a comunicação com ele por meio de um cliente da aplicação, é possível incorporar (embed) o próprio broker na aplicação Go.

Benefícios e casos de uso do NATS embutido

  • Há um vídeo no Youtube que explica bem, então me referirei a ele pelo link.
  • Mesmo sem implantar um servidor de message broker separado, é possível construir uma modular monolith application para alcançar a separação de preocupações e aproveitar a vantagem de embutir o NATS. Além disso, torna-se possível a single binary deployment.
  • Pode ser utilmente empregado não apenas em platform with no network (WASM), mas também em offline-first application.

Exemplo na documentação oficial

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Este é um exemplo de NATS embutido fornecido pela documentação oficial do NATS, mas se você seguir o código de exemplo, a comunicação não ocorrerá no modo embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Ao executar o arquivo Go com go run . enquanto monitora o tráfego de rede para localhost (127.0.0.1) usando o comando watch 'netstat -an | grep 127.0.0.1', pode-se observar que novas requisições de rede originadas da porta padrão do NATS, 4222, são adicionadas.

Configurações corretas para o modo embedded

  • Para que a comunicação ocorra no modo embedded conforme o esperado, são necessárias as duas opções a seguir:

    • Cliente: A opção InProcessServer deve ser incluída.
    • Servidor: A flag DontListen deve ser explicitamente definida como true em Server.Options.
  • Essas partes não foram documentadas oficialmente, e o início dessa funcionalidade pode ser rastreado através desta PR.

    Este PR adiciona três coisas:

    1. A função InProcessConn() a Server que constrói um net.Pipe para obter uma conexão com o servidor NATS sem usar sockets TCP.
    2. A opção DontListen que instrui o servidor NATS a não escutar na porta TCP usual.
    3. Um canal startupComplete, que é fechado logo antes de iniciarmos o AcceptLoop, e readyForConnections aguardará por ele.

    A principal motivação para isso é que temos uma aplicação que pode rodar tanto em modo monolítico (single-process) quanto em modo polilítico (multi-process). Gostaríamos de poder usar o NATS para ambos os modos por simplicidade, mas o modo monolítico precisa ser capaz de atender a uma variedade de plataformas onde abrir conexões de socket ou não faz sentido (mobile) ou simplesmente não é possível (WASM). Essas mudanças nos permitirão usar o NATS inteiramente in-process.

    Um PR complementar nats-io/nats.go#774 adiciona suporte ao lado do cliente.

    Este é o meu primeiro PR para este projeto, então peço desculpas antecipadamente se perdi algo óbvio em qualquer lugar.

    /cc @nats-io/core

Exemplo funcional para o modo embedded

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Agora, é possível observar que nenhuma network hop adicional ocorre, conforme o esperado.

Sob o capô

TL;DR

diagram1

  • Este é um sequence diagram que ilustra como as funções internas operam quando o código é executado em main.go, e o essencial é o seguinte:
    • Com DontListen: true, o servidor omite a fase de escuta do cliente, conhecida como AcceptLoop.
    • Se a opção InProcessServer do cliente for ativada, uma in-memory connection é criada e um pipe é estabelecido através de net.Pipe, retornando a extremidade do pipe ao cliente como um net.Conn.
    • O cliente e o servidor realizam in-process communication através dessa conexão.

Servidor

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Primeiramente, se DontListen for true, a fase de escuta do cliente, denominada AcceptLoop, é omitida.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Para sua informação, a função AcceptLoop procede com os seguintes passos. Como se trata de partes relacionadas à comunicação de rede, como TLS e hostPort, que são desnecessárias para a in-process communication, podem ser omitidas.

Cliente

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
  • A função Connect, que estabelece a conexão entre o servidor NATS e o cliente NATS, permite configurar a URL do cliente e as opções de conexão. A struct Options, que agrupa essas opções, possui um campo InProcessServer do tipo interface InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Quando o cliente NATS procede com a conexão, ao passar nats.InProcessServer(ns) para o campo InProcessServer,
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// running within the process instead of dialing via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • O InProcessServer da opção é substituído pelo embedded nats server, e
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Essa interface executa o InProcessConn do InProcessServer na opção, se a opção InProcessServer não for nula (válida) na função createConn que cria a conexão.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • A função InProcessConn implementada no servidor é invocada e executada.
  • Essa função, no cliente Go do NATS, nats.go, é chamada quando o InProcessServer de nc (conexão NATS) não é nulo, criando uma conexão (net.Conn) e vinculando-a à conexão do servidor.

Interface orientada ao consumidor em Go

Um tipo implementa uma interface ao implementar seus métodos. Não há uma declaração explícita de intenção, nenhum palavra-chave "implements". Interfaces implícitas desvinculam a definição de uma interface de sua implementação, que pode então aparecer em qualquer pacote sem pré-arranjos.

Interfaces são implementadas implicitamente, A Tour of Go

Se um tipo existe apenas para implementar uma interface e nunca terá métodos exportados além dessa interface, não há necessidade de exportar o próprio tipo.

Generalidade, Effective Go

  • Esse design de interface incorpora bem o que comumente se chama de consumer defined interface e structural typing (duck typing) em Go, por isso gostaria de apresentar este tópico também.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Voltemos ao código. No cliente nats.go, o campo da struct de opção InProcessServer foi definido como a interface InProcessConnProvider, que apenas executa InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Contudo, o tipo que entra nele é o Server do nats-server, que executa não apenas InProcessConn, mas também diversas outras funções.
  • Isso ocorre porque o interesse do cliente, nessa situação, reside apenas em saber se a interface InProcessConn foi fornecida ou não, sendo as demais funcionalidades de menor importância.
  • Consequentemente, o cliente nats.go criou e utiliza apenas uma consumer defined interface chamada InProcessConnProvider, que define unicamente a funcionalidade InProcessConn() (net.Conn, error).

Conclusão

  • Abordamos brevemente o modo embedded do NATS, seu funcionamento e a consumer defined interface do Go, que pode ser verificada através do código do NATS.
  • Espero que esta informação seja útil para aqueles que utilizam o NATS com os propósitos mencionados, e com isso encerro este artigo.