Kuinka sulautetut NATS-palvelimet kommunikoivat Go-sovelluksen kanssa?
Aloittaminen
Tietoja NATSista
Ohjelmistosovellusten ja -palveluiden on vaihdettava tietoja. NATS on infrastruktuuri, joka mahdollistaa tällaisen tiedonvaihdon, segmentoituna viestien muodossa. Kutsumme tätä ”viestiorientoituneeksi väliohjelmistoksi”.
NATSin avulla sovelluskehittäjät voivat:
- Rakentaa vaivattomasti hajautettuja ja skaalautuvia client-server-sovelluksia.
- Tallentaa ja jakaa tietoja reaaliaikaisesti yleisellä tavalla. Tämä voidaan saavuttaa joustavasti eri ympäristöissä, kielissä, pilvipalveluntarjoajilla ja paikallisissa järjestelmissä.
- NATS on Go-kielellä toteutettu viestivälittäjä.
Embedded NATS
Jos sovelluksesi on Go-kielellä, ja jos se sopii käyttötapaukseesi ja käyttöönotto-skenaarioihisi, voit jopa upottaa NATS-palvelimen sovellukseesi.
- Lisäksi NATSissa on erikoisuus: Go-kielellä toteutettujen sovellusten osalta se tukee embedded mode -tilaa.
- Toisin sanoen, viestivälittäjän tyypillisen tavan, jossa erillinen välittäjäpalvelin käynnistetään ja sovelluksen clientit kommunikoivat sen kanssa, sijaan, välittäjä itsessään voidaan upottaa (embed) Go-kielellä tehtyyn sovellukseen.
Embedded NATS -toiminnallisuuden edut ja käyttötapaukset
- Oheinen Youtube-video selittää asian hyvin, joten viittaan siihen.
- Vaikka erillistä viestivälittäjäpalvelinta ei otettaisikaan käyttöön, voidaan luoda modular monolith application, joka saavuttaa separate of concern -periaatteen ja johon NATS voidaan upottaa embedded-tilassa. Lisäksi single binary deployment tulee mahdolliseksi.
- Se on hyödyllinen paitsi platform with no network (wasm) -ympäristöissä, myös offline-first application -sovelluksissa.
Esimerkki virallisista dokumenteista
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Alustaa uuden palvelimen asetuksilla
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Käynnistää palvelimen goroutinen kautta
22 go ns.Start()
23
24 // Odottaa, että palvelin on valmis yhteyksille
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Yhdistää palvelimeen
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Tilaa aiheen
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Tulostaa viestin tiedot
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Sammuttaa palvelimen (valinnainen)
45 ns.Shutdown()
46 })
47
48 // Julkaisee tiedot aiheeseen
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Odottaa palvelimen sammumista
52 ns.WaitForShutdown()
53}
- NATS:in virallisen dokumentaation Embedded NATS -esimerkki ei mahdollista kommunikaatiota embedded mode -tilassa, jos sitä ajetaan sellaisenaan.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Kun suoritat Go-tiedoston
go run .
-komennolla ja tarkistat localhostiin (127.0.0.1) menevää verkkoliikennettä komennollawatch 'netstat -an | grep 127.0.0.1'
, huomaat, että NATS:in oletusportista4222
lähteviä uusia verkkopyyntöjä lisääntyy.
Oikeat konfiguraatiot embedded mode -tilaan
Jotta kommunikaatio tapahtuisi embedded mode -tilassa tarkoitetulla tavalla, tarvitaan seuraavat kaksi vaihtoehtoa:
- Client: On lisättävä
InProcessServer
-vaihtoehto. - Server:
Server.Options
-asetuksiin on määritettäväDontListen
-lipputrue
:ksi.
- Client: On lisättävä
Nämä osat eivät olleet virallisesti dokumentoituja, ja tämän ominaisuuden alkuperä löytyy tästä PR:stä.
Tämä PR lisää kolme asiaa:
InProcessConn()
-funktionServer
-objektiin, joka rakentaanet.Pipe
-yhteyden NATS-palvelimeen ilman TCP-socketien käyttöä.DontListen
-asetuksen, joka käskee NATS-palvelimen olemaan kuuntelematta tavallisella TCP-kuuntelijalla.startupComplete
-kanavan, joka suljetaan juuri ennenAcceptLoop
-toiminnon aloittamista, jareadyForConnections
odottaa sitä.
Tärkein motivaatio tähän on, että meillä on sovellus, joka voi toimia joko monoliittisessa (yhden prosessin) tai polyliittisessä (monen prosessin) tilassa. Haluamme pystyä käyttämään NATSia molemmissa tiloissa yksinkertaisuuden vuoksi, mutta monoliittisen tilan on kyettävä palvelemaan erilaisia alustoja, joissa socket-yhteyksien avaaminen joko ei ole järkevää (mobiili) tai ei vain ole mahdollista (WASM). Nämä muutokset mahdollistavat NATSin käytön kokonaan prosessin sisällä.
Mukana tuleva PR nats-io/nats.go#774 lisää tuen client-puolelle.
Tämä on ensimmäinen PR:ni tähän projektiin, joten pyydän anteeksi etukäteen, jos olen missannut jotain ilmeistä.
/cc @nats-io/core
Toimiva esimerkki embedded mode -tilasta
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // upotetun NATS-palvelimen konfiguroimiseksi
14 // aseta DonListen arvoksi true
15 DontListen: true,
16 }
17
18 // Alustaa uuden palvelimen asetuksilla
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Käynnistää palvelimen goroutinen kautta
26 go ns.Start()
27
28 // Odottaa, että palvelin on valmis yhteyksille
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Yhdistää palvelimeen prosessin sisäisen yhteyden kautta
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Tilaa aiheen
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Tulostaa viestin tiedot
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Sammuttaa palvelimen (valinnainen)
49 ns.Shutdown()
50 })
51
52 // Julkaisee tiedot aiheeseen
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Odottaa palvelimen sammumista
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Nyt voidaan havaita, ettei ylimääräisiä network hop -hyppyjä tapahdu, kuten oli tarkoitus.
Under the hood
TL;DR
- Kun tämä koodi suoritetaan
main.go
-tiedostossa, se on sekvenssikaavio, joka näyttää, miten sisäiset funktiot toimivat. Ydinasiat ovat seuraavat:DontListen: true
-asetuksen avulla palvelin ohittaaAcceptLoop
-nimisen client listening phase -vaiheen.- Jos clientin Connect option -asetuksessa
InProcessServer
on aktivoitu, se luo in-memory connection -yhteyden ja rakentaa pipennet.Pipe
-toiminnolla, jonka jälkeen se palauttaa pipen pään clientillenet.Conn
-tyyppinä. - Client ja serveri kommunikoivat in-process -tilassa tämän yhteyden kautta.
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Odottaa clienttejä.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Ensinnäkin, jos
DontListen
on true,AcceptLoop
-niminen client listening phase ohitetaan.
1// nats-server/server/server.go
2
3// AcceptLoop on eksportoitu helpottamaan testausta.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Jos meidän pitäisi poistua ennen kuin kuuntelija on asetettu oikein,
6 // varmista, että suljemme kanavan.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Tilannekuva palvelimen asetuksista.
18 opts := s.getOpts()
19
20 // Aseta tila, joka voi mahdollistaa sammutuksen
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Virhe kuunneltaessa porttia: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Kuuntelee client-yhteyksiä osoitteessa %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Ilmoitus TLS:n käytöstä.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS vaaditaan client-yhteyksiin")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clientit, jotka eivät käytä \"TLS Handshake First\" -vaihtoehtoa, eivät voi muodostaa yhteyttä")
38 }
39 }
40
41 // Jos palvelin käynnistettiin RANDOM_PORT (-1) -asetuksella, opts.Port olisi
42 // aluksi 0 tässä funktiossa. Joten meidän on saatava todellinen portti
43 if opts.Port == 0 {
44 // Kirjoita ratkaistu portti takaisin asetuksiin.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Nyt kun portti on asetettu (jos se oli asetettu RANDOM-tilaan), aseta
49 // palvelimen info Host/Port joko Options-arvoista tai
50 // ClientAdvertise-arvoista.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Virhe palvelimen INFO-asetuksessa ClientAdvertise-arvolla %s, virhe=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Pidä kirjaa client-yhteys-URL-osoitteista. Saatamme tarvita niitä myöhemmin.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signaali, että emme hyväksy uusia clienttejä
65 s.ldmCh <- true
66 // Nyt odotetaan sammutusta...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Ilmoita kutsujalle, että olemme valmiita
75 close(clr)
76 clr = nil
77}
- Muuten, AcceptLoop-funktio suorittaa seuraavat prosessit. Nämä ovat osia, jotka liittyvät verkkokommunikaatioon, kuten
TLS
taihostPort
, ja jotka voidaan jättää pois, jos kyseessä on in-process communication.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect yrittää muodostaa yhteyden NATS-järjestelmään.
5// URL voi sisältää käyttäjätunnus/salasana-semantiikkaa. esim. nats://derek:pass@localhost:4222
6// Pilkuin erotellut taulukot ovat myös tuettuja, esim. urlA, urlB.
7// Asetukset alkavat oletusarvoista, mutta ne voidaan ohittaa.
8// Yhdistääksesi NATS Serverin websocket-porttiin, käytä `ws` tai `wss` -skeemaa, kuten
9// `ws://localhost:8080`. Huomaa, että websocket-skeemoja ei voi sekoittaa muihin (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options-rakennetta voidaan käyttää räätälöidyn yhteyden luomiseen.
4type Options struct {
5 // Url edustaa yksittäistä NATS-palvelimen URL-osoitetta, johon client
6 // muodostaa yhteyden. Jos Servers-asetus on myös asetettu, siitä
7 // tulee ensimmäinen palvelin Servers-taulukossa.
8 Url string
9
10 // InProcessServer edustaa NATS-palvelinta, joka toimii
11 // samassa prosessissa. Jos tämä on asetettu, yritämme muodostaa
12 // yhteyden palvelimeen suoraan TCP-yhteyksien sijaan.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
Connect
-funktio, joka suorittaa nats serverin ja nats clientin välisen yhteyden, voi asettaa client URL:n ja connect Optionin, ja nämä Optionit sisältävä Options-rakenne sisältääInProcessConnProvider
-rajapintatyypinInProcessServer
-kentän.
1// main.go of example code
2
3// Alustaa uuden palvelimen asetuksilla
4ns, err := server.NewServer(opts)
5
6//...
7
8// Yhdistää palvelimeen prosessin sisäisen yhteyden kautta
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Kun nats client muodostaa yhteyden ja välittää
nats.InProcessServer(ns)
:nInProcessServer
-kenttään,
1// nats-go/nats.go
2
3// createConn muodostaa yhteyden palvelimeen ja käärii asianmukaiset
4// bufio-rakenteet. Se toimii oikein, kun olemassa oleva
5// yhteys on käytössä.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Jos meillä on viittaus prosessin sisäiseen palvelimeen, luo
15 // yhteys sitä käyttäen.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- kyseinen rajapinta suorittaa
InProcessServer
-vaihtoehdonInProcesConn
-funktioncreateConn
-funktiossa, josInProcessServer
-vaihtoehto ei ole nil (eli on validi), ja
1// nats-server/server/server.go
2
3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
4// välttäen TCP-kuuntelijan tarpeen paikallisessa yhteydenmuodostuksessa
5// samassa prosessissa. Tätä voidaan käyttää riippumatta
6// DontListen-asetuksen tilasta.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- serveriin toteutettua
InProcessConn
-funktiota kutsutaan ja suoritetaan. - Tämä funktio kutsutaan natsin Go-clientissä
nats.go
, kun nc:n (nats connection)InProcessServer
ei ole nil, ja se luo yhteyden (net.Conn
) ja sitoo sen serverin yhteyteen.
Go:n consumer driven interface
Tyyppi toteuttaa rajapinnan toteuttamalla sen metodit. Tarkoituksen nimenomaista ilmoitusta tai "implements"-avainsanaa ei ole. Implisiittiset rajapinnat irrottavat rajapinnan määrittelyn sen toteutuksesta, joka voi sitten esiintyä missä tahansa paketissa ilman ennakkojärjestelyjä.
Interfaces are implemented implicitly, A Tour of Go
Jos tyyppi on olemassa vain rajapinnan toteuttamiseksi eikä sillä ole koskaan rajapinnan ulkopuolisia eksportoituja metodeja, tyyppiä itseään ei tarvitse eksportoida.
- Tämä rajapintasuunnittelu ilmentää hyvin Go-kielessä usein puhuttua consumer defined interface -periaatetta ja structural typing (duck typing) -tyyppisyyttä, joten haluan esitellä myös tämän aiheen.
1// nats-go/nats.go
2
3// Options-rakennetta voidaan käyttää räätälöidyn yhteyden luomiseen.
4type Options struct {
5 // Url edustaa yksittäistä NATS-palvelimen URL-osoitetta, johon client
6 // muodostaa yhteyden. Jos Servers-asetus on myös asetettu, siitä
7 // tulee ensimmäinen palvelin Servers-taulukossa.
8 Url string
9
10 // InProcessServer edustaa NATS-palvelinta, joka toimii
11 // samassa prosessissa. Jos tämä on asetettu, yritämme muodostaa
12 // yhteyden palvelimeen suoraan TCP-yhteyksien sijaan.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Palataan koodiin. nats.go clientin
InProcessServer
option struct field on määriteltyInProcessConnProvider
-rajapinnaksi, joka suorittaa vainInProcessConn
-toiminnon.
1// nats-server/server/server.go
2
3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
4// välttäen TCP-kuuntelijan tarpeen paikallisessa yhteydenmuodostuksessa
5// samassa prosessissa. Tätä voidaan käyttää riippumatta
6// DontListen-asetuksen tilasta.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Kuitenkin sen tyyppi on nats-serverin
Server
, joka suorittaaInProcessConn
-toiminnon lisäksi monia muita toimintoja. - Tämä johtuu siitä, että tässä tilanteessa clientin kiinnostuksen kohteena on vain se, onko
InProcessConn
-rajapinta tarjottu vai ei; muut asiat eivät ole kovin tärkeitä. - Siksi nats.go client luo ja käyttää vain
InProcessConnProvider
-nimistä consumer defined interface -rajapintaa, joka määrittelee ainoastaanInProcessConn() (net.Conn, error)
-toiminnon.
Johtopäätös
- Olen käsitellyt lyhyesti NATSin embedded mode -tilaa ja sen toimintatapaa sekä Go-kielen consumer defined interface -periaatetta, joka on havaittavissa NATSin koodista.
- Toivon, että tämä tieto on hyödyllistä niille, jotka käyttävät NATSia vastaaviin tarkoituksiin, ja tähän päättyy tämä artikkeli.