GoSuda

Hvordan kommuniserer innebygde NATS med Go-applikasjoner?

By prravda
views ...

Komme i gang

Om NATS

Programvareapplikasjoner og tjenester må utveksle data. NATS er en infrastruktur som tillater slik datautveksling, segmentert i form av meldinger. Vi kaller dette en «meldingsorientert mellomvare».

Med NATS kan applikasjonsutviklere:

  • Uanstrengt bygge distribuerte og skalerbare klient-server-applikasjoner.
  • Lagre og distribuere data i sanntid på en generell måte. Dette kan fleksibelt oppnås på tvers av ulike miljøer, språk, skyleverandører og lokale systemer.

Hva er NATS, NATS-dokumenter

  • NATS er en meldingsmegler konstruert i Go.

Embedded NATS

Hvis applikasjonen din er i Go, og hvis den passer til bruksområdet og distribusjonsscenariene dine, kan du til og med bygge inn en NATS-server i applikasjonen din.

Embedding NATS, NATS-dokumenter

  • En særegenhet ved NATS er at den støtter embedded mode for applikasjoner konstruert i Go.
  • Dette innebærer at i stedet for den konvensjonelle tilnærmingen med meldingsmeglere, som involverer drift av en separat megler-server og kommunikasjon via applikasjonens klient, er det mulig å bygge inn (embed) selve megleren direkte i en Go-basert applikasjon.

Fordeler og bruksområder for embedded NATS

  • En utmerket YouTube-video forklarer dette, og jeg henviser derfor til lenken til videoen.
  • Selv uten å distribuere en separat meldingsmeglerserver, kan man oppnå separate concerns ved å konstruere en modular monolith application, og dermed dra nytte av fordelene ved å integrere NATS i embedded-modus. I tillegg blir single binary deployment også mulig.
  • Dette kan anvendes for platform with no network (WASM) så vel som for offline-first applikasjoner.

Eksempel i offisiell dokumentasjon

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialiser ny server med opsjoner
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start serveren via goroutine
22    go ns.Start()
23
24    // Vent til serveren er klar for tilkoblinger
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("ikke klar for tilkobling")
27    }
28
29    // Koble til server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonner på emnet
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Skriv ut meldingsdata
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Steng ned serveren (valgfritt)
45        ns.Shutdown()
46    })
47
48    // Publiser data til emnet
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Vent på servernedstengning
52    ns.WaitForShutdown()
53}
  • Dette er et eksempel på Embedded NATS som er publisert i den offisielle NATS-dokumentasjonen, men hvis man følger denne eksempelkoden, vil kommunikasjon i embedded mode ikke finne sted.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Ved å kjøre go run . med kommandoen watch 'netstat -an | grep 127.0.0.1' for å overvåke nettverkstrafikken til localhost (127.0.0.1), kan man observere at nye nettverksforespørsler som stammer fra NATS' standardport 4222 blir lagt til.

Korrekte konfigurasjoner for embedding mode

  • For å oppnå den tilsiktede kommunikasjonen i embedded mode, er det nødvendig med følgende to opsjoner:

    • Klient: Opsjonen InProcessServer må inkluderes.
    • Server: Flagget DontListen i Server.Options må eksplisitt settes til true.
  • Disse spesifikke detaljene er ikke formelt dokumentert, men funksjonens opprinnelse kan spores tilbake til denne PR-en.

    Denne PR-en legger til tre ting:

    1. InProcessConn()-funksjon til Server som bygger en net.Pipe for å få en forbindelse til NATS-serveren uten å bruke TCP-sockets
    2. DontListen-opsjon som instruerer NATS-serveren om ikke å lytte på den vanlige TCP-lytteren
    3. startupComplete-kanal, som lukkes rett før vi starter AcceptLoop, og readyForConnections vil vente på den

    Hovedmotivasjonen for dette er at vi har en applikasjon som kan kjøre enten i en monolitisk (single-process) modus eller en polylitisk (multi-process) modus. Vi ønsker å kunne bruke NATS for begge moduser for enkelthets skyld, men monolitisk modus må kunne imøtekomme en rekke plattformer der åpning av socket-forbindelser enten ikke gir mening (mobil) eller rett og slett ikke er mulig (WASM). Disse endringene vil tillate oss å bruke NATS fullstendig in-process i stedet.

    En medfølgende PR nats-io/nats.go#774 legger til støtte på klientsiden.

    Dette er min første PR til dette prosjektet, så beklager på forhånd hvis jeg har oversett noe åpenbart.

    /cc @nats-io/core

Fungerende eksempel for embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialiser ny server med opsjoner
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start serveren via goroutine
26	go ns.Start()
27
28	// Vent til serveren er klar for tilkoblinger
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("ikke klar for tilkobling")
31	}
32
33	// Koble til server via in-process-tilkobling
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonner på emnet
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Skriv ut meldingsdata
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Steng ned serveren (valgfritt)
49		ns.Shutdown()
50	})
51
52	// Publiser data til emnet
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Vent på servernedstengning
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nå kan man observere at det ikke oppstår ytterligere nettverkshopp, i tråd med intensjonen.

Under panseret

TL;DR

diagram1

  • Dette er et sekvensdiagram som illustrerer hvordan funksjonene internt opererer når koden kjøres fra main.go, og kjernen kan oppsummeres som følger:
    • Ved DontListen: true utelater serveren klientlyttefasen AcceptLoop.
    • Hvis klientens Connect-opsjon InProcessServer aktiveres, opprettes en in-memory-forbindelse og en pipe via net.Pipe, hvoretter enden av pipen returneres til klienten som en net.Conn-type.
    • Klienten og serveren utfører in-process-kommunikasjon via denne forbindelsen.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Først, hvis DontListen er true, utelates klientlyttefasen AcceptLoop.
 1// nats-server/server/server.go
 2
 3// AcceptLoop er eksportert for enklere testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Hvis vi skulle avslutte før lytteren er riktig satt opp,
 6	// sørg for at vi lukker kanalen.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Øyeblikksbilde av serveropsjoner.
18	opts := s.getOpts()
19
20	// Sett opp tilstand som kan aktivere nedstengning
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Feil ved lytting på port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Lytter etter klienttilkoblinger på %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Varsle om TLS er aktivert.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS kreves for klienttilkoblinger")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Klienter som ikke bruker \"TLS Handshake First\"-opsjonen vil mislykkes i å koble til")
38		}
39	}
40
41	// Hvis serveren ble startet med RANDOM_PORT (-1), ville opts.Port være lik
42	// 0 i begynnelsen av denne funksjonen. Så vi må få den faktiske porten
43	if opts.Port == 0 {
44		// Skriv den løste porten tilbake til opsjonene.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nå som porten er satt (hvis den ble satt til RANDOM), sett
49	// serverens info Host/Port med enten verdier fra Opsjoner eller
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Feil ved innstilling av server INFO med ClientAdvertise-verdi av %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Hold oversikt over klientens tilkoblings-URL-er. Vi kan trenge dem senere.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signaliser at vi ikke aksepterer nye klienter
65				s.ldmCh <- true
66				// Vent nå på nedstengningen...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// La den som kaller vite at vi er klare
75	close(clr)
76	clr = nil
77}
  • For øvrig utfører AcceptLoop-funksjonen følgende prosesser: Dette omhandler nettverkskommunikasjonsrelaterte aspekter som TLS og hostPort, som kan utelates da de er unødvendige ved in-process-kommunikasjon.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect vil forsøke å koble til NATS-systemet.
 5// URL-en kan inneholde brukernavn/passord-semantikk. f.eks. nats://derek:pass@localhost:4222
 6// Komma-separerte arrayer støttes også, f.eks. urlA, urlB.
 7// Opsjoner starter med standardverdier, men kan overstyres.
 8// For å koble til en NATS-servers websockets-port, bruk `ws` eller `wss`-skjemaet, for eksempel
 9// `ws://localhost:8080`. Merk at websockets-skjemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Opsjoner kan brukes til å opprette en tilpasset tilkobling.
 4type Options struct {
 5	// Url representerer en enkelt NATS-server-url som klienten
 6	// vil koble til. Hvis Servers-opsjonen også er satt, blir den
 7	// den første serveren i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer representerer en NATS-server som kjører innenfor
11	// samme prosess. Hvis denne er satt, vil vi forsøke å koble
12	// til serveren direkte i stedet for å bruke eksterne TCP-forbindelser.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Connect-funksjonen, som håndterer tilkoblingen mellom NATS-serveren og NATS-klienten, tillater konfigurering av klient-URL og tilkoblingsopsjoner. Options-strukturen, som samler disse opsjonene, inneholder et felt kalt InProcessServer av typen InProcessConnProvider-grensesnitt.
1// main.go of example code
2
3// Initialiser ny server med opsjoner
4ns, err := server.NewServer(opts)
5
6//...
7
8// Koble til server via in-process-tilkobling
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Når NATS-klienten etablerer en forbindelse og sender nats.InProcessServer(ns) til InProcessServer-feltet, vil følgende skje:
 1// nats-go/nats.go
 2
 3// InProcessServer er en opsjon som vil forsøke å etablere en retning til en NATS-server
 4// som kjører innenfor prosessen i stedet for å ringe via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • Opsjonens InProcessServer blir substituert med den innebygde NATS-serveren.
 1// nats-go/nats.go
 2
 3// createConn vil koble til serveren og pakke inn de passende
 4// bufio-strukturene. Den vil gjøre det riktige når en eksisterende
 5// tilkobling er på plass.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Hvis vi har en referanse til en in-process-server, etabler en
15	// tilkobling ved å bruke den.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("kunne ikke hente in-process-tilkobling: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Dette grensesnittet, når InProcessServer-opsjonen ikke er null (gyldig), utfører InProcessConn-funksjonen til InProcessServer-opsjonen i createConn-funksjonen, som oppretter en tilkobling.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process-tilkobling til serveren,
 4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
 5// innenfor samme prosess. Dette kan brukes uavhengig av
 6// tilstanden til DontListen-opsjonen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("kunne ikke opprette tilkobling")
16	}
17	return pr, nil
18}
  • Serveren kaller og utfører InProcessConn-funksjonen.
  • Denne funksjonen, i Go-klienten til NATS (nats.go), kalles når InProcessServer for nc (nats connection) ikke er null, og den oppretter en tilkobling (net.Conn) og binder den til serverens tilkobling.

Consumer driven interface of Go

En type implementerer et grensesnitt ved å implementere dets metoder. Det er ingen eksplisitt erklæring om intensjon, ingen "implements"-nøkkelord. Implisitte grensesnitt frakobler definisjonen av et grensesnitt fra dets implementasjon, som deretter kan forekomme i hvilken som helst pakke uten forhåndsarrangement.

Grensesnitt er implementert implisitt, A Tour of Go

Hvis en type kun eksisterer for å implementere et grensesnitt og aldri vil ha eksporterte metoder utover det grensesnittet, er det ikke nødvendig å eksportere selve typen.

Generalitet, Effective Go

  • Denne grensesnittdesignen inkorporerer det som ofte refereres til som consumer defined interface og structural typing (duck typing) i Go, og jeg ønsker å introdusere dette emnet sammen med det.
 1// nats-go/nats.go
 2
 3// Opsjoner kan brukes til å opprette en tilpasset tilkobling.
 4type Options struct {
 5	// Url representerer en enkelt NATS-server-url som klienten
 6	// vil koble til. Hvis Servers-opsjonen også er satt, blir den
 7	// den første serveren i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer representerer en NATS-server som kjører innenfor
11	// samme prosess. Hvis denne er satt, vil vi forsøke å koble
12	// til serveren direkte i stedet for å bruke eksterne TCP-forbindelser.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La oss nå vende tilbake til koden. I nats.go-klienten er InProcessServer-opsjonsstrukturen definert som et InProcessConnProvider-grensesnitt som kun utfører InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process-tilkobling til serveren,
 4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
 5// innenfor samme prosess. Dette kan brukes uavhengig av
 6// tilstanden til DontListen-opsjonen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("kunne ikke opprette tilkobling")
16	}
17	return pr, nil
18}
  • Imidlertid er typen som inngår i dette grensesnittet, Server fra nats-server, som utfører en rekke funksjoner utover InProcessConn.
  • Dette skyldes at klientens primære interesse i denne situasjonen er om InProcessConn-grensesnittet er tilgjengelig, mens andre aspekter er av mindre betydning.
  • Følgelig har nats.go-klienten kun opprettet og benytter et forbrukerdefinert grensesnitt kalt InProcessConnProvider, som kun definerer funksjonaliteten InProcessConn() (net.Conn, error).

Konklusjon

  • Denne artikkelen har kortfattet behandlet NATS' embedded mode, dens virkemåte, og Go's consumer defined interface som kan observeres gjennom NATS' kode.
  • Jeg håper denne informasjonen er nyttig for de som benytter NATS til lignende formål.