GoSuda

Hvordan kommuniserer innebygde NATS med Go-applikasjoner?

By prravda
views ...

Komme i gang

Om NATS

Programvareapplikasjoner og -tjenester må utveksle data. NATS er en infrastruktur som tillater slik datautveksling, segmentert i form av meldinger. Vi kaller dette en " message oriented middleware".

Med NATS kan applikasjonsutviklere:

  • Bygge distribuerte og skalerbare klient-server-applikasjoner uten anstrengelse.
  • Lagre og distribuere data i sanntid på en generell måte. Dette kan fleksibelt oppnås på tvers av ulike miljøer, språk, skyleverandører og on-premises systemer.

Hva er NATS, NATS docs

  • NATS er en message broker utviklet i Go.

Embedded NATS

Hvis applikasjonen din er i Go, og hvis det passer ditt bruksområde og dine distribusjonsscenarier, kan du til og med embedde en NATS server inne i applikasjonen din.

Embedding NATS, NATS docs

  • Og det er en spesiell egenskap ved NATS, nemlig at den støtter embedded mode for applikasjoner utviklet i Go.
  • Det betyr at i stedet for den vanlige metoden for message brokers, som innebærer å starte en separat broker-server og kommunisere med den via applikasjonens klient, kan brokeren embeddes direkte i Go-applikasjonen.

Fordeler og bruksområder for embedded NATS

  • Det er en godt forklarende Youtube-video som jeg henviser til.
  • Selv uten å distribuere en separat message broker-server, kan man oppnå fordelen med å embedde NATS ved å lage en modular monolith application, som også oppnår separate of concern. I tillegg muliggjør dette single binary deployment.
  • Den kan brukes for platform with no network (WASM), samt i offline-first applicationer.

Eksempel fra offisielle dokumenter

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialiser ny server med opsjoner
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start serveren via goroutine
22    go ns.Start()
23
24    // Vent til serveren er klar for tilkoblinger
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Koble til serveren
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Abonner på emnet
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Skriv ut meldingsdata
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Stopp serveren (valgfritt)
45        ns.Shutdown()
46    })
47
48    // Publiser data til emnet
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Vent på at serveren skal stoppe
52    ns.WaitForShutdown()
53}
  • Dette er et eksempel på embedded NATS fra NATs' offisielle dokumentasjon, men hvis man følger denne eksempelkode, vil kommunikasjonen ikke skje i embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Når man kjører go run . med denne Go-filen, samtidig som man overvåker nettverkstrafikken til localhost (127.0.0.1) med kommandoen watch 'netstat -an | grep 127.0.0.1', kan man observere at nye nettverksforespørsler som starter fra NATS' standardport 4222 blir lagt til.

Riktige konfigurasjoner for embedding mode

  • For å kommunisere i embedded mode som tiltenkt, er følgende to opsjoner nødvendige.

    • Klient: InProcessServer-opsjonen må inkluderes.
    • Server: DontListen-flagget i Server.Options må settes til true.
  • Disse delene var ikke offisielt dokumentert, og opprinnelsen til denne funksjonaliteten kan spores tilbake til denne PR-en.

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

Fungerende eksempel for embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialiser ny server med opsjoner
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start serveren via goroutine
26	go ns.Start()
27
28	// Vent til serveren er klar for tilkoblinger
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Koble til serveren via in-process-tilkobling
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Abonner på emnet
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Skriv ut meldingsdata
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Stopp serveren (valgfritt)
49		ns.Shutdown()
50	})
51
52	// Publiser data til emnet
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Vent på at serveren skal stoppe
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Nå kan man se at det ikke oppstår ytterligere network hops, som tiltenkt.

Under the hood

TL;DR

diagram1

  • Dette er et sequence diagram som viser hvilke funksjoner som opererer internt og hvordan de fungerer når koden kjøres i main.go, og hovedpunktene er som følger:
    • Ved DontListen: true hopper serveren over klientens lyttingfase kalt AcceptLoop.
    • Hvis klientens Connect-opsjon InProcessServer er aktivert, opprettes en in-memory connection og et pipe via net.Pipe, deretter returneres slutten av pipen til klienten som en net.Conn-type.
    • Klienten og serveren utfører in-process communication via denne connectionen.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Vent på klienter.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Først, hvis DontListen er true, hopper serveren over klientens lyttingfase kalt AcceptLoop.
 1// nats-server/server/server.go
 2
 3// AcceptLoop er eksportert for enklere testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Hvis vi skulle avslutte før lytteren er satt opp riktig,
 6	// sørg for at vi lukker kanalen.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot serveropsjoner.
18	opts := s.getOpts()
19
20	// Sett opp tilstand som kan muliggjøre nedstengning
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Varsle om TLS er aktivert.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// Hvis serveren ble startet med RANDOM_PORT (-1), ville opts.Port være lik
42	// 0 i begynnelsen av denne funksjonen. Så vi må få den faktiske porten
43	if opts.Port == 0 {
44		// Skriv løst port tilbake til opsjoner.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Nå som porten er satt (hvis den ble satt til RANDOM), sett
49	// serverens info Host/Port med enten verdier fra Options eller
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Hold oversikt over klienttilkoblings-URL-er. Vi kan trenge dem senere.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signaliser at vi ikke godtar nye klienter
65				s.ldmCh <- true
66				// Nå venter vi på Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// La den som kaller vite at vi er klare
75	close(clr)
76	clr = nil
77}
  • For øvrig utfører AcceptLoop-funksjonen følgende prosesser: Den omhandler nettverkskommunikasjonsrelaterte deler som TLS og hostPort. Disse delene er overflødige ved in-process communication og kan derfor utelates.

Klient

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect vil forsøke å koble til NATS-systemet.
 5// URL-en kan inneholde brukernavn/passord-semantikk. f.eks. nats://derek:pass@localhost:4222
 6// Komma-separerte arrays støttes også, f.eks. urlA, urlB.
 7// Opsjoner starter med standardinnstillingene, men kan overstyres.
 8// For å koble til en NATS Servers websocket-port, bruk `ws` eller `wss` skjemaet, for eksempel
 9// `ws://localhost:8080`. Merk at websocket-skjemaer ikke kan blandes med andre (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options kan brukes til å opprette en tilpasset tilkobling.
 4type Options struct {
 5	// Url representerer en enkelt NATS-server-URL som klienten
 6	// vil koble til. Hvis Servers-opsjonen også er satt, blir den
 7	// den første serveren i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer representerer en NATS-server som kjører innenfor
11	// samme prosess. Hvis dette er satt, vil vi forsøke å koble til
12	// serveren direkte i stedet for å bruke eksterne TCP-tilkoblinger.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Connect-funksjonen, som håndterer tilkoblingen mellom NATS server og NATS client, lar deg konfigurere klientens URL og tilkoblingsalternativer. Options-structen, som samler disse alternativene, inneholder et felt kalt InProcessServer av typen InProcessConnProvider interface.
1// main.go of example code
2
3// Initialiser ny server med opsjoner
4ns, err := server.NewServer(opts)
5
6//...
7
8// Koble til serveren via in-process-tilkobling
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Når Connect utføres på nats client, og nats.InProcessServer(ns) sendes som InProcessServer-feltet, så
 1// nats-go/nats.go
 2
 3// createConn vil koble til serveren og pakke inn de passende
 4// bufio-strukturene. Den vil gjøre det riktige når en eksisterende
 5// tilkobling er på plass.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Hvis vi har en referanse til en in-process-server, etablerer vi en
15	// tilkobling ved å bruke den.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • vil dette grensesnittet, i createConn-funksjonen som oppretter en tilkobling, utføre InProcessConn fra InProcessServer-opsjonen når InProcessServer-opsjonen ikke er nil (gyldig).
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process-tilkobling til serveren,
 4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
 5// innenfor samme prosess. Dette kan brukes uavhengig av
 6// tilstanden til DontListen-opsjonen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • kaller og utfører InProcessConn implementert på serveren.
  • Denne funksjonen kalles når nc (nats connection) sin InProcessServer ikke er nil i NATs' Go client nats.go. Den oppretter en connection (net.Conn) og binder den til serverens connection.

Consumer driven interface of Go

En type implementerer et grensesnitt ved å implementere dets metoder. Det er ingen eksplisitt erklæring av intensjon, ingen "implements"-nøkkelord. Implisitte grensesnitt frakobler definisjonen av et grensesnitt fra dets implementasjon, som da kan dukke opp i enhver pakke uten forhåndsavtale.

Interfaces are implemented implicitly, A Tour of Go

If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.

Generality, Effective Go

  • Denne grensesnittdesignen inkorporerer godt det som ofte kalles consumer defined interface og structural typing (duck typing) i Go, så jeg ønsker å presentere dette temaet samtidig.
 1// nats-go/nats.go
 2
 3// Options kan brukes til å opprette en tilpasset tilkobling.
 4type Options struct {
 5	// Url representerer en enkelt NATS-server-URL som klienten
 6	// vil koble til. Hvis Servers-opsjonen også er satt, blir den
 7	// den første serveren i Servers-arrayet.
 8	Url string
 9
10	// InProcessServer representerer en NATS-server som kjører innenfor
11	// samme prosess. Hvis dette er satt, vil vi forsøke å koble til
12	// serveren direkte i stedet for å bruke eksterne TCP-tilkoblinger.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • La oss gå tilbake til koden. I nats.go-klienten er InProcessServer-opsjonens struct-felt definert som InProcessConnProvider-grensesnittet, som kun utfører InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returnerer en in-process-tilkobling til serveren,
 4// og unngår behovet for å bruke en TCP-lytter for lokal tilkobling
 5// innenfor samme prosess. Dette kan brukes uavhengig av
 6// tilstanden til DontListen-opsjonen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Imidlertid er typen som legges inn i den Server fra nats-server, som utfører en rekke funksjoner i tillegg til InProcessConn.
  • Dette skyldes at klientens interesse i denne situasjonen kun er om InProcessConn-grensesnittet er levert eller ikke, og andre ting er ikke særlig viktige.
  • Derfor har nats.go-klienten kun laget og brukt et consumer defined interface kalt InProcessConnProvider, som kun definerer funksjonaliteten InProcessConn() (net.Conn, error).

Konklusjon

  • Jeg har kort behandlet NATs' embedded mode og dens virkemåte, samt Go's consumer defined interface som kan bekreftes gjennom NATs' kode.
  • Jeg håper denne informasjonen er nyttig for de som bruker NATS til lignende formål, og med dette avslutter jeg denne artikkelen.