Como NATS embarcado se comunica com a aplicação Go?
Introdução
Sobre o NATS
Aplicações e serviços de software precisam trocar dados. NATS é uma infraestrutura que permite essa troca de dados, segmentada na forma de mensagens. Chamamos isso de "middleware orientado a mensagens".
Com NATS, desenvolvedores de aplicações podem:
- Construir sem esforço aplicações cliente-servidor distribuídas e escaláveis.
- Armazenar e distribuir dados em tempo real de maneira geral. Isso pode ser alcançado de forma flexível em vários ambientes, linguagens, provedores de nuvem e sistemas on-premises.
- NATS é um message broker construído em Go.
NATS Embutido
Se sua aplicação está em Go, e se ela se encaixa em seu caso de uso e cenários de implantação, você pode até mesmo embutir um servidor NATS dentro de sua aplicação.
- Além disso, uma característica peculiar do NATS é que, para aplicações construídas em Go, ele oferece suporte ao modo embedded.
- Em outras palavras, em vez da abordagem comum de um message broker de iniciar um servidor de broker separado e se comunicar com ele através do cliente da aplicação, o próprio broker pode ser incorporado (embed) na aplicação desenvolvida em Go.
Benefícios e casos de uso do NATS embutido
- Há um vídeo no Youtube que explica bem, então me refiro ao link do vídeo.
- Mesmo sem implantar um servidor de message broker separado, é possível construir uma modular monolith application para alcançar a separate of concern e, ao mesmo tempo, aproveitar a vantagem de incorporar o NATS. Além disso, a single binary deployment também se torna possível.
- Pode ser utilmente usado não apenas em platforms with no network (WASM), mas também em offline-first applications.
Exemplo na documentação oficial
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Inicializa um novo servidor com opções
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Inicia o servidor via goroutine
22 go ns.Start()
23
24 // Aguarda o servidor estar pronto para conexões
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("não pronto para conexão")
27 }
28
29 // Conecta-se ao servidor
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Inscreve-se no assunto
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Imprime os dados da mensagem
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Desliga o servidor (opcional)
45 ns.Shutdown()
46 })
47
48 // Publica dados no assunto
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Aguarda o desligamento do servidor
52 ns.WaitForShutdown()
53}
- Este é um exemplo de NATS Embutido fornecido pela documentação oficial do NATS, mas se você seguir este código de exemplo, a comunicação não ocorrerá no modo embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Usando o comando
watch 'netstat -an | grep 127.0.0.1'
para verificar a rede de e para o localhost (127.0.0.1), e executando o arquivo Go comgo run .
, você pode ver novas requisições de rede originadas da porta padrão do NATS,4222
, sendo adicionadas.
Configurações corretas para o modo embedding
Para que a comunicação ocorra no modo embedded conforme o esperado, as duas opções a seguir são necessárias:
- Cliente: A opção
InProcessServer
deve ser adicionada. - Servidor: A flag
DontListen
emServer.Options
deve ser explicitamente definida comotrue
.
- Cliente: A opção
Essas partes não foram oficialmente documentadas, e o início dessa funcionalidade pode ser identificado através deste PR.
Este PR adiciona três coisas:
- Função
InProcessConn()
aServer
que constrói umnet.Pipe
para obter uma conexão com o servidor NATS sem usar sockets TCP - Opção
DontListen
que diz ao servidor NATS para não escutar no listener TCP usual - Canal
startupComplete
, que é fechado logo antes de iniciarmosAcceptLoop
, ereadyForConnections
esperará por ele
A principal motivação para isso é que temos uma aplicação que pode rodar tanto no modo monólito (single-process) quanto no modo polilito (multi-process). Gostaríamos de poder usar NATS para ambos os modos por simplicidade, mas o modo monólito deve ser capaz de atender a uma variedade de plataformas onde abrir conexões de socket ou não faz sentido (mobile) ou simplesmente não é possível (WASM). Essas mudanças nos permitirão usar NATS inteiramente em-processo.
Um PR acompanhante nats-io/nats.go#774 adiciona suporte ao lado do cliente.
Este é o meu primeiro PR para este projeto, então peço desculpas antecipadamente se perdi algo óbvio em algum lugar.
/cc @nats-io/core
- Função
Exemplo de trabalho para o modo embutido
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // para configurar o servidor NATS embutido
14 // defina DonListen como true
15 DontListen: true,
16 }
17
18 // Inicializa um novo servidor com opções
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Inicia o servidor via goroutine
26 go ns.Start()
27
28 // Aguarda o servidor estar pronto para conexões
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("não pronto para conexão")
31 }
32
33 // Conecta-se ao servidor via conexão in-process
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Inscreve-se no assunto
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Imprime os dados da mensagem
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Desliga o servidor (opcional)
49 ns.Shutdown()
50 })
51
52 // Publica dados no assunto
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Aguarda o desligamento do servidor
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Agora, é possível observar que nenhum network hop adicional ocorre, conforme o esperado.
Nos bastidores
TL;DR
- Este é um sequence diagram que mostra como as funções internas operam quando o código é executado em
main.go
, e o principal é o seguinte:- Através de
DontListen: true
, o servidor omite a fase de escuta do cliente chamadaAcceptLoop
. - Se a opção
InProcessServer
do cliente for ativada, uma conexão in-memory será criada, um pipe será estabelecido vianet.Pipe
, e a extremidade do pipe será retornada ao cliente como um tiponet.Conn
. - O cliente e o servidor realizam comunicação in-process através dessa conexão.
- Através de
Servidor
AcceptLoop
1// nats-server/server/server.go
2
3// Aguarda clientes.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Primeiramente, se
DontListen
for true, a fase de escuta de clientes chamadaAcceptLoop
é omitida.
1// nats-server/server/server.go
2
3// AcceptLoop é exportado para facilitar o teste.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Se saíssemos antes do listener ser configurado corretamente,
6 // certifique-se de fechar o canal.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Tira um instantâneo das opções do servidor.
18 opts := s.getOpts()
19
20 // Configura o estado que pode habilitar o desligamento
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Erro ao escutar na porta: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Escutando por conexões de cliente em %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alerta de TLS ativado.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS necessário para conexões de cliente")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clientes que não estão usando a opção \"TLS Handshake First\" falharão ao conectar")
38 }
39 }
40
41 // Se o servidor foi iniciado com RANDOM_PORT (-1), opts.Port seria igual
42 // a 0 no início desta função. Então precisamos obter a porta real
43 if opts.Port == 0 {
44 // Escreve a porta resolvida de volta nas opções.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Agora que a porta foi definida (se foi definida como RANDOM), define o
49 // Host/Port do servidor com os valores das Opções ou
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Erro ao definir INFO do servidor com o valor de ClientAdvertise de %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Acompanha as URLs de conexão do cliente. Podemos precisar delas mais tarde.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Sinaliza que não estamos aceitando novos clientes
65 s.ldmCh <- true
66 // Agora espera pelo Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Avisa ao chamador que estamos prontos
75 close(clr)
76 clr = nil
77}
- A função AcceptLoop, a propósito, executa os seguintes processos. Partes relacionadas à comunicação de rede, como
TLS
ouhostPort
, podem ser omitidas, pois são desnecessárias para a comunicação in-process.
Cliente
InProcessServer
1
2// nats-go/nats.go
3
4// Connect tentará conectar ao sistema NATS.
5// A URL pode conter semântica de nome de usuário/senha. Ex: nats://derek:pass@localhost:4222
6// Arrays separados por vírgula também são suportados, Ex: urlA, urlB.
7// As opções começam com os padrões, mas podem ser sobrescritas.
8// Para conectar a uma porta websocket de um servidor NATS, use o esquema `ws` ou `wss`, como
9// `ws://localhost:8080`. Note que esquemas websocket não podem ser misturados com outros (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options podem ser usadas para criar uma conexão personalizada.
4type Options struct {
5 // Url representa uma única URL de servidor NATS à qual o cliente
6 // se conectará. Se a opção Servers também estiver definida, ela
7 // se tornará o primeiro servidor no array Servers.
8 Url string
9
10 // InProcessServer representa um servidor NATS rodando dentro do
11 // mesmo processo. Se isso for definido, tentaremos conectar
12 // ao servidor diretamente em vez de usar conexões TCP externas.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- A função
Connect
, que estabelece a conexão entre o servidor NATS e o cliente NATS, permite configurar a URL do cliente e as opções de conexão. O struct Options, que agrupa essas opções, possui um campoInProcessServer
do tipo interfaceInProcessConnProvider
.
1// main.go do código de exemplo
2
3// Inicializa novo servidor com opções
4ns, err := server.NewServer(opts)
5
6//...
7
8// Conecta ao servidor via conexão in-process
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Ao realizar a conexão no cliente nats, se
nats.InProcessServer(ns)
for passado para o campoInProcessServer
, então
1// nats-go/nats.go
2
3// createConn conectará ao servidor e envolverá as estruturas bufio
4// apropriadas. Ele fará a coisa certa quando uma conexão existente
5// estiver em vigor.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Se tivermos uma referência a um servidor in-process, então estabeleça uma
15 // conexão usando isso.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("falha ao obter conexão in-process: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Essa interface, na função
createConn
que cria a conexão, se a opçãoInProcessServer
não for nil (válida), ela executa oInProcessConn
doInProcessServer
na opção e, então,
1// nats-server/server/server.go
2
3// InProcessConn retorna uma conexão in-process para o servidor,
4// evitando a necessidade de usar um listener TCP para conectividade local
5// dentro do mesmo processo. Isso pode ser usado independentemente do
6// estado da opção DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("falha ao criar conexão")
16 }
17 return pr, nil
18}
- invoca a função
InProcessConn
implementada no servidor. - Essa função é chamada quando o
InProcessServer
do nc (conexão NATS) no cliente Go do NATS (nats.go
) não é nil. Ela cria uma conexão (net.Conn
) e a vincula à conexão do servidor.
Interface orientada ao consumidor de Go
Um tipo implementa uma interface implementando seus métodos. Não há declaração explícita de intenção, nenhum "implements" keyword. Interfaces implícitas dissociam a definição de uma interface de sua implementação, que pode então aparecer em qualquer pacote sem arranjo prévio.
Interfaces are implemented implicitly, A Tour of Go
Se um tipo existe apenas para implementar uma interface e nunca terá métodos exportados além dessa interface, não há necessidade de exportar o próprio tipo.
- Esse design de interface incorpora bem o que é comumente chamado em Go de consumer defined interface e structural typing (duck typing), então gostaria de apresentar esse tópico também.
1// nats-go/nats.go
2
3// Options podem ser usadas para criar uma conexão personalizada.
4type Options struct {
5 // Url representa uma única URL de servidor NATS à qual o cliente
6 // se conectará. Se a opção Servers também estiver definida, ela
7 // se tornará o primeiro servidor no array Servers.
8 Url string
9
10 // InProcessServer representa um servidor NATS rodando dentro do
11 // mesmo processo. Se isso for definido, tentaremos conectar
12 // ao servidor diretamente em vez de usar conexões TCP externas.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Voltemos ao código. No cliente nats.go, o campo de struct da opção
InProcessServer
foi definido como a interfaceInProcessConnProvider
, que executa apenasInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn retorna uma conexão in-process para o servidor,
4// evitando a necessidade de usar um listener TCP para conectividade local
5// dentro do mesmo processo. Isso pode ser usado independentemente do
6// estado da opção DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("falha ao criar conexão")
16 }
17 return pr, nil
18}
- No entanto, o tipo que entra nele é o
Server
do nats-server, que executa diversas funções além deInProcessConn
. - Isso ocorre porque, nessa situação, a preocupação do cliente se resume a saber se a interface
InProcessConn
foi fornecida ou não; outras coisas não são de grande importância. - Portanto, o cliente nats.go cria e usa apenas a interface
InProcessConnProvider
, que é uma consumer defined interface que define apenas a funcionalidadeInProcessConn() (net.Conn, error)
.
Conclusão
- Abordamos brevemente o modo embedded do NATS e seu funcionamento, bem como a consumer defined interface do Go que pode ser verificada através do código do NATS.
- Espero que esta informação seja útil para aqueles que usam o NATS para os propósitos mencionados, e com isso, concluo este artigo.