Как встроенные NATS взаимодействуют с go-приложением?
Начало работы
О NATS
Программным приложениям и службам необходимо обмениваться данными. NATS — это инфраструктура, которая позволяет осуществлять такой обмен данными, сегментированный в форме сообщений. Мы называем это «сообщение-ориентированным ПО промежуточного слоя» (message oriented middleware).
С помощью NATS разработчики приложений могут:
- Легко создавать распределенные и масштабируемые клиент-серверные приложения.
- Хранить и распространять данные в режиме реального времени общим образом. Это может быть гибко достигнуто в различных средах, языках, облачных провайдерах и локальных системах.
- NATS — это брокер сообщений, разработанный на Go.
Встроенный NATS
Если ваше приложение написано на Go, и это соответствует вашему варианту использования и сценариям развертывания, вы можете даже встроить сервер NATS в ваше приложение.
Встраивание NATS, документация NATS
- А особенностью NATS является то, что для приложений, разработанных на Go, он поддерживает встроенный режим (embedded mode).
- То есть, вместо обычного для брокеров сообщений подхода, при котором отдельный сервер брокера запускается, а затем приложение обменивается данными с ним через клиент, брокер может быть встроен (embed) непосредственно в приложение, написанное на Go.
Преимущества и сценарии использования встроенного NATS
- Существует хорошо объясняющее видео на Youtube, поэтому я ограничусь ссылкой на него.
- Даже без развертывания отдельного сервера брокера сообщений можно создать modular monolith application для достижения separate of concern и использовать преимущества встроенного NATS. Кроме того, становится возможным single binary deployment.
- Это может быть полезно не только для platform with no network (WASM), но и для offline-first application.
Пример из официальной документации
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Инициализация нового сервера с опциями
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Запуск сервера через goroutine
22 go ns.Start()
23
24 // Ожидание готовности сервера к соединениям
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Подключение к серверу
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Подписка на subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Вывод данных сообщения
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Остановка сервера (опционально)
45 ns.Shutdown()
46 })
47
48 // Публикация данных в subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Ожидание завершения работы сервера
52 ns.WaitForShutdown()
53}
- Это пример встроенного NATS из официальной документации NATS, но если следовать этому коду, связь не будет осуществляться в режиме embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Если вы запустите этот go-файл с помощью
go run .и будете наблюдать за сетевым трафиком, идущим на localhost (127.0.0.1) и с него, используя командуwatch 'netstat -an | grep 127.0.0.1', вы увидите, что добавляются новые сетевые запросы, исходящие из порта4222, который является портом по умолчанию для NATS.
Правильные конфигурации для встроенного режима
Для того чтобы связь осуществлялась в режиме embedded mode, как задумано, необходимы следующие две опции:
- Client: необходимо добавить опцию
InProcessServer. - Server: в
Server.Optionsнеобходимо указать флагDontListenкакtrue.
- Client: необходимо добавить опцию
Эти части официально не документированы, и начало этой функциональности можно проследить через данный PR.
Этот PR добавляет три вещи:
- Функцию
InProcessConn()вServer, которая создаетnet.Pipeдля получения соединения с сервером NATS без использования TCP-сокетов. - Опцию
DontListen, которая указывает серверу NATS не прослушивать обычный TCP-слушатель. - Канал
startupComplete, который закрывается непосредственно перед началомAcceptLoop, иreadyForConnectionsбудет ожидать его.
Основная мотивация для этого состоит в том, что у нас есть приложение, которое может работать либо в монолитном (однопроцессном) режиме, либо в полилитном (многопроцессном) режиме. Мы хотели бы иметь возможность использовать NATS для обоих режимов для простоты, но монолитный режим должен быть способен обслуживать различные платформы, где открытие сокетных соединений либо не имеет смысла (мобильные устройства), либо просто невозможно (WASM). Эти изменения позволят нам использовать NATS полностью внутри процесса.
Сопутствующий PR nats-io/nats.go#774 добавляет поддержку на стороне клиента.
Это мой первый PR в этот проект, поэтому заранее приношу извинения, если я что-то упустил.
/cc @nats-io/core
- Функцию
Рабочий пример для встроенного режима
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // для настройки встроенного сервера NATS
14 // установить DonListen в true
15 DontListen: true,
16 }
17
18 // Инициализация нового сервера с опциями
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Запуск сервера через goroutine
26 go ns.Start()
27
28 // Ожидание готовности сервера к соединениям
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Подключение к серверу через внутрипроцессное соединение
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Подписка на subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Вывод данных сообщения
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Остановка сервера (опционально)
49 ns.Shutdown()
50 })
51
52 // Публикация данных в subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Ожидание завершения работы сервера
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Теперь видно, что никаких дополнительных network hop не происходит, как и было задумано.
Под капотом
Кратко
- Данная диаграмма последовательности иллюстрирует, какие функции и как работают внутри, когда этот код запускается в
main.go, а суть ее заключается в следующем:- Через
DontListen: trueсервер пропускает фазу прослушивания клиентовAcceptLoop. - Если опция
InProcessServerклиента активирована, она создает in-memory connection, создает pipe черезnet.Pipeи возвращает конец pipe клиенту в видеnet.Conn. - Клиент и сервер осуществляют in-process communication через это соединение.
- Через
Сервер
AcceptLoop
1// nats-server/server/server.go
2
3// Ожидание клиентов.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Прежде всего, если
DontListenустановлен в true, фаза прослушивания клиентовAcceptLoopпропускается.
1// nats-server/server/server.go
2
3// AcceptLoop экспортируется для упрощения тестирования.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // Если мы должны были выйти до того, как слушатель будет правильно настроен,
6 // убедимся, что мы закрыли канал.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Снимок опций сервера.
18 opts := s.getOpts()
19
20 // Настройка состояния, которое может включить завершение работы
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Уведомление о включении TLS.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // Если сервер был запущен с RANDOM_PORT (-1), opts.Port будет равен
42 // 0 в начале этой функции. Поэтому нам нужно получить фактический порт.
43 if opts.Port == 0 {
44 // Записать разрешенный порт обратно в опции.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Теперь, когда порт установлен (если он был установлен в RANDOM), установите
49 // Host/Port информации сервера со значениями из Options или
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Отслеживание URL-адресов подключения клиентов. Они могут понадобиться позже.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Сигнал о том, что мы не принимаем новых клиентов
65 s.ldmCh <- true
66 // Теперь ждем завершения работы...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Сообщаем вызывающей стороне, что мы готовы
75 close(clr)
76 clr = nil
77}
- Примечательно, что функция AcceptLoop выполняет следующие действия: она связана с сетевой связью, такой как
TLSиhostPort, и поскольку in-process communication не требует этих частей, их можно опустить.
Клиент
InProcessServer
1
2// nats-go/nats.go
3
4// Connect попытается подключиться к системе NATS.
5// URL может содержать семантику имени пользователя/пароля. Например: nats://derek:pass@localhost:4222
6// Также поддерживаются массивы, разделенные запятыми, например: urlA, urlB.
7// Опции начинаются со значений по умолчанию, но могут быть переопределены.
8// Для подключения к websocket-порту сервера NATS используйте схему `ws` или `wss`, например,
9// `ws://localhost:8080`. Обратите внимание, что схемы websocket не могут быть смешаны с другими (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options могут быть использованы для создания настраиваемого соединения.
4type Options struct {
5 // Url представляет собой URL одного сервера NATS, к которому клиент
6 // будет подключаться. Если также установлена опция Servers, то
7 // она становится первым сервером в массиве Servers.
8 Url string
9
10 // InProcessServer представляет сервер NATS, работающий внутри
11 // того же процесса. Если это установлено, то мы попытаемся подключиться
12 // к серверу напрямую, а не используя внешние TCP-соединения.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Функция
Connect, которая устанавливает соединение между NATS-сервером и NATS-клиентом, позволяет настроить URL клиента и опции подключения, а в структуре Options, содержащей эти опции, есть полеInProcessServerтипа интерфейсаInProcessConnProvider.
1// main.go of example code
2
3// Инициализация нового сервера с опциями
4ns, err := server.NewServer(opts)
5
6//...
7
8// Подключение к серверу через внутрипроцессное соединение
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Когда клиент NATS выполняет Connect, передавая
nats.InProcessServer(ns)в полеInProcessServer,
1// nats-go/nats.go
2
3// InProcessServer — это опция, которая попытается установить направление к серверу NATS,
4// работающему внутри процесса, вместо набора номера через TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
InProcessServerопции заменяется встроенным сервером NATS, и
1// nats-go/nats.go
2
3// createConn подключится к серверу и обернет соответствующие
4// структуры bufio. Он сделает правильные вещи, когда уже существует
5// соединение.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // Если у нас есть ссылка на внутрипроцессный сервер, то устанавливаем
15 // соединение, используя его.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Этот интерфейс выполняется путем вызова
InProcessConnInProcessServerв опции, если опцияInProcessServerне является nil (действительной) в функцииcreateConn, которая создает соединение, и
1// nats-server/server/server.go
2
3// InProcessConn возвращает внутрипроцессное соединение с сервером,
4// избегая необходимости использования TCP-слушателя для локальной связности
5// в рамках одного процесса. Это может быть использовано независимо от
6// состояния опции DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- вызывает и выполняет
InProcessConn, реализованную на сервере. - Эта функция вызывается, когда
InProcessServerобъектаnc(nats connection) в клиенте Go NATSnats.goне равен nil. Она создает соединение (net.Conn) и связывает его с соединением сервера.
Интерфейс, управляемый потребителем, в Go
Тип реализует интерфейс, реализуя его методы. Нет явного объявления намерения, нет ключевого слова "implements". Неявные интерфейсы отделяют определение интерфейса от его реализации, которая затем может появиться в любом пакете без предварительной договоренности.
Интерфейсы реализуются неявно, A Tour of Go
Если тип существует только для реализации интерфейса и никогда не будет иметь экспортированных методов, выходящих за рамки этого интерфейса, нет необходимости экспортировать сам тип.
- Этот дизайн интерфейса хорошо отражает то, что в Go часто называют consumer defined interface и structural typing (duck typing), поэтому я хотел бы представить эту тему.
1// nats-go/nats.go
2
3// Options могут быть использованы для создания настраиваемого соединения.
4type Options struct {
5 // Url представляет собой URL одного сервера NATS, к которому клиент
6 // будет подключаться. Если также установлена опция Servers, то
7 // она становится первым сервером в массиве Servers.
8 Url string
9
10 // InProcessServer представляет сервер NATS, работающий внутри
11 // того же процесса. Если это установлено, то мы попытаемся подключиться
12 // к серверу напрямую, а не используя внешние TCP-соединения.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Вернемся к коду. Поле структуры
InProcessServerв клиентеnats.goбыло определено как интерфейсInProcessConnProvider, который выполняет толькоInProcessConn.
1// nats-server/server/server.go
2
3// InProcessConn возвращает внутрипроцессное соединение с сервером,
4// избегая необходимости использования TCP-слушателя для локальной связности
5// в рамках одного процесса. Это может быть использовано независимо от
6// состояния опции DontListen.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Однако тип, который в него входит, — это
Serverизnats-server, который выполняет не только InProcessConn, но и множество других функций. - Причина в том, что в данном случае клиент интересуется только тем, был ли предоставлен интерфейс
InProcessConn, а остальные детали не имеют большого значения. - Поэтому клиент
nats.goсоздал и использует толькоInProcessConnProvider, который является consumer defined interface, определяющим только одну функцию:InProcessConn() (net.Conn, error).
Заключение
- Я кратко рассмотрел встроенный режим NATS, принцип его работы, а также consumer defined interface в Go, который можно наблюдать в коде NATS.
- Надеюсь, эта информация будет полезна тем, кто использует NATS для вышеупомянутых целей, и на этом я завершаю свою статью.