GoSuda

Как встроенные NATS взаимодействуют с go-приложением?

By prravda
views ...

Начало работы

О NATS

Программным приложениям и службам необходимо обмениваться данными. NATS — это инфраструктура, которая позволяет осуществлять такой обмен данными, сегментированный в форме сообщений. Мы называем это «сообщение-ориентированным ПО промежуточного слоя» (message oriented middleware).

С помощью NATS разработчики приложений могут:

  • Легко создавать распределенные и масштабируемые клиент-серверные приложения.
  • Хранить и распространять данные в режиме реального времени общим образом. Это может быть гибко достигнуто в различных средах, языках, облачных провайдерах и локальных системах.

Что такое NATS, документация NATS

  • NATS — это брокер сообщений, разработанный на Go.

Встроенный NATS

Если ваше приложение написано на Go, и это соответствует вашему варианту использования и сценариям развертывания, вы можете даже встроить сервер NATS в ваше приложение.

Встраивание NATS, документация NATS

  • А особенностью NATS является то, что для приложений, разработанных на Go, он поддерживает встроенный режим (embedded mode).
  • То есть, вместо обычного для брокеров сообщений подхода, при котором отдельный сервер брокера запускается, а затем приложение обменивается данными с ним через клиент, брокер может быть встроен (embed) непосредственно в приложение, написанное на Go.

Преимущества и сценарии использования встроенного NATS

  • Существует хорошо объясняющее видео на Youtube, поэтому я ограничусь ссылкой на него.
  • Даже без развертывания отдельного сервера брокера сообщений можно создать modular monolith application для достижения separate of concern и использовать преимущества встроенного NATS. Кроме того, становится возможным single binary deployment.
  • Это может быть полезно не только для platform with no network (WASM), но и для offline-first application.

Пример из официальной документации

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Инициализация нового сервера с опциями
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Запуск сервера через goroutine
22    go ns.Start()
23
24    // Ожидание готовности сервера к соединениям
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Подключение к серверу
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Подписка на subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Вывод данных сообщения
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Остановка сервера (опционально)
45        ns.Shutdown()
46    })
47
48    // Публикация данных в subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Ожидание завершения работы сервера
52    ns.WaitForShutdown()
53}
  • Это пример встроенного NATS из официальной документации NATS, но если следовать этому коду, связь не будет осуществляться в режиме embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Если вы запустите этот go-файл с помощью go run . и будете наблюдать за сетевым трафиком, идущим на localhost (127.0.0.1) и с него, используя команду watch 'netstat -an | grep 127.0.0.1', вы увидите, что добавляются новые сетевые запросы, исходящие из порта 4222, который является портом по умолчанию для NATS.

Правильные конфигурации для встроенного режима

  • Для того чтобы связь осуществлялась в режиме embedded mode, как задумано, необходимы следующие две опции:

    • Client: необходимо добавить опцию InProcessServer.
    • Server: в Server.Options необходимо указать флаг DontListen как true.
  • Эти части официально не документированы, и начало этой функциональности можно проследить через данный PR.

    Этот PR добавляет три вещи:

    1. Функцию InProcessConn() в Server, которая создает net.Pipe для получения соединения с сервером NATS без использования TCP-сокетов.
    2. Опцию DontListen, которая указывает серверу NATS не прослушивать обычный TCP-слушатель.
    3. Канал startupComplete, который закрывается непосредственно перед началом AcceptLoop, и readyForConnections будет ожидать его.

    Основная мотивация для этого состоит в том, что у нас есть приложение, которое может работать либо в монолитном (однопроцессном) режиме, либо в полилитном (многопроцессном) режиме. Мы хотели бы иметь возможность использовать NATS для обоих режимов для простоты, но монолитный режим должен быть способен обслуживать различные платформы, где открытие сокетных соединений либо не имеет смысла (мобильные устройства), либо просто невозможно (WASM). Эти изменения позволят нам использовать NATS полностью внутри процесса.

    Сопутствующий PR nats-io/nats.go#774 добавляет поддержку на стороне клиента.

    Это мой первый PR в этот проект, поэтому заранее приношу извинения, если я что-то упустил.

    /cc @nats-io/core

Рабочий пример для встроенного режима

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// для настройки встроенного сервера NATS
14		// установить DonListen в true
15		DontListen: true,
16	}
17
18	// Инициализация нового сервера с опциями
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Запуск сервера через goroutine
26	go ns.Start()
27
28	// Ожидание готовности сервера к соединениям
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Подключение к серверу через внутрипроцессное соединение
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Подписка на subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Вывод данных сообщения
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Остановка сервера (опционально)
49		ns.Shutdown()
50	})
51
52	// Публикация данных в subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Ожидание завершения работы сервера
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Теперь видно, что никаких дополнительных network hop не происходит, как и было задумано.

Под капотом

Кратко

diagram1

  • Данная диаграмма последовательности иллюстрирует, какие функции и как работают внутри, когда этот код запускается в main.go, а суть ее заключается в следующем:
    • Через DontListen: true сервер пропускает фазу прослушивания клиентов AcceptLoop.
    • Если опция InProcessServer клиента активирована, она создает in-memory connection, создает pipe через net.Pipe и возвращает конец pipe клиенту в виде net.Conn.
    • Клиент и сервер осуществляют in-process communication через это соединение.

Сервер

AcceptLoop

1// nats-server/server/server.go
2
3// Ожидание клиентов.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Прежде всего, если DontListen установлен в true, фаза прослушивания клиентов AcceptLoop пропускается.
 1// nats-server/server/server.go
 2
 3// AcceptLoop экспортируется для упрощения тестирования.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// Если мы должны были выйти до того, как слушатель будет правильно настроен,
 6	// убедимся, что мы закрыли канал.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Снимок опций сервера.
18	opts := s.getOpts()
19
20	// Настройка состояния, которое может включить завершение работы
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Уведомление о включении TLS.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// Если сервер был запущен с RANDOM_PORT (-1), opts.Port будет равен
42	// 0 в начале этой функции. Поэтому нам нужно получить фактический порт.
43	if opts.Port == 0 {
44		// Записать разрешенный порт обратно в опции.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Теперь, когда порт установлен (если он был установлен в RANDOM), установите
49	// Host/Port информации сервера со значениями из Options или
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Отслеживание URL-адресов подключения клиентов. Они могут понадобиться позже.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Сигнал о том, что мы не принимаем новых клиентов
65				s.ldmCh <- true
66				// Теперь ждем завершения работы...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Сообщаем вызывающей стороне, что мы готовы
75	close(clr)
76	clr = nil
77}
  • Примечательно, что функция AcceptLoop выполняет следующие действия: она связана с сетевой связью, такой как TLS и hostPort, и поскольку in-process communication не требует этих частей, их можно опустить.

Клиент

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect попытается подключиться к системе NATS.
 5// URL может содержать семантику имени пользователя/пароля. Например: nats://derek:pass@localhost:4222
 6// Также поддерживаются массивы, разделенные запятыми, например: urlA, urlB.
 7// Опции начинаются со значений по умолчанию, но могут быть переопределены.
 8// Для подключения к websocket-порту сервера NATS используйте схему `ws` или `wss`, например,
 9// `ws://localhost:8080`. Обратите внимание, что схемы websocket не могут быть смешаны с другими (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options могут быть использованы для создания настраиваемого соединения.
 4type Options struct {
 5	// Url представляет собой URL одного сервера NATS, к которому клиент
 6	// будет подключаться. Если также установлена опция Servers, то
 7	// она становится первым сервером в массиве Servers.
 8	Url string
 9
10	// InProcessServer представляет сервер NATS, работающий внутри
11	// того же процесса. Если это установлено, то мы попытаемся подключиться
12	// к серверу напрямую, а не используя внешние TCP-соединения.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Функция Connect, которая устанавливает соединение между NATS-сервером и NATS-клиентом, позволяет настроить URL клиента и опции подключения, а в структуре Options, содержащей эти опции, есть поле InProcessServer типа интерфейса InProcessConnProvider.
1// main.go of example code
2
3// Инициализация нового сервера с опциями
4ns, err := server.NewServer(opts)
5
6//...
7
8// Подключение к серверу через внутрипроцессное соединение
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Когда клиент NATS выполняет Connect, передавая nats.InProcessServer(ns) в поле InProcessServer,
 1// nats-go/nats.go
 2
 3// InProcessServer — это опция, которая попытается установить направление к серверу NATS,
 4// работающему внутри процесса, вместо набора номера через TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • InProcessServer опции заменяется встроенным сервером NATS, и
 1// nats-go/nats.go
 2
 3// createConn подключится к серверу и обернет соответствующие
 4// структуры bufio. Он сделает правильные вещи, когда уже существует
 5// соединение.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// Если у нас есть ссылка на внутрипроцессный сервер, то устанавливаем
15	// соединение, используя его.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • Этот интерфейс выполняется путем вызова InProcessConn InProcessServer в опции, если опция InProcessServer не является nil (действительной) в функции createConn, которая создает соединение, и
 1// nats-server/server/server.go
 2
 3// InProcessConn возвращает внутрипроцессное соединение с сервером,
 4// избегая необходимости использования TCP-слушателя для локальной связности
 5// в рамках одного процесса. Это может быть использовано независимо от
 6// состояния опции DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • вызывает и выполняет InProcessConn, реализованную на сервере.
  • Эта функция вызывается, когда InProcessServer объекта nc (nats connection) в клиенте Go NATS nats.go не равен nil. Она создает соединение (net.Conn) и связывает его с соединением сервера.

Интерфейс, управляемый потребителем, в Go

Тип реализует интерфейс, реализуя его методы. Нет явного объявления намерения, нет ключевого слова "implements". Неявные интерфейсы отделяют определение интерфейса от его реализации, которая затем может появиться в любом пакете без предварительной договоренности.

Интерфейсы реализуются неявно, A Tour of Go

Если тип существует только для реализации интерфейса и никогда не будет иметь экспортированных методов, выходящих за рамки этого интерфейса, нет необходимости экспортировать сам тип.

Общность, Effective Go

  • Этот дизайн интерфейса хорошо отражает то, что в Go часто называют consumer defined interface и structural typing (duck typing), поэтому я хотел бы представить эту тему.
 1// nats-go/nats.go
 2
 3// Options могут быть использованы для создания настраиваемого соединения.
 4type Options struct {
 5	// Url представляет собой URL одного сервера NATS, к которому клиент
 6	// будет подключаться. Если также установлена опция Servers, то
 7	// она становится первым сервером в массиве Servers.
 8	Url string
 9
10	// InProcessServer представляет сервер NATS, работающий внутри
11	// того же процесса. Если это установлено, то мы попытаемся подключиться
12	// к серверу напрямую, а не используя внешние TCP-соединения.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Вернемся к коду. Поле структуры InProcessServer в клиенте nats.go было определено как интерфейс InProcessConnProvider, который выполняет только InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn возвращает внутрипроцессное соединение с сервером,
 4// избегая необходимости использования TCP-слушателя для локальной связности
 5// в рамках одного процесса. Это может быть использовано независимо от
 6// состояния опции DontListen.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Однако тип, который в него входит, — это Server из nats-server, который выполняет не только InProcessConn, но и множество других функций.
  • Причина в том, что в данном случае клиент интересуется только тем, был ли предоставлен интерфейс InProcessConn, а остальные детали не имеют большого значения.
  • Поэтому клиент nats.go создал и использует только InProcessConnProvider, который является consumer defined interface, определяющим только одну функцию: InProcessConn() (net.Conn, error).

Заключение

  • Я кратко рассмотрел встроенный режим NATS, принцип его работы, а также consumer defined interface в Go, который можно наблюдать в коде NATS.
  • Надеюсь, эта информация будет полезна тем, кто использует NATS для вышеупомянутых целей, и на этом я завершаю свою статью.