GoSuda

嵌入式 NATS 如何与 Go 应用程序通信?

By prravda
views ...

入门

关于 NATS

软件应用程序和服务需要交换数据。NATS 是一种基础设施,允许这种以消息形式分段的数据交换。我们称之为“面向消息的中间件”。

借助 NATS,应用程序开发人员可以:

  • 轻松构建分布式和可扩展的客户端-服务器应用程序。
  • 以通用方式实时存储和分发数据。这可以在各种环境、语言、云提供商和本地系统中灵活实现。

什么是 NATS,NATS 文档

  • NATS 是一个用 Go 编写的消息代理。

嵌入式 NATS

如果您的应用程序是用 Go 编写的,并且符合您的用例和部署场景,您甚至可以将 NATS 服务器嵌入到您的应用程序中。

嵌入式 NATS, NATS 文档

  • NATS 的一个特点是,对于用 Go 编写的应用程序,它支持嵌入模式。
  • 也就是说,它不是消息代理的常见方式——启动一个独立的代理服务器,然后通过该服务器和应用程序的客户端进行通信——而是可以将代理本身嵌入(embed)到用 Go 编写的应用程序中。

嵌入式 NATS 的优势和用例

  • 有一个解释得很清楚的 Youtube 视频,这里就用视频链接代替了。
  • 即使不部署单独的消息代理服务器,也可以创建 modular monolith application,从而实现关注点分离,并利用将 NATS 嵌入的优势。此外,还可以实现 single binary deployment。
  • 它不仅可以在 platform with no network (WASM) 中使用,还可以在 offline-first application 中有用。

官方文档示例

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // Initialize new server with options
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // Start the server via goroutine
22    go ns.Start()
23
24    // Wait for server to be ready for connections
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // Connect to server
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // Subscribe to the subject
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // Print message data
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // Shutdown the server (optional)
45        ns.Shutdown()
46    })
47
48    // Publish data to the subject
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // Wait for server shutdown
52    ns.WaitForShutdown()
53}
  • 这是 NATS 官方文档提供的 嵌入式 NATS 示例,但是按照该示例代码进行操作,并不会以嵌入模式进行通信。
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • 通过 watch 'netstat -an | grep 127.0.0.1' 命令检查与 localhost (127.0.0.1) 之间的网络流量,然后运行 go run . 执行该 go 文件,可以看到从 NATS 的默认端口 4222 发出的新网络请求正在增加。

嵌入模式的正确配置

  • 为了按照预期以嵌入模式进行通信,需要以下两个选项。

    • 客户端:需要添加 InProcessServer 选项。
    • 服务器:在 Server.Options 中需要明确将 DontListen 标志设置为 true
  • 这些部分并未正式文档化,此功能的起源可以通过此 PR 了解。

    This PR adds three things:

    1. InProcessConn() function to Server which builds a net.Pipe to get a connection to the NATS server without using TCP sockets
    2. DontListen option which tells the NATS server not to listen on the usual TCP listener
    3. startupComplete channel, which is closed right before we start AcceptLoop, and readyForConnections will wait for it

    The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.

    An accompanying PR nats-io/nats.go#774 adds support to the client side.

    This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.

    /cc @nats-io/core

嵌入模式的工作示例

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// for configuring the embeded NATS server
14		// set DonListen as true
15		DontListen: true,
16	}
17
18	// Initialize new server with options
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// Start the server via goroutine
26	go ns.Start()
27
28	// Wait for server to be ready for connections
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// Connect to server via in-process connection
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// Subscribe to the subject
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// Print message data
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// Shutdown the server (optional)
49		ns.Shutdown()
50	})
51
52	// Publish data to the subject
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// Wait for server shutdown
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • 现在可以看到,没有额外的网络跳跃发生,这正是我们所期望的。

内部原理

TL;DR

diagram1

  • 当该代码在 main.go 中运行时,该序列图展示了内部函数如何运作,其核心思想如下:
    • 通过 DontListen: true,服务器跳过了客户端监听阶段 AcceptLoop
    • 如果客户端的 Connect 选项中的 InProcessServer 被激活,它将创建一个 in-memory connection,并通过 net.Pipe 创建一个管道,然后将管道的末端以 net.Conn 类型返回给客户端。
    • 客户端和服务器通过该连接进行进程内通信。

服务器

AcceptLoop

1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • 首先,如果 DontListen 为 true,则跳过 AcceptLoop 这一客户端监听阶段。
 1// nats-server/server/server.go
 2
 3// AcceptLoop is exported for easier testing.
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// If we were to exit before the listener is setup properly,
 6	// make sure we close the channel.
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// Snapshot server options.
18	opts := s.getOpts()
19
20	// Setup state that can enable shutdown
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// Alert of TLS enabled.
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
42	// to 0 at the beginning this function. So we need to get the actual port
43	if opts.Port == 0 {
44		// Write resolved port back to options.
45		opts.Port = l.Addr().(*net.TCPAddr).Port
46	}
47
48	// Now that port has been set (if it was set to RANDOM), set the
49	// server's info Host/Port with either values from Options or
50	// ClientAdvertise.
51	if err := s.setInfoHostPort(); err != nil {
52		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53		l.Close()
54		s.mu.Unlock()
55		return
56	}
57	// Keep track of client connect URLs. We may need them later.
58	s.clientConnectURLs = s.getClientConnectURLs()
59	s.listener = l
60
61	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62		func(_ error) bool {
63			if s.isLameDuckMode() {
64				// Signal that we are not accepting new clients
65				s.ldmCh <- true
66				// Now wait for the Shutdown...
67				<-s.quitCh
68				return true
69			}
70			return false
71		})
72	s.mu.Unlock()
73
74	// Let the caller know that we are ready
75	close(clr)
76	clr = nil
77}
  • 此外,AcceptLoop 函数执行以下过程。这些部分与网络通信相关的,例如 TLShostPort,在进行进程内通信时是不需要的,因此可以省略。

客户端

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// Connect will attempt to connect to the NATS system.
 5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
 6// Comma separated arrays are also supported, e.g. urlA, urlB.
 7// Options start with the defaults but can be overridden.
 8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
 9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • 连接 NATS 服务器和 NATS 客户端的 Connect 函数可以设置客户端 URL 和连接选项,并且在这些选项的 Options 结构体中存在一个类型为 InProcessConnProvider 接口的 InProcessServer 字段。
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • 当 NATS 客户端进行 Connect 时,如果将 nats.InProcessServer(ns) 传递给 InProcessServer 字段,那么
 1// nats-go/nats.go
 2
 3// InProcessServer is an Option that will try to establish a direction to a NATS server
 4// running within the process instead of dialing via TCP.
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • 选项的 InProcessServer 将被嵌入式 NATS 服务器替换。
 1// nats-go/nats.go
 2
 3// createConn will connect to the server and wrap the appropriate
 4// bufio structures. It will do the right thing when an existing
 5// connection is in place.
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// If we have a reference to an in-process server then establish a
15	// connection using that.
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • 该接口在创建连接的 createConn 函数中,如果 InProcessServer 选项不为 nil(有效),则执行选项中的 InProcessServer 的 InProcessConn,同时
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • 调用并执行服务器中实现的 InProcessConn
  • 该函数在 NATS 的 Go 客户端 nats.go 中,当 nc (NATS connection) 的 InProcessServer 不为 nil 时被调用,创建连接 (net.Conn),并将其绑定到服务器的连接上。

Go 的消费者驱动接口

类型通过实现其方法来满足接口。没有明确的意图声明,没有“implements”关键字。隐式接口将接口的定义与其实现分离,然后实现可以在任何包中出现而无需预先安排。

接口是隐式实现的,Go 语言之旅

如果一个类型仅仅是为了实现一个接口而存在,并且除了该接口之外永远不会有导出的方法,那么就没有必要导出该类型本身。

通用性,Effective Go

  • 这种接口设计很好地体现了 Go 中常说的消费者定义接口和结构化类型(鸭子类型),因此我也想介绍一下这个主题。
 1// nats-go/nats.go
 2
 3// Options can be used to create a customized connection.
 4type Options struct {
 5	// Url represents a single NATS server url to which the client
 6	// will be connecting. If the Servers option is also set, it
 7	// then becomes the first server in the Servers array.
 8	Url string
 9
10	// InProcessServer represents a NATS server running within the
11	// same process. If this is set then we will attempt to connect
12	// to the server directly rather than using external TCP conns.
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • 让我们回到代码。nats.go 客户端中的 InProcessServer 选项结构体字段被定义为 InProcessConnProvider 接口,该接口只执行 InProcessConn
 1// nats-server/server/server.go
 2
 3// InProcessConn returns an in-process connection to the server,
 4// avoiding the need to use a TCP listener for local connectivity
 5// within the same process. This can be used regardless of the
 6// state of the DontListen option.
 7func (s *Server) InProcessConn() (net.Conn, error) {
 8	pl, pr := net.Pipe()
 9	if !s.startGoRoutine(func() {
10		s.createClientInProcess(pl)
11		s.grWG.Done()
12	}) {
13		pl.Close()
14		pr.Close()
15		return nil, fmt.Errorf("failed to create connection")
16	}
17	return pr, nil
18}
  • 然而,其中传入的类型是 nats-server 的 Server,它不仅执行 InProcessConn,还执行各种其他功能。
  • 这是因为在这种情况下,客户端关注的只是是否提供了 InProcessConn 接口,其他方面并不那么重要。
  • 因此,nats.go 客户端只创建并使用了 InProcessConn() (net.Conn, error) 这一功能定义的 InProcessConnProvider 这一消费者定义接口。

结论

  • 本文简要介绍了 NATS 的嵌入模式及其工作原理,以及通过 NATS 代码可以观察到的 Go 语言的消费者定义接口。
  • 希望这些信息能对出于上述目的使用 NATS 的读者有所帮助,本文到此结束。