嵌入式 NATS 如何与 Go 应用程序通信?
入门
关于 NATS
软件应用程序和服务需要交换数据。NATS 是一种基础设施,允许这种以消息形式分段的数据交换。我们称之为“面向消息的中间件”。
借助 NATS,应用程序开发人员可以:
- 轻松构建分布式和可扩展的客户端-服务器应用程序。
- 以通用方式实时存储和分发数据。这可以在各种环境、语言、云提供商和本地系统中灵活实现。
- NATS 是一个用 Go 编写的消息代理。
嵌入式 NATS
如果您的应用程序是用 Go 编写的,并且符合您的用例和部署场景,您甚至可以将 NATS 服务器嵌入到您的应用程序中。
- NATS 的一个特点是,对于用 Go 编写的应用程序,它支持嵌入模式。
- 也就是说,它不是消息代理的常见方式——启动一个独立的代理服务器,然后通过该服务器和应用程序的客户端进行通信——而是可以将代理本身嵌入(embed)到用 Go 编写的应用程序中。
嵌入式 NATS 的优势和用例
- 有一个解释得很清楚的 Youtube 视频,这里就用视频链接代替了。
- 即使不部署单独的消息代理服务器,也可以创建 modular monolith application,从而实现关注点分离,并利用将 NATS 嵌入的优势。此外,还可以实现 single binary deployment。
- 它不仅可以在 platform with no network (WASM) 中使用,还可以在 offline-first application 中有用。
官方文档示例
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- 这是 NATS 官方文档提供的 嵌入式 NATS 示例,但是按照该示例代码进行操作,并不会以嵌入模式进行通信。
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
- 通过
watch 'netstat -an | grep 127.0.0.1'
命令检查与 localhost (127.0.0.1) 之间的网络流量,然后运行go run .
执行该 go 文件,可以看到从 NATS 的默认端口4222
发出的新网络请求正在增加。
嵌入模式的正确配置
为了按照预期以嵌入模式进行通信,需要以下两个选项。
- 客户端:需要添加
InProcessServer
选项。 - 服务器:在
Server.Options
中需要明确将DontListen
标志设置为true
。
- 客户端:需要添加
这些部分并未正式文档化,此功能的起源可以通过此 PR 了解。
This PR adds three things:
InProcessConn()
function toServer
which builds anet.Pipe
to get a connection to the NATS server without using TCP socketsDontListen
option which tells the NATS server not to listen on the usual TCP listenerstartupComplete
channel, which is closed right before we startAcceptLoop
, andreadyForConnections
will wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
嵌入模式的工作示例
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- 现在可以看到,没有额外的网络跳跃发生,这正是我们所期望的。
内部原理
TL;DR
- 当该代码在
main.go
中运行时,该序列图展示了内部函数如何运作,其核心思想如下:- 通过
DontListen: true
,服务器跳过了客户端监听阶段AcceptLoop
。 - 如果客户端的 Connect 选项中的
InProcessServer
被激活,它将创建一个 in-memory connection,并通过net.Pipe
创建一个管道,然后将管道的末端以net.Conn
类型返回给客户端。 - 客户端和服务器通过该连接进行进程内通信。
- 通过
服务器
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- 首先,如果
DontListen
为 true,则跳过AcceptLoop
这一客户端监听阶段。
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- 此外,AcceptLoop 函数执行以下过程。这些部分与网络通信相关的,例如
TLS
或hostPort
,在进行进程内通信时是不需要的,因此可以省略。
客户端
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- 连接 NATS 服务器和 NATS 客户端的
Connect
函数可以设置客户端 URL 和连接选项,并且在这些选项的 Options 结构体中存在一个类型为InProcessConnProvider
接口的InProcessServer
字段。
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- 当 NATS 客户端进行 Connect 时,如果将
nats.InProcessServer(ns)
传递给InProcessServer
字段,那么
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// running within the process instead of dialing via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- 选项的 InProcessServer 将被嵌入式 NATS 服务器替换。
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- 该接口在创建连接的
createConn
函数中,如果InProcessServer
选项不为 nil(有效),则执行选项中的 InProcessServer 的InProcessConn
,同时
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- 调用并执行服务器中实现的
InProcessConn
。 - 该函数在 NATS 的 Go 客户端
nats.go
中,当 nc (NATS connection) 的InProcessServer
不为 nil 时被调用,创建连接 (net.Conn
),并将其绑定到服务器的连接上。
Go 的消费者驱动接口
类型通过实现其方法来满足接口。没有明确的意图声明,没有“implements”关键字。隐式接口将接口的定义与其实现分离,然后实现可以在任何包中出现而无需预先安排。
如果一个类型仅仅是为了实现一个接口而存在,并且除了该接口之外永远不会有导出的方法,那么就没有必要导出该类型本身。
- 这种接口设计很好地体现了 Go 中常说的消费者定义接口和结构化类型(鸭子类型),因此我也想介绍一下这个主题。
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- 让我们回到代码。nats.go 客户端中的
InProcessServer
选项结构体字段被定义为InProcessConnProvider
接口,该接口只执行InProcessConn
。
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- 然而,其中传入的类型是 nats-server 的
Server
,它不仅执行 InProcessConn,还执行各种其他功能。 - 这是因为在这种情况下,客户端关注的只是是否提供了
InProcessConn
接口,其他方面并不那么重要。 - 因此,nats.go 客户端只创建并使用了
InProcessConn() (net.Conn, error)
这一功能定义的InProcessConnProvider
这一消费者定义接口。
结论
- 本文简要介绍了 NATS 的嵌入模式及其工作原理,以及通过 NATS 代码可以观察到的 Go 语言的消费者定义接口。
- 希望这些信息能对出于上述目的使用 NATS 的读者有所帮助,本文到此结束。