Как вградените NATS комуникират с go приложение?
Начало
Относно NATS
Софтуерните приложения и услуги трябва да обменят данни. NATS е инфраструктура, която позволява такъв обмен на данни, сегментиран във формата на съобщения. Ние наричаме това „ориентиран към съобщения междинен софтуер“.
С NATS разработчиците на приложения могат:
- Без усилие да изграждат разпределени и мащабируеми клиент-сървър приложения.
- Да съхраняват и разпространяват данни в реално време по общ начин. Това може гъвкаво да бъде постигнато в различни среди, езици, доставчици на облачни услуги и локални системи.
- NATS е брокер за съобщения, изграден с Go.
Вграден NATS
Ако вашето приложение е на Go и ако то отговаря на вашия случай на употреба и сценарии за внедряване, можете дори да вградите NATS сървър във вашето приложение.
- Има и една особеност на NATS, а именно, че за приложения, изградени с Go, се поддържа embedded mode.
- Тоест, вместо обичайния начин за брокери на съобщения, който включва стартиране на отделен брокер сървър и комуникация с него чрез клиент на приложението, самият брокер може да бъде вграден (embed) в приложението, написано на Go.
Предимства и случаи на употреба на вграден NATS
- Има добре обяснено видео в Youtube, така че ще се задоволя с линка към видеото.
- Дори без да разполагате с отделен сървър за брокер на съобщения, можете да създадете modular monolith application и да постигнете separate of concern, като същевременно използвате предимството да вградите NATS. В допълнение, става възможно и single binary deployment.
- Може да се използва не само за platform with no network (WASM), но и за offline-first application.
Пример от официалната документация
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- Това е пример за Embedded NATS, посочен в официалната документация на NATS, но ако се изпълни точно този примерен код, комуникацията няма да се осъществи в embedding mode.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- Чрез командата
watch 'netstat -an | grep 127.0.0.1'
можете да наблюдавате мрежовия трафик към и от localhost (127.0.0.1), и когато изпълните файла Go сgo run .
, ще видите, че се добавят нови мрежови заявки, изхождащи от стандартния порт на NATS4222
.
Правилни конфигурации за embedding mode
За да се осъществи комуникация в embedded mode, както е предвидено, са необходими следните две опции.
- Client: Трябва да се добави опцията
InProcessServer
. - Server: Във
Server.Options
трябва да се зададе флагътDontListen
наtrue
.
- Client: Трябва да се добави опцията
Тези части не са официално документирани, като началото на тази функционалност може да се проследи чрез този PR.
This PR adds three things:
InProcessConn()
function toServer
which builds anet.Pipe
to get a connection to the NATS server without using TCP socketsDontListen
option which tells the NATS server not to listen on the usual TCP listenerstartupComplete
channel, which is closed right before we startAcceptLoop
, andreadyForConnections
will wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
Работещ пример за embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- Сега може да се види, че не възникват допълнителни network hop, както е предвидено.
Под капака
TL;DR
- Тази диаграма на последователността показва какви функции и как работят вътрешно, когато този код се изпълни от
main.go
, а основната идея е следната.- Чрез
DontListen: true
сървърът пропуска фазата на слушане на клиенти, нареченаAcceptLoop
. - Ако
InProcessServer
сред опциите за свързване на клиента е активиран, се създава in-memory connection и чрезnet.Pipe
се създава pipe, след което краят на pipe се връща на клиента като типnet.Conn
. - Клиентът и сървърът осъществяват in-process communication чрез тази връзка.
- Чрез
Сървър
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- Първо, ако
DontListen
е true, фазата на слушане на клиенти, нареченаAcceptLoop
, се пропуска.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- За сведение, функцията AcceptLoop изпълнява следните процеси. Части като
TLS
илиhostPort
, които са свързани с мрежова комуникация, могат да бъдат пропуснати, тъй като не са необходими при in-process communication.
Клиент
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
- Функцията
Connect
, която осъществява връзка между NATS сървър и NATS клиент, може да конфигурира URL на клиента и опции за свързване, а структурата Options, която събира тези опции, съдържа полеInProcessServer
от тип интерфейсInProcessConnProvider
.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- Когато клиентът на NATS осъществява връзка, ако
nats.InProcessServer(ns)
бъде предаден на полетоInProcessServer
, тогава
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// running within the process instead of dialing via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- InProcessServer на опцията се заменя с embedded NATS сървър и
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- Този интерфейс, във функцията
createConn
, която създава връзка, ако опциятаInProcessServer
не е nil (валидна), изпълняваInProcesConn
на InProcessServer в опцията и
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Изпълнява се
InProcessConn
, имплементиран в сървъра. - Тази функция, когато
InProcessServer
на nc (NATS връзка) в Go клиента на NATS (nats.go
) не е nil, се извиква, създава връзка (net.Conn
) и я свързва с връзката на сървъра.
Consumer driven interface на Go
Един тип имплементира интерфейс, като имплементира неговите методи. Няма изрично деклариране на намерение, няма ключова дума „implements“. Неявните интерфейси отделят дефиницията на интерфейс от неговата имплементация, която след това може да се появи във всеки пакет без предварителна уговорка.
Интерфейсите се имплементират неявно, Обиколка на Go
If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.
- Този дизайн на интерфейса добре отразява често споменаваните в Go consumer defined interface и structural typing (duck typing), затова бих искал да представя и тази тема.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- Нека се върнем към кода. Полето на структурата
InProcessServer
в клиентаnats.go
е дефинирано като интерфейсInProcessConnProvider
, който изпълнява самоInProcessConn
.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- Въпреки това, типът, който се подава в него, е
Server
от nats-server, който изпълнява различни функции, освен InProcessConn. - Това е така, защото в дадената ситуация клиентът се интересува само от това дали е предоставен интерфейсът
InProcessConn
, а останалото не е толкова важно. - Следователно, клиентът
nats.go
използва само consumer defined interface, нареченInProcessConnProvider
, който дефинира само функциятаInProcessConn() (net.Conn, error)
.
Заключение
- Разгледахме накратко embedded mode на NATS и неговия начин на работа, както и consumer defined interface на Go, който може да се види чрез кода на NATS.
- Надявам се тази информация да бъде полезна за хората, които използват NATS за горепосочените цели, и с това завършвам тази статия.