GoSuda

Kuinka sulautetut NATS-palvelimet kommunikoivat Go-sovelluksen kanssa?

By prravda
views ...

Aloittaminen

Tietoja NATSista

Ohjelmistosovellusten ja -palveluiden on vaihdettava tietoja. NATS on infrastruktuuri, joka mahdollistaa tällaisen tiedonvaihdon, segmentoituna viestien muodossa. Kutsumme tätä ”viestiorientoituneeksi väliohjelmistoksi”.

NATSin avulla sovelluskehittäjät voivat:

  • Rakentaa vaivattomasti hajautettuja ja skaalautuvia client-server-sovelluksia.
  • Tallentaa ja jakaa tietoja reaaliaikaisesti yleisellä tavalla. Tämä voidaan saavuttaa joustavasti eri ympäristöissä, kielissä, pilvipalveluntarjoajilla ja paikallisissa järjestelmissä.

What is NATS, NATS docs

  • NATS on Go-kielellä toteutettu viestivälittäjä.

Embedded NATS

Jos sovelluksesi on Go-kielellä, ja jos se sopii käyttötapaukseesi ja käyttöönotto-skenaarioihisi, voit jopa upottaa NATS-palvelimen sovellukseesi.

Embedding NATS, NATS docs

  • Lisäksi NATSissa on erikoisuus: Go-kielellä toteutettujen sovellusten osalta se tukee embedded mode -tilaa.
  • Toisin sanoen, viestivälittäjän tyypillisen tavan, jossa erillinen välittäjäpalvelin käynnistetään ja sovelluksen clientit kommunikoivat sen kanssa, sijaan, välittäjä itsessään voidaan upottaa (embed) Go-kielellä tehtyyn sovellukseen.

Embedded NATS -toiminnallisuuden edut ja käyttötapaukset

  • Oheinen Youtube-video selittää asian hyvin, joten viittaan siihen.
  • Vaikka erillistä viestivälittäjäpalvelinta ei otettaisikaan käyttöön, voidaan luoda modular monolith application, joka saavuttaa separate of concern -periaatteen ja johon NATS voidaan upottaa embedded-tilassa. Lisäksi single binary deployment tulee mahdolliseksi.
  • Se on hyödyllinen paitsi platform with no network (wasm) -ympäristöissä, myös offline-first application -sovelluksissa.

Esimerkki virallisista dokumenteista

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Alustaa uuden palvelimen asetuksilla
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Käynnistää palvelimen goroutinen kautta
22    go ns.Start()
23
24    // Odottaa, että palvelin on valmis yhteyksille
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Yhdistää palvelimeen
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Tilaa aiheen
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Tulostaa viestin tiedot
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Sammuttaa palvelimen (valinnainen)
45        ns.Shutdown()
46    })
47
48    // Julkaisee tiedot aiheeseen
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Odottaa palvelimen sammumista
52    ns.WaitForShutdown()
53}
  • NATS:in virallisen dokumentaation Embedded NATS -esimerkki ei mahdollista kommunikaatiota embedded mode -tilassa, jos sitä ajetaan sellaisenaan.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Kun suoritat Go-tiedoston go run . -komennolla ja tarkistat localhostiin (127.0.0.1) menevää verkkoliikennettä komennolla watch 'netstat -an | grep 127.0.0.1', huomaat, että NATS:in oletusportista 4222 lähteviä uusia verkkopyyntöjä lisääntyy.

Oikeat konfiguraatiot embedded mode -tilaan

  • Jotta kommunikaatio tapahtuisi embedded mode -tilassa tarkoitetulla tavalla, tarvitaan seuraavat kaksi vaihtoehtoa:

    • Client: On lisättävä InProcessServer-vaihtoehto.
    • Server: Server.Options -asetuksiin on määritettävä DontListen-lippu true:ksi.
  • Nämä osat eivät olleet virallisesti dokumentoituja, ja tämän ominaisuuden alkuperä löytyy tästä PR:stä.

    Tämä PR lisää kolme asiaa:

    1. InProcessConn()-funktion Server-objektiin, joka rakentaa net.Pipe-yhteyden NATS-palvelimeen ilman TCP-socketien käyttöä.
    2. DontListen-asetuksen, joka käskee NATS-palvelimen olemaan kuuntelematta tavallisella TCP-kuuntelijalla.
    3. startupComplete-kanavan, joka suljetaan juuri ennen AcceptLoop-toiminnon aloittamista, ja readyForConnections odottaa sitä.

    Tärkein motivaatio tähän on, että meillä on sovellus, joka voi toimia joko monoliittisessa (yhden prosessin) tai polyliittisessä (monen prosessin) tilassa. Haluamme pystyä käyttämään NATSia molemmissa tiloissa yksinkertaisuuden vuoksi, mutta monoliittisen tilan on kyettävä palvelemaan erilaisia alustoja, joissa socket-yhteyksien avaaminen joko ei ole järkevää (mobiili) tai ei vain ole mahdollista (WASM). Nämä muutokset mahdollistavat NATSin käytön kokonaan prosessin sisällä.

    Mukana tuleva PR nats-io/nats.go#774 lisää tuen client-puolelle.

    Tämä on ensimmäinen PR:ni tähän projektiin, joten pyydän anteeksi etukäteen, jos olen missannut jotain ilmeistä.

    /cc @nats-io/core

Toimiva esimerkki embedded mode -tilasta

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// upotetun NATS-palvelimen konfiguroimiseksi
14		// aseta DonListen arvoksi true
15		DontListen: true,
16	}
17
18	// Alustaa uuden palvelimen asetuksilla
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Käynnistää palvelimen goroutinen kautta
26	go ns.Start()
27
28	// Odottaa, että palvelin on valmis yhteyksille
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Yhdistää palvelimeen prosessin sisäisen yhteyden kautta
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Tilaa aiheen
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Tulostaa viestin tiedot
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Sammuttaa palvelimen (valinnainen)
49		ns.Shutdown()
50	})
51
52	// Julkaisee tiedot aiheeseen
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Odottaa palvelimen sammumista
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nyt voidaan havaita, ettei ylimääräisiä network hop -hyppyjä tapahdu, kuten oli tarkoitus.

Under the hood

TL;DR

diagram1

  • Kun tämä koodi suoritetaan main.go-tiedostossa, se on sekvenssikaavio, joka näyttää, miten sisäiset funktiot toimivat. Ydinasiat ovat seuraavat:
    • DontListen: true -asetuksen avulla palvelin ohittaa AcceptLoop-nimisen client listening phase -vaiheen.
    • Jos clientin Connect option -asetuksessa InProcessServer on aktivoitu, se luo in-memory connection -yhteyden ja rakentaa pipen net.Pipe-toiminnolla, jonka jälkeen se palauttaa pipen pään clientille net.Conn-tyyppinä.
    • Client ja serveri kommunikoivat in-process -tilassa tämän yhteyden kautta.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Odottaa clienttejä.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Ensinnäkin, jos DontListen on true, AcceptLoop-niminen client listening phase ohitetaan.
 1// nats-server/server/server.go
 2
 3// AcceptLoop on eksportoitu helpottamaan testausta.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Jos meidän pitäisi poistua ennen kuin kuuntelija on asetettu oikein,
 6	// varmista, että suljemme kanavan.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Tilannekuva palvelimen asetuksista.
18	opts := s.getOpts()
19
20	// Aseta tila, joka voi mahdollistaa sammutuksen
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Virhe kuunneltaessa porttia: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Kuuntelee client-yhteyksiä osoitteessa %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Ilmoitus TLS:n käytöstä.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS vaaditaan client-yhteyksiin")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clientit, jotka eivät käytä \"TLS Handshake First\" -vaihtoehtoa, eivät voi muodostaa yhteyttä")
38		}
39	}
40
41	// Jos palvelin käynnistettiin RANDOM_PORT (-1) -asetuksella, opts.Port olisi
42	// aluksi 0 tässä funktiossa. Joten meidän on saatava todellinen portti
43	if opts.Port == 0 {
44		// Kirjoita ratkaistu portti takaisin asetuksiin.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nyt kun portti on asetettu (jos se oli asetettu RANDOM-tilaan), aseta
49	// palvelimen info Host/Port joko Options-arvoista tai
50	// ClientAdvertise-arvoista.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Virhe palvelimen INFO-asetuksessa ClientAdvertise-arvolla %s, virhe=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Pidä kirjaa client-yhteys-URL-osoitteista. Saatamme tarvita niitä myöhemmin.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signaali, että emme hyväksy uusia clienttejä
65				s.ldmCh <- true
66				// Nyt odotetaan sammutusta...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Ilmoita kutsujalle, että olemme valmiita
75	close(clr)
76	clr = nil
77}
  • Muuten, AcceptLoop-funktio suorittaa seuraavat prosessit. Nämä ovat osia, jotka liittyvät verkkokommunikaatioon, kuten TLS tai hostPort, ja jotka voidaan jättää pois, jos kyseessä on in-process communication.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect yrittää muodostaa yhteyden NATS-järjestelmään.
 5// URL voi sisältää käyttäjätunnus/salasana-semantiikkaa. esim. nats://derek:pass@localhost:4222
 6// Pilkuin erotellut taulukot ovat myös tuettuja, esim. urlA, urlB.
 7// Asetukset alkavat oletusarvoista, mutta ne voidaan ohittaa.
 8// Yhdistääksesi NATS Serverin websocket-porttiin, käytä `ws` tai `wss` -skeemaa, kuten
 9// `ws://localhost:8080`. Huomaa, että websocket-skeemoja ei voi sekoittaa muihin (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options-rakennetta voidaan käyttää räätälöidyn yhteyden luomiseen.
 4type Options struct {
 5	// Url edustaa yksittäistä NATS-palvelimen URL-osoitetta, johon client
 6	// muodostaa yhteyden. Jos Servers-asetus on myös asetettu, siitä
 7	// tulee ensimmäinen palvelin Servers-taulukossa.
 8	Url string
 9
10	// InProcessServer edustaa NATS-palvelinta, joka toimii
11	// samassa prosessissa. Jos tämä on asetettu, yritämme muodostaa
12	// yhteyden palvelimeen suoraan TCP-yhteyksien sijaan.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Connect-funktio, joka suorittaa nats serverin ja nats clientin välisen yhteyden, voi asettaa client URL:n ja connect Optionin, ja nämä Optionit sisältävä Options-rakenne sisältää InProcessConnProvider-rajapintatyypin InProcessServer-kentän.
1// main.go of example code
2
3// Alustaa uuden palvelimen asetuksilla
4ns, err := server.NewServer(opts)
5
6//...
7
8// Yhdistää palvelimeen prosessin sisäisen yhteyden kautta
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Kun nats client muodostaa yhteyden ja välittää nats.InProcessServer(ns):n InProcessServer-kenttään,
 1// nats-go/nats.go
 2
 3// createConn muodostaa yhteyden palvelimeen ja käärii asianmukaiset
 4// bufio-rakenteet. Se toimii oikein, kun olemassa oleva
 5// yhteys on käytössä.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Jos meillä on viittaus prosessin sisäiseen palvelimeen, luo
15	// yhteys sitä käyttäen.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • kyseinen rajapinta suorittaa InProcessServer-vaihtoehdon InProcesConn-funktion createConn-funktiossa, jos InProcessServer-vaihtoehto ei ole nil (eli on validi), ja
 1// nats-server/server/server.go
 2
 3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
 4// välttäen TCP-kuuntelijan tarpeen paikallisessa yhteydenmuodostuksessa
 5// samassa prosessissa. Tätä voidaan käyttää riippumatta
 6// DontListen-asetuksen tilasta.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • serveriin toteutettua InProcessConn-funktiota kutsutaan ja suoritetaan.
  • Tämä funktio kutsutaan natsin Go-clientissä nats.go, kun nc:n (nats connection) InProcessServer ei ole nil, ja se luo yhteyden (net.Conn) ja sitoo sen serverin yhteyteen.

Go:n consumer driven interface

Tyyppi toteuttaa rajapinnan toteuttamalla sen metodit. Tarkoituksen nimenomaista ilmoitusta tai "implements"-avainsanaa ei ole. Implisiittiset rajapinnat irrottavat rajapinnan määrittelyn sen toteutuksesta, joka voi sitten esiintyä missä tahansa paketissa ilman ennakkojärjestelyjä.

Interfaces are implemented implicitly, A Tour of Go

Jos tyyppi on olemassa vain rajapinnan toteuttamiseksi eikä sillä ole koskaan rajapinnan ulkopuolisia eksportoituja metodeja, tyyppiä itseään ei tarvitse eksportoida.

Generality, Effective Go

  • Tämä rajapintasuunnittelu ilmentää hyvin Go-kielessä usein puhuttua consumer defined interface -periaatetta ja structural typing (duck typing) -tyyppisyyttä, joten haluan esitellä myös tämän aiheen.
 1// nats-go/nats.go
 2
 3// Options-rakennetta voidaan käyttää räätälöidyn yhteyden luomiseen.
 4type Options struct {
 5	// Url edustaa yksittäistä NATS-palvelimen URL-osoitetta, johon client
 6	// muodostaa yhteyden. Jos Servers-asetus on myös asetettu, siitä
 7	// tulee ensimmäinen palvelin Servers-taulukossa.
 8	Url string
 9
10	// InProcessServer edustaa NATS-palvelinta, joka toimii
11	// samassa prosessissa. Jos tämä on asetettu, yritämme muodostaa
12	// yhteyden palvelimeen suoraan TCP-yhteyksien sijaan.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Palataan koodiin. nats.go clientin InProcessServer option struct field on määritelty InProcessConnProvider-rajapinnaksi, joka suorittaa vain InProcessConn-toiminnon.
 1// nats-server/server/server.go
 2
 3// InProcessConn palauttaa prosessin sisäisen yhteyden palvelimeen,
 4// välttäen TCP-kuuntelijan tarpeen paikallisessa yhteydenmuodostuksessa
 5// samassa prosessissa. Tätä voidaan käyttää riippumatta
 6// DontListen-asetuksen tilasta.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Kuitenkin sen tyyppi on nats-serverin Server, joka suorittaa InProcessConn-toiminnon lisäksi monia muita toimintoja.
  • Tämä johtuu siitä, että tässä tilanteessa clientin kiinnostuksen kohteena on vain se, onko InProcessConn-rajapinta tarjottu vai ei; muut asiat eivät ole kovin tärkeitä.
  • Siksi nats.go client luo ja käyttää vain InProcessConnProvider-nimistä consumer defined interface -rajapintaa, joka määrittelee ainoastaan InProcessConn() (net.Conn, error) -toiminnon.

Johtopäätös

  • Olen käsitellyt lyhyesti NATSin embedded mode -tilaa ja sen toimintatapaa sekä Go-kielen consumer defined interface -periaatetta, joka on havaittavissa NATSin koodista.
  • Toivon, että tämä tieto on hyödyllistä niille, jotka käyttävät NATSia vastaaviin tarkoituksiin, ja tähän päättyy tämä artikkeli.