Как встроенные NATS взаимодействуют с go-приложением?
Начало работы
О NATS
Программным приложениям и сервисам необходимо обмениваться данными. NATS — это инфраструктура, которая позволяет осуществлять такой обмен данными, сегментированный в форме сообщений. Мы называем это «сообщение-ориентированное связующее ПО».
С NATS разработчики приложений могут:
- Легко создавать распределенные и масштабируемые клиент-серверные приложения.
- Хранить и распространять данные в реальном времени в общем виде. Это может быть гибко достигнуто в различных средах, языках, облачных провайдерах и локальных системах.
- NATS — это брокер сообщений, разработанный на Go.
Встроенный NATS
Если ваше приложение написано на Go, и это соответствует вашим сценариям использования и развертывания, вы можете даже встроить сервер NATS в ваше приложение.
Встраивание NATS, Документация NATS
- Более того, NATS обладает уникальной особенностью: для приложений, разработанных на Go, поддерживается режим embedded.
- Это означает, что вместо обычного для брокера сообщений подхода, при котором отдельный сервер брокера запускается, а затем приложение взаимодействует с ним через клиент, сам брокер может быть встроен (embed) в приложение, созданное на Go.
Преимущества и сценарии использования встроенного NATS
- Существует хорошо объясняющее это видео на Youtube, поэтому я просто дам ссылку на него.
- Даже без развертывания отдельного сервера брокера сообщений можно получить преимущество от встраивания NATS, создавая modular monolith application, которое достигает separate of concern. Кроме того, становится возможным single binary deployment.
- Может быть полезно не только для platform with no network (WASM), но и для offline-first application.
Пример из официальной документации
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- Это пример встроенного NATS, приведенный в официальной документации NATS, но если следовать этому коду, связь не будет осуществляться в режиме embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Если выполнить этот файл Go с помощью
go run .
, наблюдая за сетевым трафиком на localhost (127.0.0.1) с помощью командыwatch 'netstat -an | grep 127.0.0.1'
, можно увидеть новые сетевые запросы, исходящие из порта NATS по умолчанию4222
.
Правильные конфигурации для режима embedded
Для того чтобы связь осуществлялась в режиме embedded, как и предполагалось, необходимы следующие два параметра:
- Client: необходимо добавить опцию
InProcessServer
. - Server: в
Server.Options
флагDontListen
должен быть явно установлен вtrue
.
- Client: необходимо добавить опцию
Эти детали не были официально задокументированы, и истоки этой функции можно проследить по этому PR.
Этот PR добавляет три вещи:
- Функцию
InProcessConn()
вServer
, которая создаетnet.Pipe
для получения соединения с сервером NATS без использования TCP-сокетов. - Опцию
DontListen
, которая указывает серверу NATS не прослушивать обычный TCP-слушатель. - Канал
startupComplete
, который закрывается непосредственно перед началомAcceptLoop
, иreadyForConnections
будет ждать его.
Основная мотивация этого заключается в том, что у нас есть приложение, которое может работать как в монолитном (однопроцессном) режиме, так и в полилитном (многопроцессном) режиме. Мы хотели бы иметь возможность использовать NATS для обоих режимов для простоты, но монолитный режим должен быть способен обслуживать различные платформы, где открытие сокетных соединений либо не имеет смысла (мобильные устройства), либо просто невозможно (WASM). Эти изменения позволят нам использовать NATS полностью внутри процесса.
Сопутствующий PR nats-io/nats.go#774 добавляет поддержку на стороне клиента.
Это мой первый PR в этот проект, поэтому заранее приношу извинения, если я что-то очевидное упустил.
/cc @nats-io/core
- Функцию
Рабочий пример для режима embedded
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Теперь можно видеть, что никаких дополнительных network hop не возникает, как и предполагалось.
Под капотом
TL;DR
- Это sequence diagram, показывающая, как внутренне работают функции при выполнении данного кода в
main.go
, и основные моменты заключаются в следующем:- С помощью
DontListen: true
сервер пропускает фазу прослушивания клиента под названиемAcceptLoop
. - Если активирована опция
InProcessServer
из Connect клиента, создается in-memory connection, затем с помощьюnet.Pipe
создается pipe, и конец pipe возвращается клиенту в видеnet.Conn
. - Клиент и сервер осуществляют in-process communication через это соединение.
- С помощью
Сервер
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Прежде всего, если
DontListen
имеет значение true, фаза прослушивания клиентаAcceptLoop
пропускается.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- Примечательно, что функция AcceptLoop выполняет следующие шаги. Это части, связанные с сетевым взаимодействием, такие как
TLS
илиhostPort
, которые можно опустить, поскольку они не нужны при in-process communication.
Клиент
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Функция
Connect
, которая устанавливает соединение между сервером NATS и клиентом NATS, позволяет настроить URL клиента и опции соединения. В структуре Options, которая содержит эти опции, присутствует полеInProcessServer
типа интерфейсаInProcessConnProvider
.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Когда клиент NATS инициирует Connect, передавая
nats.InProcessServer(ns)
в полеInProcessServer
,
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- этот интерфейс, если опция
InProcessServer
не является nil (является действительной) в функцииcreateConn
, которая создает соединение, выполняетInProcessConn
из InProcessServer, находящегося в опции, и
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- вызывает реализованную на сервере функцию
InProcessConn
. - Эта функция вызывается из
nats.go
(Go-клиент NATS), еслиInProcessServer
в nc (соединение NATS) не равен nil, и создает соединение (net.Conn
), связывая его с соединением сервера.
Consumer driven interface в Go
Тип реализует интерфейс, реализуя его методы. Нет явного объявления намерения, нет ключевого слова "implements". Неявные интерфейсы отделяют определение интерфейса от его реализации, которая затем может появиться в любом пакете без предварительной договоренности.
Интерфейсы реализуются неявно, A Tour of Go
Если тип существует только для реализации интерфейса и никогда не будет иметь экспортированных методов, помимо этого интерфейса, нет необходимости экспортировать сам тип.
- Этот дизайн интерфейса хорошо отражает то, что в Go часто называют consumer defined interface и structural typing (duck typing), поэтому я хотел бы представить эту тему.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Вернемся к коду. В клиенте nats.go поле структуры опций
InProcessServer
определено как интерфейсInProcessConnProvider
, который выполняет толькоInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Однако тип, который в него входит, — это
Server
из nats-server, который выполняет множество функций, помимо InProcessConn. - Это потому, что в данной ситуации для клиента важен только факт предоставления интерфейса
InProcessConn
, а остальные аспекты не имеют большого значения. - Поэтому клиент nats.go создает и использует только consumer defined interface под названием
InProcessConnProvider
, который определяет только функциюInProcessConn() (net.Conn, error)
.
Заключение
- Я кратко рассмотрел встроенный режим NATS, принцип его работы, а также consumer defined interface в Go, который можно наблюдать в коде NATS.
- Надеюсь, эта информация будет полезна тем, кто использует NATS для вышеуказанных целей, и на этом я завершаю свою статью.