Miten sulautetut NATS-instanssit kommunikoivat Go-sovelluksen kanssa?
Aloitus
Tietoja NATSista
Ohjelmistosovellusten ja -palveluiden on vaihdettava tietoja. NATS on infrastruktuuri, joka mahdollistaa tällaisen tiedonvaihdon, segmentoituna viestien muodossa. Kutsumme tätä "viestiorientoituneeksi väliohjelmistoksi".
NATSilla sovelluskehittäjät voivat:
- Rakentaa vaivattomasti hajautettuja ja skaalautuvia client-server-sovelluksia.
- Tallentaa ja jakaa tietoja reaaliaikaisesti yleisellä tavalla. Tämä voidaan joustavasti saavuttaa eri ympäristöissä, kielillä, pilvipalveluntarjoajilla ja on-premises-järjestelmissä.
- NATS on Go-kielellä toteutettu message broker.
Embedded NATS
Jos sovelluksesi on Go-kielellä ja se sopii käyttötapaukseesi ja käyttöönottoympäristöösi, voit jopa upottaa NATS-palvelimen sovellukseesi.
- NATSilla on erityispiirre: se tukee embedded mode -tilaa Go-kielellä toteutetuille sovelluksille.
- Tämä tarkoittaa, että sen sijaan, että viestivälittäjän yleinen tapa olisi erillisen broker-palvelimen käynnistäminen ja sovelluksen clientin kommunikointi sen kanssa, brokerin itsensä voi upottaa (embed) Go-kielellä tehtyyn sovellukseen.
Embedded NATS:n edut ja käyttötapaukset
- Hyvin selitetty Youtube-video on olemassa, joten viittaan siihen.
- Vaikka erillistä message broker -palvelinta ei otettaisi käyttöön, on mahdollista luoda modular monolith application ja saavuttaa separate of concern, samalla kun hyödynnetään NATS:n upottamisen etuja. Lisäksi mahdollistuu single binary deployment.
- Sitä voidaan käyttää hyödyllisesti paitsi platform with no network (WASM) -ympäristöissä, myös offline-first application -sovelluksissa.
Esimerkki virallisista dokumenteista
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Alusta uusi palvelin asetuksilla
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Käynnistä palvelin goroutinen kautta
22 go ns.Start()
23
24 // Odota, että palvelin on valmis yhteyksiä varten
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Yhdistä palvelimeen
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Tilaa aihe
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Tulosta viestin tiedot
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Sammuta palvelin (valinnainen)
45 ns.Shutdown()
46 })
47
48 // Julkaise tietoja aiheeseen
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Odota palvelimen sammumista
52 ns.WaitForShutdown()
53}
- Tämä on NATS-virallisdokumentaatiossa esitetty Embedded NATS -esimerkki, mutta jos tätä esimerkkikoodia noudatetaan, communication ei tapahdu embedding mode -tilassa.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Komennolla
watch 'netstat -an | grep 127.0.0.1'voidaan tarkastella localhost (127.0.0.1) -verkkoa ja kun kyseinen Go-tiedosto suoritetaan komennollago run ., voidaan havaita uusia verkkopyyntöjä NATS:n oletusportista4222.
Oikeat konfiguraatiot embedded mode -tilaa varten
Jotta communication tapahtuisi tarkoitetussa embedded mode -tilassa, tarvitaan seuraavat kaksi vaihtoehtoa.
- Client:
InProcessServer-optio on annettava. - Server:
Server.Options:iin on määritettäväDontListen-lipputrue:ksi.
- Client:
Nämä osat eivät olleet virallisesti dokumentoituja, ja tämän ominaisuuden alkuperä voidaan jäljittää kyseiseen PR:ään.
Tämä PR lisää kolme asiaa:
InProcessConn()-funktioServer-objektiin, joka rakentaanet.Pipe-objektin yhteyden muodostamiseksi NATS-palvelimeen ilman TCP-socketsien käyttöä.DontListen-optio, joka käskee NATS-palvelinta olemaan kuuntelematta tavallista TCP-kuuntelijaa.startupComplete-kanava, joka suljetaan juuri ennenAcceptLoop-toiminnon käynnistämistä, jareadyForConnectionsodottaa sitä.
Tärkein motiivi tälle on, että meillä on sovellus, joka voi toimia joko monoliittisessa (yksiprosessisessa) tai polyliittisessä (moniprosessisessa) tilassa. Haluaisimme voida käyttää NATSia molemmissa tiloissa yksinkertaisuuden vuoksi, mutta monoliittisen tilan on pystyttävä palvelemaan erilaisia alustoja, joissa socket-yhteyksien avaaminen joko ei ole järkevää (mobiili) tai ei vain ole mahdollista (WASM). Nämä muutokset mahdollistavat NATSin käytön kokonaan prosessin sisällä.
Mukana tuleva PR nats-io/nats.go#774 lisää tuen client-puolelle.
Tämä on ensimmäinen PR:ni tähän projektiin, joten pyydän anteeksi etukäteen, jos olen jättänyt jotain ilmeistä huomaamatta.
/cc @nats-io/core
Toimiva esimerkki embedded mode -tilasta
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // upotetun NATS-palvelimen konfiguroimiseksi
14 // aseta DonListen trueksi
15 DontListen: true,
16 }
17
18 // Alusta uusi palvelin asetuksilla
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Käynnistä palvelin goroutinen kautta
26 go ns.Start()
27
28 // Odota, että palvelin on valmis yhteyksiä varten
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Yhdistä palvelimeen prosessin sisäisen yhteyden kautta
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Tilaa aihe
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Tulosta viestin tiedot
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Sammuta palvelin (valinnainen)
49 ns.Shutdown()
50 })
51
52 // Julkaise tietoja aiheeseen
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Odota palvelimen sammumista
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nyt voidaan havaita, ettei ylimääräisiä network hop -toimintoja enää tapahdu, kuten oli tarkoituskin.
Under the hood
TL;DR
- Tämä on sequence diagram, joka osoittaa, miten tiettyjä funktioita suoritetaan sisäisesti, kun koodi suoritetaan
main.go:ssa, ja pääkohdat ovat seuraavat:DontListen: trueohittaa serverinAcceptLoop-vaiheen, joka on client listening phase.- Jos clientin Connect-option
InProcessServeron aktivoitu, se luo in-memory connectionin janet.Pipe:n kautta pipe:n, jonka jälkeen pipe:n pää palautetaan clientillenet.Conn-tyyppinä. - Client ja server kommunikoivat in-process communication -tilassa tämän connectionin kautta.
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Odota clientteja.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Ensinnäkin, jos
DontListenon true,AcceptLoop-vaihe, joka on client listening phase, ohitetaan.
1// nats-server/server/server.go
2
3// AcceptLoop on eksportoitu helpompaa testausta varten.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Jos meidän pitäisi poistua ennen kuin kuuntelija on asennettu oikein,
6 // varmista, että suljemme kanavan.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Tilannekuva palvelimen asetuksista.
18 opts := s.getOpts()
19
20 // Määritä tila, joka voi mahdollistaa sammutuksen
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Virhe kuunnellessa porttia: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Kuunnellaan client-yhteyksiä osoitteessa %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Ilmoita TLS:n käytöstä.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS vaaditaan client-yhteyksille")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clientit, jotka eivät käytä \"TLS Handshake First\" -optiota, eivät pysty muodostamaan yhteyttä")
38 }
39 }
40
41 // Jos palvelin käynnistettiin RANDOM_PORT (-1) -asetuksella, opts.Port olisi yhtä suuri
42 // kuin 0 tämän funktion alussa. Joten meidän on hankittava todellinen portti
43 if opts.Port == 0 {
44 // Kirjoita ratkaistu portti takaisin asetuksiin.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nyt kun portti on asetettu (jos se oli asetettu RANDOM-tilaan), aseta
49 // palvelimen info Host/Port joko Options- tai ClientAdvertise-arvoilla.
50 if err := s.setInfoHostPort(); err != nil {
51 s.Fatalf("Virhe asetettaessa palvelimen INFOa ClientAdvertise-arvolla %s, err=%v", opts.ClientAdvertise, err)
52 l.Close()
53 s.mu.Unlock()
54 return
55 }
56 // Pidä kirjaa client-yhteys-URL-osoitteista. Saatamme tarvita niitä myöhemmin.
57 s.clientConnectURLs = s.getClientConnectURLs()
58 s.listener = l
59
60 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
61 func(_ error) bool {
62 if s.isLameDuckMode() {
63 // Ilmoita, että emme hyväksy uusia clientteja
64 s.ldmCh <- true
65 // Odota nyt sammutusta...
66 <-s.quitCh
67 return true
68 }
69 return false
70 })
71 s.mu.Unlock()
72
73 // Ilmoita kutsujalle, että olemme valmiita
74 close(clr)
75 clr = nil
76}
- Huomautettakoon, että AcceptLoop-funktio suorittaa seuraavat vaiheet. Nämä osat, kuten
TLSjahostPort, liittyvät network communication -toimintoihin, ja ne ovat tarpeettomia in-process communication -tilassa, joten ne voidaan jättää pois.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect yrittää yhdistää NATS-järjestelmään.
5// URL voi sisältää käyttäjänimi/salasana-semantiikkaa. esim. nats://derek:pass@localhost:4222
6// Pilkuilla erotetut taulukot ovat myös tuettuja, esim. urlA, urlB.
7// Asetukset alkavat oletusarvoilla, mutta ne voidaan ohittaa.
8// Yhdistääksesi NATS-palvelimen websocket-porttiin, käytä `ws` tai `wss` skeemaa, kuten
9// `ws://localhost:8080`. Huomaa, että websocket-skeemoja ei voida sekoittaa muiden (nats/tls) kanssa.
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Optioita voidaan käyttää mukautetun yhteyden luomiseen.
4type Options struct {
5 // Url edustaa yhtä NATS-palvelimen URL-osoitetta, johon client
6 // yhdistää. Jos Servers-optio on myös asetettu, siitä tulee
7 // ensimmäinen palvelin Servers-taulukossa.
8 Url string
9
10 // InProcessServer edustaa NATS-palvelinta, joka on käynnissä
11 // samassa prosessissa. Jos tämä on asetettu, yritämme yhdistää
12 // palvelimeen suoraan TCP-yhteyksien sijaan.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
Connect-funktio, joka hoitaa nats serverin ja nats clientin välisen yhteyden, mahdollistaa client URL:n ja connect Option:n asettamisen. Näiden Option:ien kokoelmassa, Options structissa, onInProcessConnProviderinterface-tyyppinenInProcessServer-kenttä.
1// main.go of example code
2
3// Alusta uusi palvelin asetuksilla
4ns, err := server.NewServer(opts)
5
6//...
7
8// Yhdistä palvelimeen prosessin sisäisen yhteyden kautta
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Kun nats client muodostaa yhteyden ja
nats.InProcessServer(ns)välitetäänInProcessServer-kenttään,
1// nats-go/nats.go
2
3// createConn yhdistää palvelimeen ja käärii asianmukaiset
4// bufio-rakenteet. Se tekee oikein, kun olemassa oleva
5// yhteys on käytössä.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Jos meillä on viittaus in-process-palvelimeen, muodosta yhteys
15 // käyttämällä sitä.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("epäonnistui in-process-yhteyden saaminen: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- kyseinen interface korvataan embedded nats serverillä ja
1// nats-server/server/server.go
2
3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
4// välttäen TCP-kuuntelijan tarpeen paikallista yhteyttä varten
5// samassa prosessissa. Tätä voidaan käyttää riippumatta
6// DontListen-option tilasta.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("yhteyden luominen epäonnistui")
16 }
17 return pr, nil
18}
- serveriin toteutettua
InProcessConn:ia kutsutaan ja suoritetaan. - Kyseinen funktio kutsutaan natsin Go clientissä
nats.go, jos nc (nats connection)InProcessServerei ole nil (eli validi), jolloin se luo connectionin (net.Conn) ja sitoo sen serverin connectioniin.
Consumer driven interface of Go
Tyyppi toteuttaa rajapinnan toteuttamalla sen metodit. Tarkoituksesta ei ole eksplisiittistä ilmoitusta, ei "implements"-avainsanaa. Implisiittiset rajapinnat irrottavat rajapinnan määrittelyn sen toteutuksesta, joka voi sitten esiintyä missä tahansa paketissa ilman ennalta sopimista.
Interfaces are implemented implicitly, A Tour of Go
Jos tyyppi on olemassa vain rajapinnan toteuttamiseksi eikä sillä koskaan ole rajapinnan ulkopuolisia eksportoituja metodeja, tyyppiä itsessään ei tarvitse eksportoida.
- Tämä interface design ilmentää hyvin Gon yleisesti tunnettuja consumer defined interface- ja structural typing (duck typing) -konsepteja, ja haluan esitellä ne tässä yhteydessä.
1// nats-go/nats.go
2
3// Optioita voidaan käyttää mukautetun yhteyden luomiseen.
4type Options struct {
5 // Url edustaa yhtä NATS-palvelimen URL-osoitetta, johon client
6 // yhdistää. Jos Servers-optio on myös asetettu, siitä tulee
7 // ensimmäinen palvelin Servers-taulukossa.
8 Url string
9
10 // InProcessServer edustaa NATS-palvelinta, joka on käynnissä
11 // samassa prosessissa. Jos tämä on asetettu, yritämme yhdistää
12 // palvelimeen suoraan TCP-yhteyksien sijaan.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Palataanpa koodiin. nats.go clientissä
InProcessServeroption struct field on määriteltyInProcessConnProvider-rajapinnaksi, joka suorittaa vainInProcessConn:n.
1// nats-server/server/server.go
2
3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
4// välttäen TCP-kuuntelijan tarpeen paikallista yhteyttä varten
5// samassa prosessissa. Tätä voidaan käyttää riippumatta
6// DontListen-option tilasta.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("yhteyden luominen epäonnistui")
16 }
17 return pr, nil
18}
- Kuitenkin siihen syötettävä tyyppi on nats-serverin
Server, joka suorittaaInProcessConn:n lisäksi monia muita toimintoja. - Tämä johtuu siitä, että clientin kiinnostus kyseisessä tilanteessa on ainoastaan siinä, onko
InProcessConn-rajapinta tarjottu vai ei; muut asiat eivät ole kovin merkittäviä. - Siksi nats.go client luo ja käyttää ainoastaan
InProcessConnProvider-nimistä consumer defined interfacea, joka määrittelee vain toiminnallisuudenInProcessConn() (net.Conn, error).
Johtopäätös
- Olemme lyhyesti käsitelleet NATS:n embedded mode -tilaa ja sen toimintaa, sekä Gon consumer defined interfacea, joka voidaan havaita NATS:n koodista.
- Toivon, että tämä tieto on hyödyllistä niille, jotka käyttävät NATSia vastaaviin tarkoituksiin, ja tähän päättyy tämä artikkeli.