GoSuda

Miten sulautetut NATS-instanssit kommunikoivat Go-sovelluksen kanssa?

By prravda
views ...

Aloitus

Tietoja NATSista

Ohjelmistosovellusten ja -palveluiden on vaihdettava tietoja. NATS on infrastruktuuri, joka mahdollistaa tällaisen tiedonvaihdon, segmentoituna viestien muodossa. Kutsumme tätä "viestiorientoituneeksi väliohjelmistoksi".

NATSilla sovelluskehittäjät voivat:

  • Rakentaa vaivattomasti hajautettuja ja skaalautuvia client-server-sovelluksia.
  • Tallentaa ja jakaa tietoja reaaliaikaisesti yleisellä tavalla. Tämä voidaan joustavasti saavuttaa eri ympäristöissä, kielillä, pilvipalveluntarjoajilla ja on-premises-järjestelmissä.

What is NATS, NATS docs

  • NATS on Go-kielellä toteutettu message broker.

Embedded NATS

Jos sovelluksesi on Go-kielellä ja se sopii käyttötapaukseesi ja käyttöönottoympäristöösi, voit jopa upottaa NATS-palvelimen sovellukseesi.

Embedding NATS, NATS docs

  • NATSilla on erityispiirre: se tukee embedded mode -tilaa Go-kielellä toteutetuille sovelluksille.
  • Tämä tarkoittaa, että sen sijaan, että viestivälittäjän yleinen tapa olisi erillisen broker-palvelimen käynnistäminen ja sovelluksen clientin kommunikointi sen kanssa, brokerin itsensä voi upottaa (embed) Go-kielellä tehtyyn sovellukseen.

Embedded NATS:n edut ja käyttötapaukset

  • Hyvin selitetty Youtube-video on olemassa, joten viittaan siihen.
  • Vaikka erillistä message broker -palvelinta ei otettaisi käyttöön, on mahdollista luoda modular monolith application ja saavuttaa separate of concern, samalla kun hyödynnetään NATS:n upottamisen etuja. Lisäksi mahdollistuu single binary deployment.
  • Sitä voidaan käyttää hyödyllisesti paitsi platform with no network (WASM) -ympäristöissä, myös offline-first application -sovelluksissa.

Esimerkki virallisista dokumenteista

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Alusta uusi palvelin asetuksilla
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Käynnistä palvelin goroutinen kautta
22    go ns.Start()
23
24    // Odota, että palvelin on valmis yhteyksiä varten
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Yhdistä palvelimeen
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Tilaa aihe
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Tulosta viestin tiedot
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Sammuta palvelin (valinnainen)
45        ns.Shutdown()
46    })
47
48    // Julkaise tietoja aiheeseen
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Odota palvelimen sammumista
52    ns.WaitForShutdown()
53}
  • Tämä on NATS-virallisdokumentaatiossa esitetty Embedded NATS -esimerkki, mutta jos tätä esimerkkikoodia noudatetaan, communication ei tapahdu embedding mode -tilassa.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Komennolla watch 'netstat -an | grep 127.0.0.1' voidaan tarkastella localhost (127.0.0.1) -verkkoa ja kun kyseinen Go-tiedosto suoritetaan komennolla go run ., voidaan havaita uusia verkkopyyntöjä NATS:n oletusportista 4222.

Oikeat konfiguraatiot embedded mode -tilaa varten

  • Jotta communication tapahtuisi tarkoitetussa embedded mode -tilassa, tarvitaan seuraavat kaksi vaihtoehtoa.

    • Client: InProcessServer-optio on annettava.
    • Server: Server.Options:iin on määritettävä DontListen-lippu true:ksi.
  • Nämä osat eivät olleet virallisesti dokumentoituja, ja tämän ominaisuuden alkuperä voidaan jäljittää kyseiseen PR:ään.

    Tämä PR lisää kolme asiaa:

    1. InProcessConn()-funktio Server-objektiin, joka rakentaa net.Pipe-objektin yhteyden muodostamiseksi NATS-palvelimeen ilman TCP-socketsien käyttöä.
    2. DontListen-optio, joka käskee NATS-palvelinta olemaan kuuntelematta tavallista TCP-kuuntelijaa.
    3. startupComplete-kanava, joka suljetaan juuri ennen AcceptLoop-toiminnon käynnistämistä, ja readyForConnections odottaa sitä.

    Tärkein motiivi tälle on, että meillä on sovellus, joka voi toimia joko monoliittisessa (yksiprosessisessa) tai polyliittisessä (moniprosessisessa) tilassa. Haluaisimme voida käyttää NATSia molemmissa tiloissa yksinkertaisuuden vuoksi, mutta monoliittisen tilan on pystyttävä palvelemaan erilaisia alustoja, joissa socket-yhteyksien avaaminen joko ei ole järkevää (mobiili) tai ei vain ole mahdollista (WASM). Nämä muutokset mahdollistavat NATSin käytön kokonaan prosessin sisällä.

    Mukana tuleva PR nats-io/nats.go#774 lisää tuen client-puolelle.

    Tämä on ensimmäinen PR:ni tähän projektiin, joten pyydän anteeksi etukäteen, jos olen jättänyt jotain ilmeistä huomaamatta.

    /cc @nats-io/core

Toimiva esimerkki embedded mode -tilasta

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// upotetun NATS-palvelimen konfiguroimiseksi
14		// aseta DonListen trueksi
15		DontListen: true,
16	}
17
18	// Alusta uusi palvelin asetuksilla
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Käynnistä palvelin goroutinen kautta
26	go ns.Start()
27
28	// Odota, että palvelin on valmis yhteyksiä varten
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Yhdistä palvelimeen prosessin sisäisen yhteyden kautta
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Tilaa aihe
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Tulosta viestin tiedot
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Sammuta palvelin (valinnainen)
49		ns.Shutdown()
50	})
51
52	// Julkaise tietoja aiheeseen
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Odota palvelimen sammumista
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nyt voidaan havaita, ettei ylimääräisiä network hop -toimintoja enää tapahdu, kuten oli tarkoituskin.

Under the hood

TL;DR

diagram1

  • Tämä on sequence diagram, joka osoittaa, miten tiettyjä funktioita suoritetaan sisäisesti, kun koodi suoritetaan main.go:ssa, ja pääkohdat ovat seuraavat:
    • DontListen: true ohittaa serverin AcceptLoop-vaiheen, joka on client listening phase.
    • Jos clientin Connect-option InProcessServer on aktivoitu, se luo in-memory connectionin ja net.Pipe:n kautta pipe:n, jonka jälkeen pipe:n pää palautetaan clientille net.Conn-tyyppinä.
    • Client ja server kommunikoivat in-process communication -tilassa tämän connectionin kautta.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Odota clientteja.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Ensinnäkin, jos DontListen on true, AcceptLoop-vaihe, joka on client listening phase, ohitetaan.
 1// nats-server/server/server.go
 2
 3// AcceptLoop on eksportoitu helpompaa testausta varten.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Jos meidän pitäisi poistua ennen kuin kuuntelija on asennettu oikein,
 6	// varmista, että suljemme kanavan.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Tilannekuva palvelimen asetuksista.
18	opts := s.getOpts()
19
20	// Määritä tila, joka voi mahdollistaa sammutuksen
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Virhe kuunnellessa porttia: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Kuunnellaan client-yhteyksiä osoitteessa %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Ilmoita TLS:n käytöstä.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS vaaditaan client-yhteyksille")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clientit, jotka eivät käytä \"TLS Handshake First\" -optiota, eivät pysty muodostamaan yhteyttä")
38		}
39	}
40
41	// Jos palvelin käynnistettiin RANDOM_PORT (-1) -asetuksella, opts.Port olisi yhtä suuri
42	// kuin 0 tämän funktion alussa. Joten meidän on hankittava todellinen portti
43	if opts.Port == 0 {
44		// Kirjoita ratkaistu portti takaisin asetuksiin.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nyt kun portti on asetettu (jos se oli asetettu RANDOM-tilaan), aseta
49	// palvelimen info Host/Port joko Options- tai ClientAdvertise-arvoilla.
50	if err := s.setInfoHostPort(); err != nil {
51		s.Fatalf("Virhe asetettaessa palvelimen INFOa ClientAdvertise-arvolla %s, err=%v", opts.ClientAdvertise, err)
52		l.Close()
53		s.mu.Unlock()
54		return
55	}
56	// Pidä kirjaa client-yhteys-URL-osoitteista. Saatamme tarvita niitä myöhemmin.
57	s.clientConnectURLs = s.getClientConnectURLs()
58	s.listener = l
59
60	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
61		func(_ error) bool {
62			if s.isLameDuckMode() {
63				// Ilmoita, että emme hyväksy uusia clientteja
64				s.ldmCh <- true
65				// Odota nyt sammutusta...
66				<-s.quitCh
67				return true
68			}
69			return false
70		})
71	s.mu.Unlock()
72
73	// Ilmoita kutsujalle, että olemme valmiita
74	close(clr)
75	clr = nil
76}
  • Huomautettakoon, että AcceptLoop-funktio suorittaa seuraavat vaiheet. Nämä osat, kuten TLS ja hostPort, liittyvät network communication -toimintoihin, ja ne ovat tarpeettomia in-process communication -tilassa, joten ne voidaan jättää pois.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect yrittää yhdistää NATS-järjestelmään.
 5// URL voi sisältää käyttäjänimi/salasana-semantiikkaa. esim. nats://derek:pass@localhost:4222
 6// Pilkuilla erotetut taulukot ovat myös tuettuja, esim. urlA, urlB.
 7// Asetukset alkavat oletusarvoilla, mutta ne voidaan ohittaa.
 8// Yhdistääksesi NATS-palvelimen websocket-porttiin, käytä `ws` tai `wss` skeemaa, kuten
 9// `ws://localhost:8080`. Huomaa, että websocket-skeemoja ei voida sekoittaa muiden (nats/tls) kanssa.
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Optioita voidaan käyttää mukautetun yhteyden luomiseen.
 4type Options struct {
 5	// Url edustaa yhtä NATS-palvelimen URL-osoitetta, johon client
 6	// yhdistää. Jos Servers-optio on myös asetettu, siitä tulee
 7	// ensimmäinen palvelin Servers-taulukossa.
 8	Url string
 9
10	// InProcessServer edustaa NATS-palvelinta, joka on käynnissä
11	// samassa prosessissa. Jos tämä on asetettu, yritämme yhdistää
12	// palvelimeen suoraan TCP-yhteyksien sijaan.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Connect-funktio, joka hoitaa nats serverin ja nats clientin välisen yhteyden, mahdollistaa client URL:n ja connect Option:n asettamisen. Näiden Option:ien kokoelmassa, Options structissa, on InProcessConnProvider interface-tyyppinen InProcessServer-kenttä.
1// main.go of example code
2
3// Alusta uusi palvelin asetuksilla
4ns, err := server.NewServer(opts)
5
6//...
7
8// Yhdistä palvelimeen prosessin sisäisen yhteyden kautta
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Kun nats client muodostaa yhteyden ja nats.InProcessServer(ns) välitetään InProcessServer-kenttään,
 1// nats-go/nats.go
 2
 3// createConn yhdistää palvelimeen ja käärii asianmukaiset
 4// bufio-rakenteet. Se tekee oikein, kun olemassa oleva
 5// yhteys on käytössä.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Jos meillä on viittaus in-process-palvelimeen, muodosta yhteys
15	// käyttämällä sitä.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("epäonnistui in-process-yhteyden saaminen: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • kyseinen interface korvataan embedded nats serverillä ja
 1// nats-server/server/server.go
 2
 3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
 4// välttäen TCP-kuuntelijan tarpeen paikallista yhteyttä varten
 5// samassa prosessissa. Tätä voidaan käyttää riippumatta
 6// DontListen-option tilasta.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("yhteyden luominen epäonnistui")
16	}
17	return pr, nil
18}
  • serveriin toteutettua InProcessConn:ia kutsutaan ja suoritetaan.
  • Kyseinen funktio kutsutaan natsin Go clientissä nats.go, jos nc (nats connection) InProcessServer ei ole nil (eli validi), jolloin se luo connectionin (net.Conn) ja sitoo sen serverin connectioniin.

Consumer driven interface of Go

Tyyppi toteuttaa rajapinnan toteuttamalla sen metodit. Tarkoituksesta ei ole eksplisiittistä ilmoitusta, ei "implements"-avainsanaa. Implisiittiset rajapinnat irrottavat rajapinnan määrittelyn sen toteutuksesta, joka voi sitten esiintyä missä tahansa paketissa ilman ennalta sopimista.

Interfaces are implemented implicitly, A Tour of Go

Jos tyyppi on olemassa vain rajapinnan toteuttamiseksi eikä sillä koskaan ole rajapinnan ulkopuolisia eksportoituja metodeja, tyyppiä itsessään ei tarvitse eksportoida.

Generality, Effective Go

  • Tämä interface design ilmentää hyvin Gon yleisesti tunnettuja consumer defined interface- ja structural typing (duck typing) -konsepteja, ja haluan esitellä ne tässä yhteydessä.
 1// nats-go/nats.go
 2
 3// Optioita voidaan käyttää mukautetun yhteyden luomiseen.
 4type Options struct {
 5	// Url edustaa yhtä NATS-palvelimen URL-osoitetta, johon client
 6	// yhdistää. Jos Servers-optio on myös asetettu, siitä tulee
 7	// ensimmäinen palvelin Servers-taulukossa.
 8	Url string
 9
10	// InProcessServer edustaa NATS-palvelinta, joka on käynnissä
11	// samassa prosessissa. Jos tämä on asetettu, yritämme yhdistää
12	// palvelimeen suoraan TCP-yhteyksien sijaan.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Palataanpa koodiin. nats.go clientissä InProcessServer option struct field on määritelty InProcessConnProvider-rajapinnaksi, joka suorittaa vain InProcessConn:n.
 1// nats-server/server/server.go
 2
 3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
 4// välttäen TCP-kuuntelijan tarpeen paikallista yhteyttä varten
 5// samassa prosessissa. Tätä voidaan käyttää riippumatta
 6// DontListen-option tilasta.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("yhteyden luominen epäonnistui")
16	}
17	return pr, nil
18}
  • Kuitenkin siihen syötettävä tyyppi on nats-serverin Server, joka suorittaa InProcessConn:n lisäksi monia muita toimintoja.
  • Tämä johtuu siitä, että clientin kiinnostus kyseisessä tilanteessa on ainoastaan siinä, onko InProcessConn-rajapinta tarjottu vai ei; muut asiat eivät ole kovin merkittäviä.
  • Siksi nats.go client luo ja käyttää ainoastaan InProcessConnProvider-nimistä consumer defined interfacea, joka määrittelee vain toiminnallisuuden InProcessConn() (net.Conn, error).

Johtopäätös

  • Olemme lyhyesti käsitelleet NATS:n embedded mode -tilaa ja sen toimintaa, sekä Gon consumer defined interfacea, joka voidaan havaita NATS:n koodista.
  • Toivon, että tämä tieto on hyödyllistä niille, jotka käyttävät NATSia vastaaviin tarkoituksiin, ja tähän päättyy tämä artikkeli.