組み込みNATSはGoアプリケーションとどのように通信するのですか?
はじめに
NATSについて
ソフトウェアアプリケーションとサービスはデータを交換する必要があります。NATSは、メッセージの形式にセグメント化された、そのようなデータ交換を可能にするインフラストラクチャです。これを「メッセージ指向ミドルウェア」と呼びます。
NATSを使用すると、アプリケーション開発者は以下のことが可能になります。
- 分散型でスケーラブルなクライアント・サーバーアプリケーションを容易に構築できます。
- 汎用的な方法でデータをリアルタイムに保存・配信できます。これは、様々な環境、言語、クラウドプロバイダー、オンプレミスシステムを横断して柔軟に実現できます。
- NATSはGoで構築されたメッセージブローカーです。
Embedded NATS
アプリケーションがGoで記述されており、ユースケースとデプロイメントシナリオに適合する場合、NATSサーバーをアプリケーション内に組み込むことさえ可能です。
- そして、NATSには特異な点があり、Goで構築されたアプリケーションの場合、embedded modeをサポートするということです。
- すなわち、メッセージブローカーの一般的な方式である、別途ブローカーサーバーを起動し、そのサーバーとアプリケーションのクライアントを通じて通信する構造ではなく、ブローカー自体をGoで作成されたアプリケーションに内蔵(embed)できるという話です。
embedded NATSの利点とユースケース
- よく説明されたYoutube動画があるので、動画のリンクをもって代えさせていただきます。
- 別途メッセージブローカーサーバーをデプロイしなくても、modular monolith applicationを作成することで、separate of concernを達成しつつ、natsをembeddedで組み込むという利点を得ることができます。さらに、single binary deploymentも可能になります。
- platform with no network (wasm)だけでなく、offline-first applicationでも有用に利用できます。
公式ドキュメントの例
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // オプションで新しいサーバーを初期化する
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // goroutineを介してサーバーを起動する
22 go ns.Start()
23
24 // サーバーが接続準備完了になるまで待機する
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // サーバーに接続する
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // サブジェクトを購読する
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // メッセージデータを表示する
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // サーバーをシャットダウンする(オプション)
45 ns.Shutdown()
46 })
47
48 // サブジェクトにデータを公開する
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // サーバーのシャットダウンを待機する
52 ns.WaitForShutdown()
53}
- NATS公式ドキュメントが掲載しているEmbedded NATSの例ですが、この例のコード通りに実行すると、embedding modeでの通信は行われません。
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
watch 'netstat -an | grep 127.0.0.1'コマンドでlocalhost (127.0.0.1)とのネットワークのやり取りを確認しながら、go run .で当該goファイルを起動すると、NATSのデフォルトポートである4222から発信する新たなネットワークリクエストが追加されることが確認できます。
embedded modeの正しい設定
意図したとおりにembedded modeで通信を行うためには、以下の2つのオプションが必要です。
- Client:
InProcessServerオプションを追加する必要があります。 - Server:
Server.OptionsにDontListenというフラグをtrueと明示する必要があります。
- Client:
これらの部分は公式には文書化されておらず、この機能の始まりは当該PRを通じて把握することができます。
このPRは3つの要素を追加します。
ServerにInProcessConn()関数を追加し、TCPソケットを使用せずにNATSサーバーへの接続を取得するためにnet.Pipeを構築します。DontListenオプションを追加し、NATSサーバーに通常のTCPリスナーでリッスンしないように指示します。startupCompleteチャネルを追加し、AcceptLoopの開始直前に閉じられ、readyForConnectionsがそれを待機します。
この主な動機は、モノリス(単一プロセス)モードまたはポリリス(複数プロセス)モードのいずれかで実行できるアプリケーションがあることです。NATSを両方のモードでシンプルに使用できるようにしたいと考えていますが、モノリスモードは、ソケット接続を開くことが意味をなさない(モバイル)か、単に不可能(WASM)な様々なプラットフォームに対応できる必要があります。これらの変更により、NATSを完全にインプロセスで使用できるようになります。
付随するPRnats-io/nats.go#774はクライアント側のサポートを追加します。
これは私がこのプロジェクトに提出する最初のPRなので、もし何か明らかなことを見落としている場合は、事前にお詫び申し上げます。
/cc @nats-io/core
embedded modeの動作例
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // 組み込みNATSサーバーを設定するため
14 // DontListenをtrueに設定する
15 DontListen: true,
16 }
17
18 // オプションで新しいサーバーを初期化する
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // goroutineを介してサーバーを起動する
26 go ns.Start()
27
28 // サーバーが接続準備完了になるまで待機する
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // インプロセス接続を介してサーバーに接続する
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // サブジェクトを購読する
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // メッセージデータを表示する
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // サーバーをシャットダウンする(オプション)
49 ns.Shutdown()
50 })
51
52 // サブジェクトにデータを公開する
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // サーバーのシャットダウンを待機する
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- これで、意図したとおりに追加のnetwork hopが発生しないことが確認できます。
内部構造
TL;DR
- このコードを
main.goで実行した際に、内部でどのような関数がどのように動作するかを示すシーケンス図であり、要点を説明すると以下の通りです。DontListen: trueを通じて、サーバーはAcceptLoopというclient listening phaseをスキップします。- clientのConnect optionのうち
InProcessServerが有効化されると、in-memory connectionを生成し、net.Pipeを通じてpipeを作成した後、pipeの終端をclientにnet.Conn型で返します。 - clientとserverは当該connectionを通じてin-process communicationを行います。
サーバー
AcceptLoop
1// nats-server/server/server.go
2
3// クライアントを待機する。
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- まず、
DontListenがtrueの場合、AcceptLoopというクライアントリスニングフェーズをスキップします。
1// nats-server/server/server.go
2
3// AcceptLoopはテストを容易にするためにエクスポートされている。
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // リスナーが適切に設定される前に終了する場合、
6 // チャネルを必ず閉じる。
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // サーバーオプションのスナップショット。
18 opts := s.getOpts()
19
20 // シャットダウンを可能にする状態を設定する
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // TLSが有効であることを通知する。
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // サーバーがRANDOM_PORT (-1)で起動された場合、opts.Portは
42 // この関数の開始時には0になる。そのため、実際のポートを
43 // 取得する必要がある。
44 if opts.Port == 0 {
45 // 解決されたポートをオプションに書き戻す。
46 opts.Port = l.Addr().(*net.TCPAddr).Port
47 }
48
49 // ポートが設定された後(RANDOMに設定されていた場合)、
50 // サーバーのinfo Host/PortをOptionsまたは
51 // ClientAdvertiseの値で設定する。
52 if err := s.setInfoHostPort(); err != nil {
53 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
54 l.Close()
55 s.mu.Unlock()
56 return
57 }
58 // クライアント接続URLを追跡する。後で必要になるかもしれない。
59 s.clientConnectURLs = s.getClientConnectURLs()
60 s.listener = l
61
62 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
63 func(_ error) bool {
64 if s.isLameDuckMode() {
65 // 新しいクライアントを受け付けていないことを通知する
66 s.ldmCh <- true
67 // これでシャットダウンを待機する...
68 <-s.quitCh
69 return true
70 }
71 return false
72 })
73 s.mu.Unlock()
74
75 // 呼び出し元に準備ができたことを知らせる
76 close(clr)
77 clr = nil
78}
- 参考までに、AcceptLoop関数は以下のプロセスを実行します。
TLSやhostPortのようにネットワーク通信に関連する部分であり、in-process communicationを行う場合は不要な部分であるため、省略しても問題ありません。
クライアント
InProcessServer
1
2// nats-go/nats.go
3
4// ConnectはNATSシステムへの接続を試みる。
5// URLにはユーザー名/パスワードのセマンティクスを含めることができる。例: nats://derek:pass@localhost:4222
6// コンマ区切りの配列もサポートされている。例: urlA, urlB。
7// オプションはデフォルトから始まるが、上書きできる。
8// NATSサーバーのwebsocketポートに接続するには、`ws`または`wss`スキームを使用する。例: `ws://localhost:8080`。
9// websocketスキームは他のスキーム(nats/tls)と混在できないことに注意。
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Optionsはカスタマイズされた接続を作成するために使用できる。
4type Options struct {
5 // Urlはクライアントが接続する単一のNATSサーバーURLを表す。
6 // Serversオプションも設定されている場合、それは
7 // Servers配列の最初のサーバーになる。
8 Url string
9
10 // InProcessServerは同じプロセス内で実行されているNATSサーバーを表す。
11 // これが設定されている場合、外部TCP接続を使用するのではなく、
12 // サーバーに直接接続を試みる。
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- natsサーバーとnatsクライアントの接続を行う
Connect関数は、クライアントURLとconnect Optionを設定でき、それらのOptionを集めたOptions構造体にはInProcessConnProviderインターフェース型のInProcessServerというフィールドが存在します。
1// main.go of example code
2
3// オプションで新しいサーバーを初期化する
4ns, err := server.NewServer(opts)
5
6//...
7
8// インプロセス接続を介してサーバーに接続する
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- natsクライアントでConnectを行う際に、
InProcessServerフィールドにnats.InProcessServer(ns)を渡すと、
1// nats-go/nats.go
2
3// InProcessServerは、TCPを介してダイヤルする代わりに、
4// プロセス内で実行されているNATSサーバーへの直接的な接続を確立しようとするOptionである。
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- optionのInProcessServerがembedded nats serverに置き換えられ、
1// nats-go/nats.go
2
3// createConnはサーバーに接続し、適切な
4// bufio構造体をラップする。既存の接続が
5// 存在する場合、適切な処理を行う。
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // インプロセスサーバーへの参照がある場合、
15 // それを使用して接続を確立する。
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- 当該インターフェースは接続を作成する
createConn関数において、InProcessServerオプションがnilではない(有効な)場合、オプション内のInProcessServerのInProcessConnを実行し、
1// nats-server/server/server.go
2
3// InProcessConnはサーバーへのインプロセス接続を返し、
4// 同じプロセス内でのローカル接続のためにTCPリスナーを使用する必要性を回避する。
5// これはDontListenオプションの状態に関わらず使用できる。
6func (s *Server) InProcessConn() (net.Conn, error) {
7 pl, pr := net.Pipe()
8 if !s.startGoRoutine(func() {
9 s.createClientInProcess(pl)
10 s.grWG.Done()
11 }) {
12 pl.Close()
13 pr.Close()
14 return nil, fmt.Errorf("failed to create connection")
15 }
16 return pr, nil
17}
- サーバーに実装された
InProcessConnを呼び出して実行します。 - この関数は、natsのGoクライアントである
nats.goにおいて、nc(nats connection)のInProcessServerがnilではない場合に呼び出され、connection(net.Conn)を作成し、それをサーバーのconnectionにバインドします。
GoのConsumer driven interface
型は、そのメソッドを実装することでインターフェースを実装します。意図を明示的に宣言する必要はなく、「implements」キーワードもありません。暗黙的なインターフェースは、インターフェースの定義と実装を分離し、事前の取り決めなしに任意のパッケージで実装できるようになります。
Interfaces are implemented implicitly, A Tour of Go
型がインターフェースを実装するためだけに存在し、そのインターフェースを超えてエクスポートされるメソッドを持たない場合、型自体をエクスポートする必要はありません。
- このインターフェースデザインは、Goでよく言われるconsumer defined interfaceとstructural typing (duck typing)をよく表しているので、このテーマも一緒に紹介したいと思います。
1// nats-go/nats.go
2
3// Optionsはカスタマイズされた接続を作成するために使用できる。
4type Options struct {
5 // Urlはクライアントが接続する単一のNATSサーバーURLを表す。
6 // Serversオプションも設定されている場合、それは
7 // Servers配列の最初のサーバーになる。
8 Url string
9
10 // InProcessServerは同じプロセス内で実行されているNATSサーバーを表す。
11 // これが設定されている場合、外部TCP接続を使用するのではなく、
12 // サーバーに直接接続を試みる。
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- 再びコードに戻りましょう。nats.goクライアントでは、
InProcessServerオプション構造体フィールドが、InProcessConnのみを実行するInProcessConnProviderインターフェースとして定義されています。
1// nats-server/server/server.go
2
3// InProcessConnはサーバーへのインプロセス接続を返し、
4// 同じプロセス内でのローカル接続のためにTCPリスナーを使用する必要性を回避する。
5// これはDontListenオプションの状態に関わらず使用できる。
6func (s *Server) InProcessConn() (net.Conn, error) {
7 pl, pr := net.Pipe()
8 if !s.startGoRoutine(func() {
9 s.createClientInProcess(pl)
10 s.grWG.Done()
11 }) {
12 pl.Close()
13 pr.Close()
14 return nil, fmt.Errorf("failed to create connection")
15 }
16 return pr, nil
17}
- しかし、そこに入力される型はnats-serverの
Serverであり、InProcessConnだけでなく多様な機能を実行しています。 - なぜなら、この状況におけるクライアントの関心事は
InProcessConnというインターフェースが提供されたかどうかだけであり、その他のことはあまり重要ではないからです。 - したがって、nats.goクライアントは
InProcessConn() (net.Conn, error)という機能のみを定義したInProcessConnProviderというconsumer defined interfaceのみを作成して使用しています。
結論
- NATSのembedded modeとその動作方式、そしてNATSのコードを通じて確認できるGoのconsumer defined interfaceについて簡潔に扱いました。
- この情報が上記のような目的でNATSを使用する方々のお役に立つことを願い、本稿を終えたいと思います。