GoSuda

組み込みNATSはGoアプリケーションとどのように通信するのですか?

By prravda
views ...

はじめに

NATSについて

ソフトウェアアプリケーションとサービスはデータを交換する必要があります。NATSは、メッセージの形式にセグメント化された、そのようなデータ交換を可能にするインフラストラクチャです。これを「メッセージ指向ミドルウェア」と呼びます。

NATSを使用すると、アプリケーション開発者は以下のことが可能になります。

  • 分散型でスケーラブルなクライアント・サーバーアプリケーションを容易に構築できます。
  • 汎用的な方法でデータをリアルタイムに保存・配信できます。これは、様々な環境、言語、クラウドプロバイダー、オンプレミスシステムを横断して柔軟に実現できます。

What is NATS, NATS docs

  • NATSはGoで構築されたメッセージブローカーです。

Embedded NATS

アプリケーションがGoで記述されており、ユースケースとデプロイメントシナリオに適合する場合、NATSサーバーをアプリケーション内に組み込むことさえ可能です。

Embedding NATS, NATS docs

  • そして、NATSには特異な点があり、Goで構築されたアプリケーションの場合、embedded modeをサポートするということです。
  • すなわち、メッセージブローカーの一般的な方式である、別途ブローカーサーバーを起動し、そのサーバーとアプリケーションのクライアントを通じて通信する構造ではなく、ブローカー自体をGoで作成されたアプリケーションに内蔵(embed)できるという話です。

embedded NATSの利点とユースケース

  • よく説明されたYoutube動画があるので、動画のリンクをもって代えさせていただきます。
  • 別途メッセージブローカーサーバーをデプロイしなくても、modular monolith applicationを作成することで、separate of concernを達成しつつ、natsをembeddedで組み込むという利点を得ることができます。さらに、single binary deploymentも可能になります。
  • platform with no network (wasm)だけでなく、offline-first applicationでも有用に利用できます。

公式ドキュメントの例

 1package main
 2
 3import (
 4    "fmt"
 5    "time"
 6
 7    "github.com/nats-io/nats-server/v2/server"
 8    "github.com/nats-io/nats.go"
 9)
10
11func main() {
12    opts := &server.Options{}
13
14    // オプションで新しいサーバーを初期化する
15    ns, err := server.NewServer(opts)
16
17    if err != nil {
18        panic(err)
19    }
20
21    // goroutineを介してサーバーを起動する
22    go ns.Start()
23
24    // サーバーが接続準備完了になるまで待機する
25    if !ns.ReadyForConnections(4 * time.Second) {
26        panic("not ready for connection")
27    }
28
29    // サーバーに接続する
30    nc, err := nats.Connect(ns.ClientURL())
31
32    if err != nil {
33        panic(err)
34    }
35
36    subject := "my-subject"
37
38    // サブジェクトを購読する
39    nc.Subscribe(subject, func(msg *nats.Msg) {
40        // メッセージデータを表示する
41        data := string(msg.Data)
42        fmt.Println(data)
43
44        // サーバーをシャットダウンする(オプション)
45        ns.Shutdown()
46    })
47
48    // サブジェクトにデータを公開する
49    nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51    // サーバーのシャットダウンを待機する
52    ns.WaitForShutdown()
53}
  • NATS公式ドキュメントが掲載しているEmbedded NATSの例ですが、この例のコード通りに実行すると、embedding modeでの通信は行われません。
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:34:20
2                                                                     in 0.017s (0)
3...
4tcp4       0      0  127.0.0.1.4222         127.0.0.1.63769        TIME_WAIT
  • watch 'netstat -an | grep 127.0.0.1'コマンドでlocalhost (127.0.0.1)とのネットワークのやり取りを確認しながら、go run .で当該goファイルを起動すると、NATSのデフォルトポートである4222から発信する新たなネットワークリクエストが追加されることが確認できます。

embedded modeの正しい設定

  • 意図したとおりにembedded modeで通信を行うためには、以下の2つのオプションが必要です。

    • Client: InProcessServerオプションを追加する必要があります。
    • Server: Server.OptionsDontListenというフラグをtrueと明示する必要があります。
  • これらの部分は公式には文書化されておらず、この機能の始まりは当該PRを通じて把握することができます。

    このPRは3つの要素を追加します。

    1. ServerInProcessConn()関数を追加し、TCPソケットを使用せずにNATSサーバーへの接続を取得するためにnet.Pipeを構築します。
    2. DontListenオプションを追加し、NATSサーバーに通常のTCPリスナーでリッスンしないように指示します。
    3. startupCompleteチャネルを追加し、AcceptLoopの開始直前に閉じられ、readyForConnectionsがそれを待機します。

    この主な動機は、モノリス(単一プロセス)モードまたはポリリス(複数プロセス)モードのいずれかで実行できるアプリケーションがあることです。NATSを両方のモードでシンプルに使用できるようにしたいと考えていますが、モノリスモードは、ソケット接続を開くことが意味をなさない(モバイル)か、単に不可能(WASM)な様々なプラットフォームに対応できる必要があります。これらの変更により、NATSを完全にインプロセスで使用できるようになります。

    付随するPRnats-io/nats.go#774はクライアント側のサポートを追加します。

    これは私がこのプロジェクトに提出する最初のPRなので、もし何か明らかなことを見落としている場合は、事前にお詫び申し上げます。

    /cc @nats-io/core

embedded modeの動作例

 1package main
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"github.com/nats-io/nats-server/v2/server"
 8	"github.com/nats-io/nats.go"
 9)
10
11func main() {
12	opts := &server.Options{
13		// 組み込みNATSサーバーを設定するため
14		// DontListenをtrueに設定する
15		DontListen: true,
16	}
17
18	// オプションで新しいサーバーを初期化する
19	ns, err := server.NewServer(opts)
20
21	if err != nil {
22		panic(err)
23	}
24
25	// goroutineを介してサーバーを起動する
26	go ns.Start()
27
28	// サーバーが接続準備完了になるまで待機する
29	if !ns.ReadyForConnections(10 * time.Second) {
30		panic("not ready for connection")
31	}
32
33	// インプロセス接続を介してサーバーに接続する
34	nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36	if err != nil {
37		panic(err)
38	}
39
40	subject := "my-subject"
41
42	// サブジェクトを購読する
43	nc.Subscribe(subject, func(msg *nats.Msg) {
44		// メッセージデータを表示する
45		data := string(msg.Data)
46		fmt.Println(data)
47
48		// サーバーをシャットダウンする(オプション)
49		ns.Shutdown()
50	})
51
52	// サブジェクトにデータを公開する
53	nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55	// サーバーのシャットダウンを待機する
56	ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1         pravdalaptop-home.local: 02:37:50
2                                                                     in 0.023s (0)
3...no additional logs 
4
  • これで、意図したとおりに追加のnetwork hopが発生しないことが確認できます。

内部構造

TL;DR

diagram1

  • このコードをmain.goで実行した際に、内部でどのような関数がどのように動作するかを示すシーケンス図であり、要点を説明すると以下の通りです。
    • DontListen: trueを通じて、サーバーはAcceptLoopというclient listening phaseをスキップします。
    • clientのConnect optionのうちInProcessServerが有効化されると、in-memory connectionを生成し、net.Pipeを通じてpipeを作成した後、pipeの終端をclientにnet.Conn型で返します。
    • clientとserverは当該connectionを通じてin-process communicationを行います。

サーバー

AcceptLoop

1// nats-server/server/server.go
2
3// クライアントを待機する。
4if !opts.DontListen {
5	s.AcceptLoop(clientListenReady)
6}
  • まず、DontListenがtrueの場合、AcceptLoopというクライアントリスニングフェーズをスキップします。
 1// nats-server/server/server.go
 2
 3// AcceptLoopはテストを容易にするためにエクスポートされている。
 4func (s *Server) AcceptLoop(clr chan struct{}) {
 5	// リスナーが適切に設定される前に終了する場合、
 6	// チャネルを必ず閉じる。
 7	defer func() {
 8		if clr != nil {
 9			close(clr)
10		}
11	}()
12
13	if s.isShuttingDown() {
14		return
15	}
16
17	// サーバーオプションのスナップショット。
18	opts := s.getOpts()
19
20	// シャットダウンを可能にする状態を設定する
21	s.mu.Lock()
22	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23	l, e := natsListen("tcp", hp)
24	s.listenerErr = e
25	if e != nil {
26		s.mu.Unlock()
27		s.Fatalf("Error listening on port: %s, %q", hp, e)
28		return
29	}
30	s.Noticef("Listening for client connections on %s",
31		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33	// TLSが有効であることを通知する。
34	if opts.TLSConfig != nil {
35		s.Noticef("TLS required for client connections")
36		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38		}
39	}
40
41	// サーバーがRANDOM_PORT (-1)で起動された場合、opts.Portは
42	// この関数の開始時には0になる。そのため、実際のポートを
43	// 取得する必要がある。
44	if opts.Port == 0 {
45		// 解決されたポートをオプションに書き戻す。
46		opts.Port = l.Addr().(*net.TCPAddr).Port
47	}
48
49	// ポートが設定された後(RANDOMに設定されていた場合)、
50	// サーバーのinfo Host/PortをOptionsまたは
51	// ClientAdvertiseの値で設定する。
52	if err := s.setInfoHostPort(); err != nil {
53		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
54		l.Close()
55		s.mu.Unlock()
56		return
57	}
58	// クライアント接続URLを追跡する。後で必要になるかもしれない。
59	s.clientConnectURLs = s.getClientConnectURLs()
60	s.listener = l
61
62	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
63		func(_ error) bool {
64			if s.isLameDuckMode() {
65				// 新しいクライアントを受け付けていないことを通知する
66				s.ldmCh <- true
67				// これでシャットダウンを待機する...
68				<-s.quitCh
69				return true
70			}
71			return false
72		})
73	s.mu.Unlock()
74
75	// 呼び出し元に準備ができたことを知らせる
76	close(clr)
77	clr = nil
78}
  • 参考までに、AcceptLoop関数は以下のプロセスを実行します。TLShostPortのようにネットワーク通信に関連する部分であり、in-process communicationを行う場合は不要な部分であるため、省略しても問題ありません。

クライアント

InProcessServer

 1
 2// nats-go/nats.go
 3
 4// ConnectはNATSシステムへの接続を試みる。
 5// URLにはユーザー名/パスワードのセマンティクスを含めることができる。例: nats://derek:pass@localhost:4222
 6// コンマ区切りの配列もサポートされている。例: urlA, urlB。
 7// オプションはデフォルトから始まるが、上書きできる。
 8// NATSサーバーのwebsocketポートに接続するには、`ws`または`wss`スキームを使用する。例: `ws://localhost:8080`。
 9// websocketスキームは他のスキーム(nats/tls)と混在できないことに注意。
10func Connect(url string, options ...Option) (*Conn, error) {
11	opts := GetDefaultOptions()
12	opts.Servers = processUrlString(url)
13	for _, opt := range options {
14		if opt != nil {
15			if err := opt(&opts); err != nil {
16				return nil, err
17			}
18		}
19	}
20	return opts.Connect()
21}
 1// nats-go/nats.go
 2
 3// Optionsはカスタマイズされた接続を作成するために使用できる。
 4type Options struct {
 5	// Urlはクライアントが接続する単一のNATSサーバーURLを表す。
 6	// Serversオプションも設定されている場合、それは
 7	// Servers配列の最初のサーバーになる。
 8	Url string
 9
10	// InProcessServerは同じプロセス内で実行されているNATSサーバーを表す。
11	// これが設定されている場合、外部TCP接続を使用するのではなく、
12	// サーバーに直接接続を試みる。
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • natsサーバーとnatsクライアントの接続を行うConnect関数は、クライアントURLとconnect Optionを設定でき、それらのOptionを集めたOptions構造体にはInProcessConnProviderインターフェース型のInProcessServerというフィールドが存在します。
1// main.go of example code
2
3// オプションで新しいサーバーを初期化する
4ns, err := server.NewServer(opts)
5
6//...
7
8// インプロセス接続を介してサーバーに接続する
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
  • natsクライアントでConnectを行う際に、InProcessServerフィールドにnats.InProcessServer(ns)を渡すと、
 1// nats-go/nats.go
 2
 3// InProcessServerは、TCPを介してダイヤルする代わりに、
 4// プロセス内で実行されているNATSサーバーへの直接的な接続を確立しようとするOptionである。
 5func InProcessServer(server InProcessConnProvider) Option {
 6	return func(o *Options) error {
 7		o.InProcessServer = server
 8		return nil
 9	}
10}
  • optionのInProcessServerがembedded nats serverに置き換えられ、
 1// nats-go/nats.go
 2
 3// createConnはサーバーに接続し、適切な
 4// bufio構造体をラップする。既存の接続が
 5// 存在する場合、適切な処理を行う。
 6func (nc *Conn) createConn() (err error) {
 7	if nc.Opts.Timeout < 0 {
 8		return ErrBadTimeout
 9	}
10	if _, cur := nc.currentServer(); cur == nil {
11		return ErrNoServers
12	}
13
14	// インプロセスサーバーへの参照がある場合、
15	// それを使用して接続を確立する。
16	if nc.Opts.InProcessServer != nil {
17		conn, err := nc.Opts.InProcessServer.InProcessConn()
18		if err != nil {
19			return fmt.Errorf("failed to get in-process connection: %w", err)
20		}
21		nc.conn = conn
22		nc.bindToNewConn()
23		return nil
24	}
25	
26	//...
27}
  • 当該インターフェースは接続を作成するcreateConn関数において、InProcessServerオプションがnilではない(有効な)場合、オプション内のInProcessServerのInProcessConnを実行し、
 1// nats-server/server/server.go
 2
 3// InProcessConnはサーバーへのインプロセス接続を返し、
 4// 同じプロセス内でのローカル接続のためにTCPリスナーを使用する必要性を回避する。
 5// これはDontListenオプションの状態に関わらず使用できる。
 6func (s *Server) InProcessConn() (net.Conn, error) {
 7	pl, pr := net.Pipe()
 8	if !s.startGoRoutine(func() {
 9		s.createClientInProcess(pl)
10		s.grWG.Done()
11	}) {
12		pl.Close()
13		pr.Close()
14		return nil, fmt.Errorf("failed to create connection")
15	}
16	return pr, nil
17}
  • サーバーに実装されたInProcessConnを呼び出して実行します。
  • この関数は、natsのGoクライアントであるnats.goにおいて、nc(nats connection)のInProcessServerがnilではない場合に呼び出され、connection(net.Conn)を作成し、それをサーバーのconnectionにバインドします。

GoのConsumer driven interface

型は、そのメソッドを実装することでインターフェースを実装します。意図を明示的に宣言する必要はなく、「implements」キーワードもありません。暗黙的なインターフェースは、インターフェースの定義と実装を分離し、事前の取り決めなしに任意のパッケージで実装できるようになります。

Interfaces are implemented implicitly, A Tour of Go

型がインターフェースを実装するためだけに存在し、そのインターフェースを超えてエクスポートされるメソッドを持たない場合、型自体をエクスポートする必要はありません。

Generality, Effective Go

  • このインターフェースデザインは、Goでよく言われるconsumer defined interfaceとstructural typing (duck typing)をよく表しているので、このテーマも一緒に紹介したいと思います。
 1// nats-go/nats.go
 2
 3// Optionsはカスタマイズされた接続を作成するために使用できる。
 4type Options struct {
 5	// Urlはクライアントが接続する単一のNATSサーバーURLを表す。
 6	// Serversオプションも設定されている場合、それは
 7	// Servers配列の最初のサーバーになる。
 8	Url string
 9
10	// InProcessServerは同じプロセス内で実行されているNATSサーバーを表す。
11	// これが設定されている場合、外部TCP接続を使用するのではなく、
12	// サーバーに直接接続を試みる。
13	InProcessServer InProcessConnProvider
14	
15	//...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4	InProcessConn() (net.Conn, error)
5}
  • 再びコードに戻りましょう。nats.goクライアントでは、InProcessServerオプション構造体フィールドが、InProcessConnのみを実行するInProcessConnProviderインターフェースとして定義されています。
 1// nats-server/server/server.go
 2
 3// InProcessConnはサーバーへのインプロセス接続を返し、
 4// 同じプロセス内でのローカル接続のためにTCPリスナーを使用する必要性を回避する。
 5// これはDontListenオプションの状態に関わらず使用できる。
 6func (s *Server) InProcessConn() (net.Conn, error) {
 7	pl, pr := net.Pipe()
 8	if !s.startGoRoutine(func() {
 9		s.createClientInProcess(pl)
10		s.grWG.Done()
11	}) {
12		pl.Close()
13		pr.Close()
14		return nil, fmt.Errorf("failed to create connection")
15	}
16	return pr, nil
17}
  • しかし、そこに入力される型はnats-serverのServerであり、InProcessConnだけでなく多様な機能を実行しています。
  • なぜなら、この状況におけるクライアントの関心事はInProcessConnというインターフェースが提供されたかどうかだけであり、その他のことはあまり重要ではないからです。
  • したがって、nats.goクライアントはInProcessConn() (net.Conn, error)という機能のみを定義したInProcessConnProviderというconsumer defined interfaceのみを作成して使用しています。

結論

  • NATSのembedded modeとその動作方式、そしてNATSのコードを通じて確認できるGoのconsumer defined interfaceについて簡潔に扱いました。
  • この情報が上記のような目的でNATSを使用する方々のお役に立つことを願い、本稿を終えたいと思います。