GoSuda

Como NATS embarcado se comunica com a aplicação Go?

By prravda
views ...

Introdução

Sobre o NATS

Aplicações e serviços de software precisam trocar dados. NATS é uma infraestrutura que permite essa troca de dados, segmentada na forma de mensagens. Chamamos isso de "middleware orientado a mensagens".

Com NATS, desenvolvedores de aplicações podem:

  • Construir sem esforço aplicações cliente-servidor distribuídas e escaláveis.
  • Armazenar e distribuir dados em tempo real de maneira geral. Isso pode ser alcançado de forma flexível em vários ambientes, linguagens, provedores de nuvem e sistemas on-premises.

O que é NATS, NATS docs

  • NATS é um message broker construído em Go.

NATS Embutido

Se sua aplicação está em Go, e se ela se encaixa em seu caso de uso e cenários de implantação, você pode até mesmo embutir um servidor NATS dentro de sua aplicação.

Embedding NATS, NATS docs

  • Além disso, uma característica peculiar do NATS é que, para aplicações construídas em Go, ele oferece suporte ao modo embedded.
  • Em outras palavras, em vez da abordagem comum de um message broker de iniciar um servidor de broker separado e se comunicar com ele através do cliente da aplicação, o próprio broker pode ser incorporado (embed) na aplicação desenvolvida em Go.

Benefícios e casos de uso do NATS embutido

  • Há um vídeo no Youtube que explica bem, então me refiro ao link do vídeo.
  • Mesmo sem implantar um servidor de message broker separado, é possível construir uma modular monolith application para alcançar a separate of concern e, ao mesmo tempo, aproveitar a vantagem de incorporar o NATS. Além disso, a single binary deployment também se torna possível.
  • Pode ser utilmente usado não apenas em platforms with no network (WASM), mas também em offline-first applications.

Exemplo na documentação oficial

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Inicializa um novo servidor com opções
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Inicia o servidor via goroutine
22    go ns.Start()
23
24    // Aguarda o servidor estar pronto para conexões
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("não pronto para conexão")
27    }
28
29    // Conecta-se ao servidor
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Inscreve-se no assunto
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Imprime os dados da mensagem
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Desliga o servidor (opcional)
45        ns.Shutdown()
46    })
47
48    // Publica dados no assunto
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Aguarda o desligamento do servidor
52    ns.WaitForShutdown()
53}
  • Este é um exemplo de NATS Embutido fornecido pela documentação oficial do NATS, mas se você seguir este código de exemplo, a comunicação não ocorrerá no modo embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Usando o comando watch 'netstat -an | grep 127.0.0.1' para verificar a rede de e para o localhost (127.0.0.1), e executando o arquivo Go com go run ., você pode ver novas requisições de rede originadas da porta padrão do NATS, 4222, sendo adicionadas.

Configurações corretas para o modo embedding

  • Para que a comunicação ocorra no modo embedded conforme o esperado, as duas opções a seguir são necessárias:

    • Cliente: A opção InProcessServer deve ser adicionada.
    • Servidor: A flag DontListen em Server.Options deve ser explicitamente definida como true.
  • Essas partes não foram oficialmente documentadas, e o início dessa funcionalidade pode ser identificado através deste PR.

    Este PR adiciona três coisas:

    1. Função InProcessConn() a Server que constrói um net.Pipe para obter uma conexão com o servidor NATS sem usar sockets TCP
    2. Opção DontListen que diz ao servidor NATS para não escutar no listener TCP usual
    3. Canal startupComplete, que é fechado logo antes de iniciarmos AcceptLoop, e readyForConnections esperará por ele

    A principal motivação para isso é que temos uma aplicação que pode rodar tanto no modo monólito (single-process) quanto no modo polilito (multi-process). Gostaríamos de poder usar NATS para ambos os modos por simplicidade, mas o modo monólito deve ser capaz de atender a uma variedade de plataformas onde abrir conexões de socket ou não faz sentido (mobile) ou simplesmente não é possível (WASM). Essas mudanças nos permitirão usar NATS inteiramente em-processo.

    Um PR acompanhante nats-io/nats.go#774 adiciona suporte ao lado do cliente.

    Este é o meu primeiro PR para este projeto, então peço desculpas antecipadamente se perdi algo óbvio em algum lugar.

    /cc @nats-io/core

Exemplo de trabalho para o modo embutido

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// para configurar o servidor NATS embutido
14		// defina DonListen como true
15		DontListen: true,
16	}
17
18	// Inicializa um novo servidor com opções
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Inicia o servidor via goroutine
26	go ns.Start()
27
28	// Aguarda o servidor estar pronto para conexões
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("não pronto para conexão")
31	}
32
33	// Conecta-se ao servidor via conexão in-process
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Inscreve-se no assunto
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Imprime os dados da mensagem
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Desliga o servidor (opcional)
49		ns.Shutdown()
50	})
51
52	// Publica dados no assunto
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Aguarda o desligamento do servidor
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Agora, é possível observar que nenhum network hop adicional ocorre, conforme o esperado.

Nos bastidores

TL;DR

diagram1

  • Este é um sequence diagram que mostra como as funções internas operam quando o código é executado em main.go, e o principal é o seguinte:
    • Através de DontListen: true, o servidor omite a fase de escuta do cliente chamada AcceptLoop.
    • Se a opção InProcessServer do cliente for ativada, uma conexão in-memory será criada, um pipe será estabelecido via net.Pipe, e a extremidade do pipe será retornada ao cliente como um tipo net.Conn.
    • O cliente e o servidor realizam comunicação in-process através dessa conexão.

Servidor

AcceptLoop

1// nats-server/server/server.go
2
3// Aguarda clientes.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Primeiramente, se DontListen for true, a fase de escuta de clientes chamada AcceptLoop é omitida.
 1// nats-server/server/server.go
 2
 3// AcceptLoop é exportado para facilitar o teste.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Se saíssemos antes do listener ser configurado corretamente,
 6	// certifique-se de fechar o canal.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Tira um instantâneo das opções do servidor.
18	opts := s.getOpts()
19
20	// Configura o estado que pode habilitar o desligamento
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Erro ao escutar na porta: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Escutando por conexões de cliente em %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alerta de TLS ativado.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS necessário para conexões de cliente")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clientes que não estão usando a opção \"TLS Handshake First\" falharão ao conectar")
38		}
39	}
40
41	// Se o servidor foi iniciado com RANDOM_PORT (-1), opts.Port seria igual
42	// a 0 no início desta função. Então precisamos obter a porta real
43	if opts.Port == 0 {
44		// Escreve a porta resolvida de volta nas opções.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Agora que a porta foi definida (se foi definida como RANDOM), define o
49	// Host/Port do servidor com os valores das Opções ou
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Erro ao definir INFO do servidor com o valor de ClientAdvertise de %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Acompanha as URLs de conexão do cliente. Podemos precisar delas mais tarde.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Sinaliza que não estamos aceitando novos clientes
65				s.ldmCh <- true
66				// Agora espera pelo Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Avisa ao chamador que estamos prontos
75	close(clr)
76	clr = nil
77}
  • A função AcceptLoop, a propósito, executa os seguintes processos. Partes relacionadas à comunicação de rede, como TLS ou hostPort, podem ser omitidas, pois são desnecessárias para a comunicação in-process.

Cliente

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect tentará conectar ao sistema NATS.
 5// A URL pode conter semântica de nome de usuário/senha. Ex: nats://derek:pass@localhost:4222
 6// Arrays separados por vírgula também são suportados, Ex: urlA, urlB.
 7// As opções começam com os padrões, mas podem ser sobrescritas.
 8// Para conectar a uma porta websocket de um servidor NATS, use o esquema `ws` ou `wss`, como
 9// `ws://localhost:8080`. Note que esquemas websocket não podem ser misturados com outros (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options podem ser usadas para criar uma conexão personalizada.
 4type Options struct {
 5	// Url representa uma única URL de servidor NATS à qual o cliente
 6	// se conectará. Se a opção Servers também estiver definida, ela
 7	// se tornará o primeiro servidor no array Servers.
 8	Url string
 9
10	// InProcessServer representa um servidor NATS rodando dentro do
11	// mesmo processo. Se isso for definido, tentaremos conectar
12	// ao servidor diretamente em vez de usar conexões TCP externas.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • A função Connect, que estabelece a conexão entre o servidor NATS e o cliente NATS, permite configurar a URL do cliente e as opções de conexão. O struct Options, que agrupa essas opções, possui um campo InProcessServer do tipo interface InProcessConnProvider.
1// main.go do código de exemplo
2
3// Inicializa novo servidor com opções
4ns, err := server.NewServer(opts)
5
6//...
7
8// Conecta ao servidor via conexão in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Ao realizar a conexão no cliente nats, se nats.InProcessServer(ns) for passado para o campo InProcessServer, então
 1// nats-go/nats.go
 2
 3// createConn conectará ao servidor e envolverá as estruturas bufio
 4// apropriadas. Ele fará a coisa certa quando uma conexão existente
 5// estiver em vigor.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Se tivermos uma referência a um servidor in-process, então estabeleça uma
15	// conexão usando isso.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("falha ao obter conexão in-process: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Essa interface, na função createConn que cria a conexão, se a opção InProcessServer não for nil (válida), ela executa o InProcessConn do InProcessServer na opção e, então,
 1// nats-server/server/server.go
 2
 3// InProcessConn retorna uma conexão in-process para o servidor,
 4// evitando a necessidade de usar um listener TCP para conectividade local
 5// dentro do mesmo processo. Isso pode ser usado independentemente do
 6// estado da opção DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("falha ao criar conexão")
16	}
17	return pr, nil
18}
  • invoca a função InProcessConn implementada no servidor.
  • Essa função é chamada quando o InProcessServer do nc (conexão NATS) no cliente Go do NATS (nats.go) não é nil. Ela cria uma conexão (net.Conn) e a vincula à conexão do servidor.

Interface orientada ao consumidor de Go

Um tipo implementa uma interface implementando seus métodos. Não há declaração explícita de intenção, nenhum "implements" keyword. Interfaces implícitas dissociam a definição de uma interface de sua implementação, que pode então aparecer em qualquer pacote sem arranjo prévio.

Interfaces are implemented implicitly, A Tour of Go

Se um tipo existe apenas para implementar uma interface e nunca terá métodos exportados além dessa interface, não há necessidade de exportar o próprio tipo.

Generality, Effective Go

  • Esse design de interface incorpora bem o que é comumente chamado em Go de consumer defined interface e structural typing (duck typing), então gostaria de apresentar esse tópico também.
 1// nats-go/nats.go
 2
 3// Options podem ser usadas para criar uma conexão personalizada.
 4type Options struct {
 5	// Url representa uma única URL de servidor NATS à qual o cliente
 6	// se conectará. Se a opção Servers também estiver definida, ela
 7	// se tornará o primeiro servidor no array Servers.
 8	Url string
 9
10	// InProcessServer representa um servidor NATS rodando dentro do
11	// mesmo processo. Se isso for definido, tentaremos conectar
12	// ao servidor diretamente em vez de usar conexões TCP externas.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Voltemos ao código. No cliente nats.go, o campo de struct da opção InProcessServer foi definido como a interface InProcessConnProvider, que executa apenas InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn retorna uma conexão in-process para o servidor,
 4// evitando a necessidade de usar um listener TCP para conectividade local
 5// dentro do mesmo processo. Isso pode ser usado independentemente do
 6// estado da opção DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("falha ao criar conexão")
16	}
17	return pr, nil
18}
  • No entanto, o tipo que entra nele é o Server do nats-server, que executa diversas funções além de InProcessConn.
  • Isso ocorre porque, nessa situação, a preocupação do cliente se resume a saber se a interface InProcessConn foi fornecida ou não; outras coisas não são de grande importância.
  • Portanto, o cliente nats.go cria e usa apenas a interface InProcessConnProvider, que é uma consumer defined interface que define apenas a funcionalidade InProcessConn() (net.Conn, error).

Conclusão

  • Abordamos brevemente o modo embedded do NATS e seu funcionamento, bem como a consumer defined interface do Go que pode ser verificada através do código do NATS.
  • Espero que esta informação seja útil para aqueles que usam o NATS para os propósitos mencionados, e com isso, concluo este artigo.