Como o NATS incorporado se comunica com o aplicativo Go?
Introdução
Sobre o NATS
Aplicações e serviços de software necessitam trocar dados. NATS é uma infraestrutura que permite tal troca de dados, segmentada na forma de mensagens. A isso denominamos "middleware orientado a mensagens".
Com o NATS, desenvolvedores de aplicações podem:
- Construir sem esforço aplicações cliente-servidor distribuídas e escaláveis.
- Armazenar e distribuir dados em tempo real de forma geral. Isso pode ser alcançado de forma flexível em diversos ambientes, linguagens, provedores de nuvem e sistemas on-premises.
- NATS é um message broker construído em Go.
NATS Embutido
Se sua aplicação está em Go, e se ela se adequa ao seu caso de uso e cenários de implantação, você pode até mesmo embutir um servidor NATS dentro de sua aplicação.
Embutindo NATS, documentação NATS
- E há uma particularidade do NATS: ele suporta o modo embedded para aplicações construídas em Go.
- Ou seja, em vez da abordagem comum de message brokers, que envolve a execução de um servidor de broker separado e a comunicação com ele por meio de um cliente da aplicação, é possível incorporar (embed) o próprio broker na aplicação Go.
Benefícios e casos de uso do NATS embutido
- Há um vídeo no Youtube que explica bem, então me referirei a ele pelo link.
- Mesmo sem implantar um servidor de message broker separado, é possível construir uma modular monolith application para alcançar a separação de preocupações e aproveitar a vantagem de embutir o NATS. Além disso, torna-se possível a single binary deployment.
- Pode ser utilmente empregado não apenas em platform with no network (WASM), mas também em offline-first application.
Exemplo na documentação oficial
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- Este é um exemplo de NATS embutido fornecido pela documentação oficial do NATS, mas se você seguir o código de exemplo, a comunicação não ocorrerá no modo embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Ao executar o arquivo Go com
go run .enquanto monitora o tráfego de rede para localhost (127.0.0.1) usando o comandowatch 'netstat -an | grep 127.0.0.1', pode-se observar que novas requisições de rede originadas da porta padrão do NATS,4222, são adicionadas.
Configurações corretas para o modo embedded
Para que a comunicação ocorra no modo embedded conforme o esperado, são necessárias as duas opções a seguir:
- Cliente: A opção
InProcessServerdeve ser incluída. - Servidor: A flag
DontListendeve ser explicitamente definida comotrueemServer.Options.
- Cliente: A opção
Essas partes não foram documentadas oficialmente, e o início dessa funcionalidade pode ser rastreado através desta PR.
Este PR adiciona três coisas:
- A função
InProcessConn()aServerque constrói umnet.Pipepara obter uma conexão com o servidor NATS sem usar sockets TCP. - A opção
DontListenque instrui o servidor NATS a não escutar na porta TCP usual. - Um canal
startupComplete, que é fechado logo antes de iniciarmos oAcceptLoop, ereadyForConnectionsaguardará por ele.
A principal motivação para isso é que temos uma aplicação que pode rodar tanto em modo monolítico (single-process) quanto em modo polilítico (multi-process). Gostaríamos de poder usar o NATS para ambos os modos por simplicidade, mas o modo monolítico precisa ser capaz de atender a uma variedade de plataformas onde abrir conexões de socket ou não faz sentido (mobile) ou simplesmente não é possível (WASM). Essas mudanças nos permitirão usar o NATS inteiramente in-process.
Um PR complementar nats-io/nats.go#774 adiciona suporte ao lado do cliente.
Este é o meu primeiro PR para este projeto, então peço desculpas antecipadamente se perdi algo óbvio em qualquer lugar.
/cc @nats-io/core
- A função
Exemplo funcional para o modo embedded
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Agora, é possível observar que nenhuma network hop adicional ocorre, conforme o esperado.
Sob o capô
TL;DR
- Este é um sequence diagram que ilustra como as funções internas operam quando o código é executado em
main.go, e o essencial é o seguinte:- Com
DontListen: true, o servidor omite a fase de escuta do cliente, conhecida comoAcceptLoop. - Se a opção
InProcessServerdo cliente for ativada, uma in-memory connection é criada e um pipe é estabelecido através denet.Pipe, retornando a extremidade do pipe ao cliente como umnet.Conn. - O cliente e o servidor realizam in-process communication através dessa conexão.
- Com
Servidor
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Primeiramente, se
DontListenfortrue, a fase de escuta do cliente, denominadaAcceptLoop, é omitida.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- Para sua informação, a função AcceptLoop procede com os seguintes passos. Como se trata de partes relacionadas à comunicação de rede, como
TLSehostPort, que são desnecessárias para a in-process communication, podem ser omitidas.
Cliente
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
- A função
Connect, que estabelece a conexão entre o servidor NATS e o cliente NATS, permite configurar a URL do cliente e as opções de conexão. A struct Options, que agrupa essas opções, possui um campoInProcessServerdo tipo interfaceInProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Quando o cliente NATS procede com a conexão, ao passar
nats.InProcessServer(ns)para o campoInProcessServer,
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// running within the process instead of dialing via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- O
InProcessServerda opção é substituído pelo embedded nats server, e
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Essa interface executa o
InProcessConndo InProcessServer na opção, se a opçãoInProcessServernão for nula (válida) na funçãocreateConnque cria a conexão.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- A função
InProcessConnimplementada no servidor é invocada e executada. - Essa função, no cliente Go do NATS,
nats.go, é chamada quando oInProcessServerdenc(conexão NATS) não é nulo, criando uma conexão (net.Conn) e vinculando-a à conexão do servidor.
Interface orientada ao consumidor em Go
Um tipo implementa uma interface ao implementar seus métodos. Não há uma declaração explícita de intenção, nenhum palavra-chave "implements". Interfaces implícitas desvinculam a definição de uma interface de sua implementação, que pode então aparecer em qualquer pacote sem pré-arranjos.
Interfaces são implementadas implicitamente, A Tour of Go
Se um tipo existe apenas para implementar uma interface e nunca terá métodos exportados além dessa interface, não há necessidade de exportar o próprio tipo.
- Esse design de interface incorpora bem o que comumente se chama de consumer defined interface e structural typing (duck typing) em Go, por isso gostaria de apresentar este tópico também.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Voltemos ao código. No cliente nats.go, o campo da struct de opção
InProcessServerfoi definido como a interfaceInProcessConnProvider, que apenas executaInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Contudo, o tipo que entra nele é o
Serverdo nats-server, que executa não apenasInProcessConn, mas também diversas outras funções. - Isso ocorre porque o interesse do cliente, nessa situação, reside apenas em saber se a interface
InProcessConnfoi fornecida ou não, sendo as demais funcionalidades de menor importância. - Consequentemente, o cliente nats.go criou e utiliza apenas uma consumer defined interface chamada
InProcessConnProvider, que define unicamente a funcionalidadeInProcessConn() (net.Conn, error).
Conclusão
- Abordamos brevemente o modo embedded do NATS, seu funcionamento e a consumer defined interface do Go, que pode ser verificada através do código do NATS.
- Espero que esta informação seja útil para aqueles que utilizam o NATS com os propósitos mencionados, e com isso encerro este artigo.