W jaki sposób osadzone NATS komunikują się z aplikacją go?
Rozpoczęcie pracy
Informacje o NATS
Aplikacje i usługi oprogramowania muszą wymieniać dane. NATS to infrastruktura, która umożliwia taką wymianę danych, segmentowaną w formie wiadomości. Nazywamy to „message oriented middleware”.
Dzięki NATS, programiści aplikacji mogą:
- Bez wysiłku tworzyć rozproszone i skalowalne aplikacje klient-serwer.
- Przechowywać i dystrybuować dane w czasie rzeczywistym w ogólny sposób. Można to elastycznie osiągnąć w różnych środowiskach, językach, u dostawców chmury i w systemach lokalnych (on-premises).
- NATS to message broker zbudowany w Go.
Embedded NATS
Jeśli Twoja aplikacja jest napisana w Go, i jeśli pasuje to do Twojego przypadku użycia i scenariuszy wdrożenia, możesz nawet osadzić serwer NATS wewnątrz swojej aplikacji.
- Co więcej, NATS ma tę specyficzną cechę, że w przypadku aplikacji zbudowanych w Go obsługuje tryb embedded.
- Oznacza to, że zamiast typowego dla message brokerów uruchamiania oddzielnego serwera brokera i komunikacji z nim za pośrednictwem klienta aplikacji, sam broker może być osadzony (embed) w aplikacji napisanej w Go.
Korzyści i przypadki użycia embedded NATS
- Istnieje dobrze wyjaśniający film na Youtube, do którego odsyłam.
- Nawet bez wdrażania oddzielnego serwera message broker, można stworzyć modular monolith application, osiągając jednocześnie separate of concern, a także korzystając z zalet osadzenia NATS. Dodatkowo, możliwe staje się single binary deployment.
- Oprócz platform bez sieci (WASM), może być użytecznie stosowany w aplikacjach offline-first.
Przykład z oficjalnej dokumentacji
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- Jest to przykład Embedded NATS zamieszczony w oficjalnej dokumentacji NATS, jednakże postępując zgodnie z tym kodem, komunikacja nie odbywa się w trybie embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Jeśli uruchomimy ten plik Go za pomocą
go run .podczas monitorowania ruchu sieciowego na localhost (127.0.0.1) za pomocą poleceniawatch 'netstat -an | grep 127.0.0.1', zauważymy, że pojawiają się nowe żądania sieciowe wychodzące z domyślnego portu NATS, czyli4222.
Prawidłowe konfiguracje dla trybu embedding
Aby komunikacja odbywała się w zamierzonym trybie embedded, wymagane są następujące dwie opcje:
- Klient: Należy dodać opcję
InProcessServer. - Serwer: W
Server.Optionsnależy jawnie ustawić flagęDontListennatrue.
- Klient: Należy dodać opcję
Te szczegóły nie były oficjalnie udokumentowane, a początek tej funkcjonalności można zidentyfikować poprzez ten PR.
This PR adds three things:
InProcessConn()function toServerwhich builds anet.Pipeto get a connection to the NATS server without using TCP socketsDontListenoption which tells the NATS server not to listen on the usual TCP listenerstartupCompletechannel, which is closed right before we startAcceptLoop, andreadyForConnectionswill wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
Działający przykład dla trybu embedded
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Teraz można zaobserwować, że nie występują żadne dodatkowe przeskoki sieciowe, zgodnie z zamierzeniami.
Under the hood
TL;DR
- Jest to sequence diagram, który przedstawia, jak wewnętrznie działają funkcje po uruchomieniu tego kodu w
main.go, a jego istota jest następująca:- Za pomocą
DontListen: trueserwer pomija fazę nasłuchiwania klienta, czyliAcceptLoop. - Jeśli opcja
InProcessServerwśród opcji połączenia klienta jest aktywna, tworzy ona połączenie w pamięci i ustanawia pipe za pośrednictwemnet.Pipe, a następnie zwraca koniec pipe'a klientowi jako typnet.Conn. - Klient i serwer komunikują się w ramach procesu za pośrednictwem tego połączenia.
- Za pomocą
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Przede wszystkim, jeśli
DontListenjest ustawione natrue, faza nasłuchiwania klientaAcceptLoopjest pomijana.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- Należy zauważyć, że funkcja AcceptLoop wykonuje następujące kroki. Są to części związane z komunikacją sieciową, takie jak
TLSczyhostPort, które mogą zostać pominięte, jeśli komunikacja odbywa się w ramach procesu, ponieważ nie są wówczas potrzebne.
Klient
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Funkcja
Connect, która ustanawia połączenie między serwerem NATS a klientem NATS, umożliwia konfigurację URL klienta i opcji połączenia. W strukturze Options, która gromadzi te opcje, znajduje się poleInProcessServero typie interfejsuInProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Kiedy klient NATS inicjuje połączenie, przekazując
nats.InProcessServer(ns)do polaInProcessServer,
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// running within the process instead of dialing via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- opcja InProcessServer zostaje zastąpiona przez embedded nats server, i
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- ten interfejs, w funkcji
createConn, która tworzy połączenie, jeśli opcjaInProcessServernie jestnil(czyli jest ważna), wykonujeInProcesConnzInProcessServerznajdującego się w opcjach.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Wówczas wywoływana jest i uruchamiana zaimplementowana na serwerze funkcja
InProcessConn. - Ta funkcja, w kliencie Go NATS (
nats.go), jeśliInProcessServerwnc(nats connection) nie jestnil, jest wywoływana w celu utworzenia połączenia (net.Conn), a następnie wiąże je z połączeniem serwera.
Consumer driven interface w Go
A type implements an interface by implementing its methods. There is no explicit declaration of intent, no "implements" keyword. Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.
Interfaces are implemented implicitly, A Tour of Go
If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.
- Ten projekt interfejsu dobrze odzwierciedla koncepcje powszechnie określane w Go jako "consumer defined interface" i "structural typing" (duck typing), dlatego chciałbym je również krótko przedstawić.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Wróćmy do kodu. W kliencie nats.go pole struktury opcji
InProcessServerzostało zdefiniowane jako interfejsInProcessConnProvider, który wykonuje tylkoInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Jednakże typ, który jest tam wprowadzany, to
Serverz nats-server, który wykonuje nie tylkoInProcessConn, ale także wiele innych funkcji. - Dzieje się tak, ponieważ w tej sytuacji klienta obchodzi jedynie to, czy interfejs
InProcessConnzostał dostarczony, a inne aspekty nie mają większego znaczenia. - Dlatego klient nats.go tworzy i używa jedynie interfejsu
InProcessConnProviderzdefiniowanego przez konsumenta, który definiuje tylko funkcjęInProcessConn() (net.Conn, error).
Konkluzja
- Krótko omówiono tryb embedded NATS, jego sposób działania oraz interfejs Go zdefiniowany przez konsumenta, który można zaobserwować w kodzie NATS.
- Mam nadzieję, że te informacje będą pomocne dla osób korzystających z NATS w podobnych celach i tym samym kończę ten artykuł.