Cum comunică NATS-urile încorporate cu aplicația Go?
Noțiuni introductive
Despre NATS
Aplicațiile și serviciile software trebuie să schimbe date. NATS este o infrastructură care permite un astfel de schimb de date, segmentat sub formă de mesaje. Numim aceasta un "middleware orientat pe mesaje".
Cu NATS, dezvoltatorii de aplicații pot:
- Construi fără efort aplicații client-server distribuite și scalabile.
- Stoca și distribui date în timp real într-o manieră generală. Acest lucru poate fi realizat flexibil în diverse medii, limbaje, furnizori de cloud și sisteme on-premises.
- NATS este un broker de mesaje configurat în Go.
NATS încorporat
Dacă aplicația dumneavoastră este în Go și dacă se potrivește cazului dumneavoastră de utilizare și scenariilor de implementare, puteți chiar încorpora un server NATS în interiorul aplicației dumneavoastră.
Încorporarea NATS, documentația NATS
- O caracteristică particulară a NATS este că, în cazul aplicațiilor construite în Go, acesta suportă modul embedded.
- Astfel, în loc de abordarea tipică a unui broker de mesaje, care implică pornirea unui server broker separat și comunicarea prin intermediul clientului aplicației cu acel server, brokerul însuși poate fi încorporat (embed) direct în aplicația construită în Go.
Beneficiile și cazurile de utilizare ale NATS încorporat
- Există un videoclip pe YouTube bine explicat, așa că voi face referire la linkul videoclipului.
- Chiar și fără a implementa un server broker de mesaje separat, se poate construi o aplicație modular monolith și se poate atinge separate of concern, beneficiind în același timp de avantajul de a încorpora NATS în modul embedded. În plus, devine posibilă și o single binary deployment.
- Poate fi utilizat util nu doar pe platforme fără rețea (WASM), ci și în aplicații offline-first.
Exemplu din documentația oficială
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- Acesta este un exemplu de NATS încorporat din documentația oficială NATS, dar dacă se urmează acest cod exemplu, comunicarea nu se realizează în modul de încorporare.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Când se rulează fișierul Go cu
go run .
în timp ce se monitorizează traficul de rețea de pe localhost (127.0.0.1) folosind comandawatch 'netstat -an | grep 127.0.0.1'
, se pot observa noi cereri de rețea care pornesc de la portul implicit al NATS,4222
.
Configurații corecte pentru modul încorporat
Pentru a comunica în modul embedded așa cum se intenționează, sunt necesare următoarele două opțiuni:
- Client: Trebuie să se includă opțiunea
InProcessServer
. - Server: În
Server.Options
, flag-ulDontListen
trebuie să fie explicit setat latrue
.
- Client: Trebuie să se includă opțiunea
Aceste aspecte nu au fost documentate oficial și originea acestei funcționalități poate fi identificată prin intermediul PR respectiv.
This PR adds three things:
InProcessConn()
function toServer
which builds anet.Pipe
to get a connection to the NATS server without using TCP socketsDontListen
option which tells the NATS server not to listen on the usual TCP listenerstartupComplete
channel, which is closed right before we startAcceptLoop
, andreadyForConnections
will wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
Exemplu funcțional pentru modul încorporat
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Acum se poate observa că nu mai apare niciun network hop suplimentar, așa cum s-a intenționat.
Sub capotă
TL;DR
- Diagrama de secvență de mai sus ilustrează modul în care funcțiile interne operează atunci când codul este executat în
main.go
, iar esența este următoarea:- Prin
DontListen: true
, serverul omite faza de ascultare a clientului numităAcceptLoop
. - Dacă opțiunea
InProcessServer
a clientului este activată, se creează o conexiune în memorie și, prinnet.Pipe
, se formează un pipe, iar capătul acestuia este returnat clientului ca tipnet.Conn
. - Clientul și serverul realizează o comunicare in-process prin intermediul acestei conexiuni.
- Prin
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- În primul rând, dacă
DontListen
este true, faza de ascultare a clientului,AcceptLoop
, este omisă.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- De altfel, funcția AcceptLoop parcurge următoarele etape. Acestea sunt părți legate de comunicarea în rețea, cum ar fi
TLS
sauhostPort
, care sunt inutile în cazul comunicării in-process și pot fi omise.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Funcția
Connect
, care inițiază conexiunea între serverul NATS și clientul NATS, permite configurarea unui URL de client și a unei opțiuni de conectare, iar structura Options, care grupează aceste opțiuni, conține un câmpInProcessServer
de tip interfațăInProcessConnProvider
.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Când clientul NATS inițiază o conexiune, dacă se transmite
nats.InProcessServer(ns)
câmpuluiInProcessServer
, atunci
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// running within the process instead of dialing via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- opțiunea InProcessServer este înlocuită cu serverul NATS încorporat, și
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- această interfață execută
InProcesConn
al InProcessServer din opțiune, dacă opțiuneaInProcessServer
nu este nil (validă), în funcțiacreateConn
care creează o conexiune.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- funcția
InProcessConn
implementată pe server este apelată și executată. - Această funcție, atunci când
InProcessServer
al nc (conexiunea NATS) în clientul Go NATS (nats.go
) nu este nil, este apelată pentru a crea o conexiune (net.Conn
) și a o lega la conexiunea serverului.
Interfață driven de consumator în Go
Un tip implementează o interfață prin implementarea metodelor sale. Nu există o declarație explicită de intenție, niciun cuvânt cheie "implements". Interfețele implicite decuplează definirea unei interfețe de implementarea sa, care ar putea apoi să apară în orice pachet fără o aranjare prealabilă.
Interfețele sunt implementate implicit, Un tur al Go
If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.
- Acest design de interfață ilustrează bine conceptul de interfață definită de consumator și structural typing (duck typing), frecvent întâlnite în Go, motiv pentru care am dorit să introduc și acest subiect.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Să revenim la cod. În clientul nats.go, câmpul structurii opțiunilor
InProcessServer
a fost definit ca o interfațăInProcessConnProvider
care execută doarInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Totuși, tipul care intră în aceasta este
Server
din nats-server, care îndeplinește o varietate de funcții pe lângă InProcessConn. - Aceasta se datorează faptului că, în această situație, preocuparea clientului este doar dacă a fost furnizată interfața
InProcessConn
, iar alte aspecte nu sunt de o importanță majoră. - Prin urmare, clientul nats.go a creat și utilizează doar o interfață definită de consumator,
InProcessConnProvider
, care definește doar funcționalitateaInProcessConn() (net.Conn, error)
.
Concluzie
- Am abordat pe scurt modul încorporat al NATS și modul său de funcționare, precum și interfața definită de consumator în Go, care poate fi confirmată prin codul NATS.
- Sper ca aceste informații să fie utile celor care utilizează NATS în scopurile menționate mai sus și cu aceasta închei acest articol.