GoSuda

Как вградените NATS комуникират с go приложение?

By prravda
views ...

Начало

Относно NATS

Софтуерните приложения и услуги трябва да обменят данни. NATS е инфраструктура, която позволява такъв обмен на данни, сегментиран във формата на съобщения. Ние наричаме това "message oriented middleware".

С NATS разработчиците на приложения могат:

  • Без усилие да изграждат разпределени и мащабируеми клиент-сървър приложения.
  • Да съхраняват и разпространяват данни в реално време по общ начин. Това може гъвкаво да бъде постигнато в различни среди, езици, cloud доставчици и on-premises системи.

What is NATS, NATS docs

  • NATS е message broker, изграден на Go.

Embedded NATS

Ако вашето приложение е на Go и ако отговаря на вашия случай на употреба и сценарии за разгръщане, можете дори да вградите NATS server във вашето приложение.

Embedding NATS, NATS docs

  • Съществува и особеност на NATS, а именно, че поддържа embedded mode за приложения, изградени на Go.
  • Това означава, че вместо обичайния подход за message broker, при който отделен брокер сървър се стартира и комуникацията се осъществява чрез клиента на приложението към този сървър, брокерът може да бъде вграден (embed) директно в приложението, написано на Go.

Предимства и случаи на употреба на embedded NATS

  • Има добре обяснено видео в Youtube, затова ще използвам връзка към видеото.
  • Дори без да разполагате с отделен message broker сървър, можете да създадете modular monolith application и да постигнете separate of concern, като същевременно използвате предимствата на вградения NATS. Освен това става възможно и single binary deployment.
  • Може да се използва ефективно не само за platform with no network (wasm), но и за offline-first application.

Пример от официалната документация

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Инициализиране на нов сървър с опции
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Стартиране на сървъра чрез goroutine
22    go ns.Start()
23
24    // Изчакване сървърът да бъде готов за връзки
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Свързване към сървъра
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Абониране за темата
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Отпечатване на данните от съобщението
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Изключване на сървъра (по избор)
45        ns.Shutdown()
46    })
47
48    // Публикуване на данни към темата
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Изчакване на изключването на сървъра
52    ns.WaitForShutdown()
53}
  • Това е пример за Embedded NATS, посочен в официалната документация на NATS, но ако изпълните примерния код, комуникацията няма да се осъществи в embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Чрез командата watch 'netstat -an | grep 127.0.0.1' може да се наблюдава мрежовият трафик към и от localhost (127.0.0.1). Когато изпълните go файла с go run ., ще забележите добавянето на нови мрежови заявки, започващи от порт 4222, който е стандартният порт на NATS.

Правилни конфигурации за embedded mode

  • За да комуникирате в embedded mode по желания начин, са необходими следните две опции:

    • Client: Трябва да се добави опцията InProcessServer.
    • Server: Във Server.Options трябва да се укаже DontListen флагът като true.
  • Тези части не са официално документирани, като началото на тази функционалност може да се проследи чрез този PR.

    Този PR добавя три неща:

    1. Функцията InProcessConn() към Server, която изгражда net.Pipe за получаване на връзка към NATS сървъра без използване на TCP сокети
    2. Опцията DontListen, която указва на NATS сървъра да не слуша на обичайния TCP слушател
    3. Каналът startupComplete, който се затваря непосредствено преди да стартираме AcceptLoop, и readyForConnections ще го изчака

    Основната мотивация за това е, че имаме приложение, което може да работи или в monolith (единичен процес) режим, или в polylith (многопроцесов) режим. Бихме искали да можем да използваме NATS и за двата режима за простота, но режимът monolith трябва да може да обслужва различни платформи, където отварянето на сокет връзки или няма смисъл (мобилни), или просто не е възможно (WASM). Тези промени ще ни позволят да използваме NATS изцяло в процеса.

    Придружаващият PR nats-io/nats.go#774 добавя поддръжка от страна на клиента.

    Това е първият ми PR към този проект, така че се извинявам предварително, ако съм пропуснал нещо очевидно някъде.

    /cc @nats-io/core

Работещ пример за embedded mode

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// за конфигуриране на вградения NATS сървър
14		// задайте DonListen като true
15		DontListen: true,
16	}
17
18	// Инициализиране на нов сървър с опции
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Стартиране на сървъра чрез goroutine
26	go ns.Start()
27
28	// Изчакване сървърът да бъде готов за връзки
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Свързване към сървъра чрез in-process връзка
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Абониране за темата
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Отпечатване на данните от съобщението
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Изключване на сървъра (по избор)
49		ns.Shutdown()
50	})
51
52	// Публикуване на данни към темата
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Изчакване на изключването на сървъра
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Сега може да се види, че не възникват допълнителни network hop, както е било предвидено.

Under the hood

TL;DR

diagram1

  • Тази диаграма на последователността показва какви функции работят вътрешно, когато този код се изпълнява в main.go, а основната идея е следната:
    • Чрез DontListen: true сървърът пропуска фазата на слушане на клиенти, наречена AcceptLoop.
    • Ако опцията InProcessServer сред опциите за свързване на клиента е активирана, се създава in-memory connection и чрез net.Pipe се създава pipe, след което краят на pipe се връща на клиента като net.Conn type.
    • Клиентът и сървърът осъществяват in-process communication чрез тази връзка.

Server

AcceptLoop

1// nats-server/server/server.go
2
3// Изчакване на клиенти.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Първо, ако DontListen е true, фазата на слушане на клиенти, наречена AcceptLoop, се пропуска.
 1// nats-server/server/server.go
 2
 3// AcceptLoop се експортира за по-лесно тестване.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Ако трябваше да излезем, преди слушателят да е настроен правилно,
 6	// уверете се, че затваряме канала.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Снимка на опциите на сървъра.
18	opts := s.getOpts()
19
20	// Настройка на състояние, което може да позволи изключване
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Грешка при слушане на порт: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Слушане за клиентски връзки на %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Предупреждение за активиран TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS е необходим за клиентски връзки")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Клиентите, които не използват опцията \"TLS Handshake First\", няма да могат да се свържат")
38		}
39	}
40
41	// Ако сървърът е стартиран с RANDOM_PORT (-1), opts.Port ще бъде равен
42	// на 0 в началото на тази функция. Затова трябва да получим действителния порт
43	if opts.Port == 0 {
44		// Записване на разрешения порт обратно в опциите.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Сега, след като портът е зададен (ако е бил зададен на RANDOM), задайте
49	// Info Host/Port на сървъра с една от стойностите от Options или
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Грешка при настройване на информацията за сървъра със стойност ClientAdvertise %s, грешка=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Следете URL адресите за свързване на клиенти. Може да ни потрябват по-късно.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Сигнализиране, че не приемаме нови клиенти
65				s.ldmCh <- true
66				// Сега изчакайте изключването...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Уведомете повикващия, че сме готови
75	close(clr)
76	clr = nil
77}
  • За сведение, функцията AcceptLoop изпълнява следните процеси. Частите, свързани с мрежова комуникация, като TLS или hostPort, са ненужни при in-process communication и могат да бъдат пропуснати.

Client

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect ще се опита да се свърже със системата NATS.
 5// URL адресът може да съдържа семантика за потребителско име/парола. напр. nats://derek:pass@localhost:4222
 6// Поддържат се и масиви, разделени със запетаи, напр. urlA, urlB.
 7// Опциите започват с настройките по подразбиране, но могат да бъдат променени.
 8// За да се свържете с websocket порт на NATS Server, използвайте схемата `ws` или `wss`, като например
 9// `ws://localhost:8080`. Имайте предвид, че websocket схемите не могат да се смесват с други (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options могат да се използват за създаване на персонализирана връзка.
 4type Options struct {
 5	// Url представлява единичен NATS сървър url, към който клиентът
 6	// ще се свързва. Ако опцията Servers също е зададена, тя
 7	// тогава става първият сървър в масива Servers.
 8	Url string
 9
10	// InProcessServer представлява NATS сървър, работещ в
11	// същия процес. Ако това е зададено, тогава ще се опитаме да се свържем
12	// директно със сървъра, вместо да използваме външни TCP връзки.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Функцията Connect, която осъществява връзката между nats server и nats client, може да зададе client URL и connect Option, а в структурата Options, която съдържа тези опции, съществува поле InProcessServer от тип интерфейс InProcesConnProvider.
1// main.go от примерния код
2
3// Инициализиране на нов сървър с опции
4ns, err := server.NewServer(opts)
5
6//...
7
8// Свързване към сървъра чрез in-process връзка
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Когато nats client осъществява Connect, предавайки nats.InProcessServer(ns) на полето InProcessServer, тогава
 1// nats-go/nats.go
 2
 3// createConn ще се свърже със сървъра и ще обвие подходящите
 4// bufio структури. То ще направи правилното нещо, когато съществува
 5// връзка е налице.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Ако имаме референция към in-process сървър, тогава установете
15	// връзка, използвайки това.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("неуспешно получаване на in-process връзка: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Този интерфейс, когато опцията InProcessServer не е nil (валидна) във функцията createConn, която създава връзка, изпълнява InProcesConn на InProcessServer в опцията, като по този начин
 1// nats-server/server/server.go
 2
 3// InProcessConn връща in-process връзка към сървъра,
 4// избягвайки необходимостта от използване на TCP слушател за локална свързаност
 5// в рамките на същия процес. Това може да се използва независимо от
 6// състоянието на опцията DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("неуспешно създаване на връзка")
16	}
17	return pr, nil
18}
  • извиква и изпълнява InProcessConn, имплементиран в сървъра.
  • Тази функция, ако InProcessServer на nc (nats connection) в go клиента на NATS, nats.go, не е nil, се извиква, за да създаде връзка (net.Conn) и да я свърже с връзката на сървъра.

Consumer driven interface of Go

Един тип имплементира интерфейс, като имплементира неговите методи. Няма изрична декларация за намерение, няма ключова дума "implements". Неявните интерфейси разделят дефиницията на интерфейс от неговата имплементация, която след това може да се появи във всеки пакет без предварителна уговорка.

Interfaces are implemented implicitly, A Tour of Go

Ако един тип съществува само за да имплементира интерфейс и никога няма да има експортирани методи извън този интерфейс, няма нужда да се експортира самият тип.

Generality, Effective Go

  • Този дизайн на интерфейса добре отразява т.нар. consumer defined interface и structural typing (duck typing) в Go, затова бих искал да представя и тази тема.
 1// nats-go/nats.go
 2
 3// Options могат да се използват за създаване на персонализирана връзка.
 4type Options struct {
 5	// Url представлява единичен NATS сървър url, към който клиентът
 6	// ще се свързва. Ако опцията Servers също е зададена, тя
 7	// тогава става първият сървър в масива Servers.
 8	Url string
 9
10	// InProcessServer представлява NATS сървър, работещ в
11	// същия процес. Ако това е зададено, тогава ще се опитаме да се свържем
12	// директно със сървъра, вместо да използваме външни TCP връзки.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Да се върнем към кода. Полето на структурата InProcessServer в клиента nats.go е дефинирано като интерфейс InProcessConnProvider, който изпълнява само InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn връща in-process връзка към сървъра,
 4// избягвайки необходимостта от използване на TCP слушател за локална свързаност
 5// в рамките на същия процес. Това може да се използва независимо от
 6// състоянието на опцията DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("неуспешно създаване на връзка")
16	}
17	return pr, nil
18}
  • Въпреки това, типът, който се подава в него, е Server от nats-server, който изпълнява не само InProcessConn, но и различни други функции.
  • Причината е, че в тази ситуация интересът на клиента е само дали е предоставен интерфейсът InProcessConn, а другите неща не са от голямо значение.
  • Следователно, клиентът nats.go създава и използва само consumer defined interface, наречен InProcessConnProvider, който дефинира само функцията InProcessConn() (net.Conn, error).

Заключение

  • Разгледахме накратко embedded mode на NATS и неговия начин на работа, както и consumer defined interface на Go, който може да бъде идентифициран чрез кода на NATS.
  • Надявам се тази информация да бъде полезна за хората, които използват NATS за горепосочените цели, и с това приключвам тази статия.