GoSuda

Как встроенные NATS взаимодействуют с go-приложением?

By prravda
views ...

Начало работы

О NATS

Программным приложениям и сервисам необходимо обмениваться данными. NATS — это инфраструктура, которая позволяет осуществлять такой обмен данными, сегментированный в форме сообщений. Мы называем это «сообщение-ориентированное связующее ПО».

С NATS разработчики приложений могут:

  • Легко создавать распределенные и масштабируемые клиент-серверные приложения.
  • Хранить и распространять данные в реальном времени в общем виде. Это может быть гибко достигнуто в различных средах, языках, облачных провайдерах и локальных системах.

Что такое NATS, Документация NATS

  • NATS — это брокер сообщений, разработанный на Go.

Встроенный NATS

Если ваше приложение написано на Go, и это соответствует вашим сценариям использования и развертывания, вы можете даже встроить сервер NATS в ваше приложение.

Встраивание NATS, Документация NATS

  • Более того, NATS обладает уникальной особенностью: для приложений, разработанных на Go, поддерживается режим embedded.
  • Это означает, что вместо обычного для брокера сообщений подхода, при котором отдельный сервер брокера запускается, а затем приложение взаимодействует с ним через клиент, сам брокер может быть встроен (embed) в приложение, созданное на Go.

Преимущества и сценарии использования встроенного NATS

  • Существует хорошо объясняющее это видео на Youtube, поэтому я просто дам ссылку на него.
  • Даже без развертывания отдельного сервера брокера сообщений можно получить преимущество от встраивания NATS, создавая modular monolith application, которое достигает separate of concern. Кроме того, становится возможным single binary deployment.
  • Может быть полезно не только для platform with no network (WASM), но и для offline-first application.

Пример из официальной документации

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • Это пример встроенного NATS, приведенный в официальной документации NATS, но если следовать этому коду, связь не будет осуществляться в режиме embedding.
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • Если выполнить этот файл Go с помощью go run ., наблюдая за сетевым трафиком на localhost (127.0.0.1) с помощью команды watch 'netstat -an | grep 127.0.0.1', можно увидеть новые сетевые запросы, исходящие из порта NATS по умолчанию 4222.

Правильные конфигурации для режима embedded

  • Для того чтобы связь осуществлялась в режиме embedded, как и предполагалось, необходимы следующие два параметра:

    • Client: необходимо добавить опцию InProcessServer.
    • Server: в Server.Options флаг DontListen должен быть явно установлен в true.
  • Эти детали не были официально задокументированы, и истоки этой функции можно проследить по этому PR.

    Этот PR добавляет три вещи:

    1. Функцию InProcessConn() в Server, которая создает net.Pipe для получения соединения с сервером NATS без использования TCP-сокетов.
    2. Опцию DontListen, которая указывает серверу NATS не прослушивать обычный TCP-слушатель.
    3. Канал startupComplete, который закрывается непосредственно перед началом AcceptLoop, и readyForConnections будет ждать его.

    Основная мотивация этого заключается в том, что у нас есть приложение, которое может работать как в монолитном (однопроцессном) режиме, так и в полилитном (многопроцессном) режиме. Мы хотели бы иметь возможность использовать NATS для обоих режимов для простоты, но монолитный режим должен быть способен обслуживать различные платформы, где открытие сокетных соединений либо не имеет смысла (мобильные устройства), либо просто невозможно (WASM). Эти изменения позволят нам использовать NATS полностью внутри процесса.

    Сопутствующий PR nats-io/nats.go#774 добавляет поддержку на стороне клиента.

    Это мой первый PR в этот проект, поэтому заранее приношу извинения, если я что-то очевидное упустил.

    /cc @nats-io/core

Рабочий пример для режима embedded

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • Теперь можно видеть, что никаких дополнительных network hop не возникает, как и предполагалось.

Под капотом

TL;DR

diagram1

  • Это sequence diagram, показывающая, как внутренне работают функции при выполнении данного кода в main.go, и основные моменты заключаются в следующем:
    • С помощью DontListen: true сервер пропускает фазу прослушивания клиента под названием AcceptLoop.
    • Если активирована опция InProcessServer из Connect клиента, создается in-memory connection, затем с помощью net.Pipe создается pipe, и конец pipe возвращается клиенту в виде net.Conn.
    • Клиент и сервер осуществляют in-process communication через это соединение.

Сервер

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • Прежде всего, если DontListen имеет значение true, фаза прослушивания клиента AcceptLoop пропускается.
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • Примечательно, что функция AcceptLoop выполняет следующие шаги. Это части, связанные с сетевым взаимодействием, такие как TLS или hostPort, которые можно опустить, поскольку они не нужны при in-process communication.

Клиент

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Функция Connect, которая устанавливает соединение между сервером NATS и клиентом NATS, позволяет настроить URL клиента и опции соединения. В структуре Options, которая содержит эти опции, присутствует поле InProcessServer типа интерфейса InProcessConnProvider.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • Когда клиент NATS инициирует Connect, передавая nats.InProcessServer(ns) в поле InProcessServer,
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • этот интерфейс, если опция InProcessServer не является nil (является действительной) в функции createConn, которая создает соединение, выполняет InProcessConn из InProcessServer, находящегося в опции, и
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • вызывает реализованную на сервере функцию InProcessConn.
  • Эта функция вызывается из nats.go (Go-клиент NATS), если InProcessServer в nc (соединение NATS) не равен nil, и создает соединение (net.Conn), связывая его с соединением сервера.

Consumer driven interface в Go

Тип реализует интерфейс, реализуя его методы. Нет явного объявления намерения, нет ключевого слова "implements". Неявные интерфейсы отделяют определение интерфейса от его реализации, которая затем может появиться в любом пакете без предварительной договоренности.

Интерфейсы реализуются неявно, A Tour of Go

Если тип существует только для реализации интерфейса и никогда не будет иметь экспортированных методов, помимо этого интерфейса, нет необходимости экспортировать сам тип.

Общие положения, Effective Go

  • Этот дизайн интерфейса хорошо отражает то, что в Go часто называют consumer defined interface и structural typing (duck typing), поэтому я хотел бы представить эту тему.
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • Вернемся к коду. В клиенте nats.go поле структуры опций InProcessServer определено как интерфейс InProcessConnProvider, который выполняет только InProcessConn.
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • Однако тип, который в него входит, — это Server из nats-server, который выполняет множество функций, помимо InProcessConn.
  • Это потому, что в данной ситуации для клиента важен только факт предоставления интерфейса InProcessConn, а остальные аспекты не имеют большого значения.
  • Поэтому клиент nats.go создает и использует только consumer defined interface под названием InProcessConnProvider, который определяет только функцию InProcessConn() (net.Conn, error).

Заключение

  • Я кратко рассмотрел встроенный режим NATS, принцип его работы, а также consumer defined interface в Go, который можно наблюдать в коде NATS.
  • Надеюсь, эта информация будет полезна тем, кто использует NATS для вышеуказанных целей, и на этом я завершаю свою статью.