Jak komunikuje vestavěný NATS s aplikací v Go?
Getting started
About NATS
Software applications and services need to exchange data. NATS is an infrastructure that allows such data exchange, segmented in the form of messages. We call this a "message oriented middleware".
With NATS, application developers can:
- Effortlessly build distributed and scalable client-server applications.
- Store and distribute data in realtime in a general manner. This can flexibly be achieved across various environments, languages, cloud providers and on-premises systems.
- NATS 는 Go 로 구성된 메시지 브로커이다.
Embedded NATS
If your application is in Go, and if it fits your use case and deployment scenarios, you can even embed a NATS server inside your application.
- 그리고 NATS 의 특이사항이 있는데, Go 로 구성된 애플리케이션의 경우 embedded mode 를 지원한다는 것이다.
- 즉, 메시지 브로커의 일반적인 방식인 별도의 브로커 서버 구동 후 해당 서버와 애플리케이션의 클라이언트를 통해 통신하는 구조가 아닌, 브로커 자체를 Go 로 만든 애플리케이션에 내장(embed)할 수 있다는 이야기이다.
Benefits and use cases of embedded NATS
- 잘 설명된 Youtube 영상이 있어서 영상의 링크로 갈음한다.
- 별도의 메시지 브로커 서버를 배포하지 않더라도 modular monolith applictaion 을 만들어서 separate of concern 을 달성하기도 하면서 nats 를 embedded 로 심을 수 있는 장점을 취할 수가 있다. 더하여 single binary deployment 도 가능해진다.
- platform with no network(wasm) 뿐만 아니라, offline-first application 에서 유용하게 사용할 수 있다.
Example on official docs
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{}
13
14 // Initialize new server with options
15 ns, err := server.NewServer(opts)
16
17 if err != nil {
18 panic(err)
19 }
20
21 // Start the server via goroutine
22 go ns.Start()
23
24 // Wait for server to be ready for connections
25 if !ns.ReadyForConnections(4 * time.Second) {
26 panic("not ready for connection")
27 }
28
29 // Connect to server
30 nc, err := nats.Connect(ns.ClientURL())
31
32 if err != nil {
33 panic(err)
34 }
35
36 subject := "my-subject"
37
38 // Subscribe to the subject
39 nc.Subscribe(subject, func(msg *nats.Msg) {
40 // Print message data
41 data := string(msg.Data)
42 fmt.Println(data)
43
44 // Shutdown the server (optional)
45 ns.Shutdown()
46 })
47
48 // Publish data to the subject
49 nc.Publish(subject, []byte("Hello embedded NATS!"))
50
51 // Wait for server shutdown
52 ns.WaitForShutdown()
53}
- NATS 공식 문서가 걸어놓은 Embedded NATS 의 예시인데, 해당 예시 코드대로 진행하게 되면 embedding mode 로 communication 이 이뤄지지 않는다.
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:34:20
2 in 0.017s (0)
3...
4tcp4 0 0 127.0.0.1.4222 127.0.0.1.63769 TIME_WAIT
watch 'netstat -an | grep 127.0.0.1'
command 를 통해 localhost(127.0.0.1) 로 오가는 network 를 확인하면서go run .
로 해당 go file 을 실행시키면 NATS 의 default port 인4222
에서 출발하는 새로운 네트워크 요청들이 추가되는 걸 볼 수 있다.
Right configurations for embedding mode
의도하는 대로 embedded mode 로 통신을 하기 위해선 다음과 같은 두 가지 옵션이 필요하다.
- Client:
InProcessServer
option 을 넣어주어야 한다. - Server:
Server.Options
에DontListen
이라는 flag 를true
로 명시해야 한다.
- Client:
해당 부분들은 공식적으로 문서화가 되어있진 않았고, 이 기능의 시작은 해당 PR 을 통해 파악할 수 있다.
This PR adds three things:
InProcessConn()
function toServer
which builds anet.Pipe
to get a connection to the NATS server without using TCP socketsDontListen
option which tells the NATS server not to listen on the usual TCP listenerstartupComplete
channel, which is closed right before we startAcceptLoop
, andreadyForConnections
will wait for it
The main motivation for this is that we have an application that can run either in a monolith (single-process) mode or a polylith (multi-process) mode. We'd like to be able to use NATS for both modes for simplicity, but the monolith mode has to be able to cater for a variety of platforms where opening socket connections either doesn't make sense (mobile) or just isn't possible (WASM). These changes will allow us to use NATS entirely in-process instead.
An accompanying PR nats-io/nats.go#774 adds support to the client side.
This is my first PR to this project so apologies in advance if I've missed anything obvious anywhere.
/cc @nats-io/core
Working Example for embedded mode
1package main
2
3import (
4 "fmt"
5 "time"
6
7 "github.com/nats-io/nats-server/v2/server"
8 "github.com/nats-io/nats.go"
9)
10
11func main() {
12 opts := &server.Options{
13 // for configuring the embeded NATS server
14 // set DonListen as true
15 DontListen: true,
16 }
17
18 // Initialize new server with options
19 ns, err := server.NewServer(opts)
20
21 if err != nil {
22 panic(err)
23 }
24
25 // Start the server via goroutine
26 go ns.Start()
27
28 // Wait for server to be ready for connections
29 if !ns.ReadyForConnections(10 * time.Second) {
30 panic("not ready for connection")
31 }
32
33 // Connect to server via in-process connection
34 nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
35
36 if err != nil {
37 panic(err)
38 }
39
40 subject := "my-subject"
41
42 // Subscribe to the subject
43 nc.Subscribe(subject, func(msg *nats.Msg) {
44 // Print message data
45 data := string(msg.Data)
46 fmt.Println(data)
47
48 // Shutdown the server (optional)
49 ns.Shutdown()
50 })
51
52 // Publish data to the subject
53 nc.Publish(subject, []byte("Hello embedded NATS!"))
54
55 // Wait for server shutdown
56 ns.WaitForShutdown()
57}
58
1Every 2.0s: netstat -an | grep 127.0.0.1 pravdalaptop-home.local: 02:37:50
2 in 0.023s (0)
3...no additional logs
4
- 이제 의도한 대로 추가적인 network hop 이 발생하지 않는 것을 볼 수 있다.
Under the hood
TL;DR
- 해당 코드를
main.go
에서 실행시켰을 때, 내부적으로 어떠한 함수들이 어떻게 작동되는지를 나타낸 sequence diagram 이며, 골자를 설명하자면 아래와 같다.DontListen: true
를 통해 서버는AcceptLoop
라는 client listening phase 를 생략한다.- client 의 Connect option 중
InProcessServer
가 활성화 된다면 in-memory connection 을 생성하고net.Pipe
를 통해 pipe 를 만든 뒤 end of pipe 를 client 에게net.Conn
type 으로 반환한다. - client 와 server 가 해당 connection 을 통해 in-process communication 을 진행한다.
Server
AcceptLoop
1// nats-server/server/server.go
2
3// Wait for clients.
4if !opts.DontListen {
5 s.AcceptLoop(clientListenReady)
6}
- 먼저
DontListen
이 true 인 경우,AcceptLoop
라는 client listening phase 를 생략한다.
1// nats-server/server/server.go
2
3// AcceptLoop is exported for easier testing.
4func (s *Server) AcceptLoop(clr chan struct{}) {
5 // If we were to exit before the listener is setup properly,
6 // make sure we close the channel.
7 defer func() {
8 if clr != nil {
9 close(clr)
10 }
11 }()
12
13 if s.isShuttingDown() {
14 return
15 }
16
17 // Snapshot server options.
18 opts := s.getOpts()
19
20 // Setup state that can enable shutdown
21 s.mu.Lock()
22 hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
23 l, e := natsListen("tcp", hp)
24 s.listenerErr = e
25 if e != nil {
26 s.mu.Unlock()
27 s.Fatalf("Error listening on port: %s, %q", hp, e)
28 return
29 }
30 s.Noticef("Listening for client connections on %s",
31 net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
32
33 // Alert of TLS enabled.
34 if opts.TLSConfig != nil {
35 s.Noticef("TLS required for client connections")
36 if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
37 s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
38 }
39 }
40
41 // If server was started with RANDOM_PORT (-1), opts.Port would be equal
42 // to 0 at the beginning this function. So we need to get the actual port
43 if opts.Port == 0 {
44 // Write resolved port back to options.
45 opts.Port = l.Addr().(*net.TCPAddr).Port
46 }
47
48 // Now that port has been set (if it was set to RANDOM), set the
49 // server's info Host/Port with either values from Options or
50 // ClientAdvertise.
51 if err := s.setInfoHostPort(); err != nil {
52 s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
53 l.Close()
54 s.mu.Unlock()
55 return
56 }
57 // Keep track of client connect URLs. We may need them later.
58 s.clientConnectURLs = s.getClientConnectURLs()
59 s.listener = l
60
61 go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
62 func(_ error) bool {
63 if s.isLameDuckMode() {
64 // Signal that we are not accepting new clients
65 s.ldmCh <- true
66 // Now wait for the Shutdown...
67 <-s.quitCh
68 return true
69 }
70 return false
71 })
72 s.mu.Unlock()
73
74 // Let the caller know that we are ready
75 close(clr)
76 clr = nil
77}
- 참고로 AcceptLoop 함수는 다음과 같은 과정들을 진행한다.
TLS
나hostPort
와 같이 network 통신과 관련된 부분으로, in-process communication 을 하게 되면 필요 없는 부분들이니 생략해도 무방한 부분들이다.
Client
InProcessServer
1
2// nats-go/nats.go
3
4// Connect will attempt to connect to the NATS system.
5// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
6// Comma separated arrays are also supported, e.g. urlA, urlB.
7// Options start with the defaults but can be overridden.
8// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
9// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
10func Connect(url string, options ...Option) (*Conn, error) {
11 opts := GetDefaultOptions()
12 opts.Servers = processUrlString(url)
13 for _, opt := range options {
14 if opt != nil {
15 if err := opt(&opts); err != nil {
16 return nil, err
17 }
18 }
19 }
20 return opts.Connect()
21}
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- nats server 와 nats client 의 연결을 진행하는
Connect
함수는 client URL 과 connect Option 을 설정할 수 있고, 해당 Option 들을 모아놓은 Options struct 엔InProcesConnProvider
interface type 의InProcessServer
라는 field 가 존재한다.
1// main.go of example code
2
3// Initialize new server with options
4ns, err := server.NewServer(opts)
5
6//...
7
8// Connect to server via in-process connection
9nc, err := nats.Connect(ns.ClientURL(), nats.InProcessServer(ns))
- nats client 에서 Connect 를 진행할 때,
InProcessServer
field 로nats.InProcessServer(ns)
를 넘겨주게 되면
1// nats-go/nats.go
2
3// InProcessServer is an Option that will try to establish a direction to a NATS server
4// running within the process instead of dialing via TCP.
5func InProcessServer(server InProcessConnProvider) Option {
6 return func(o *Options) error {
7 o.InProcessServer = server
8 return nil
9 }
10}
- option 의 InProcessServer 가 embedded nats server 로 대체되고
1// nats-go/nats.go
2
3// createConn will connect to the server and wrap the appropriate
4// bufio structures. It will do the right thing when an existing
5// connection is in place.
6func (nc *Conn) createConn() (err error) {
7 if nc.Opts.Timeout < 0 {
8 return ErrBadTimeout
9 }
10 if _, cur := nc.currentServer(); cur == nil {
11 return ErrNoServers
12 }
13
14 // If we have a reference to an in-process server then establish a
15 // connection using that.
16 if nc.Opts.InProcessServer != nil {
17 conn, err := nc.Opts.InProcessServer.InProcessConn()
18 if err != nil {
19 return fmt.Errorf("failed to get in-process connection: %w", err)
20 }
21 nc.conn = conn
22 nc.bindToNewConn()
23 return nil
24 }
25
26 //...
27}
- 해당 interface 는 connection 을 만들어주는
createConn
함수에서InProcessServer
option 이 nil 이 아닌(valid 한)경우 option 에 있는 InProcessServer 의InProcesConn
을 실행하면서
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- server 에 구현된
InProcessConn
을 호출해 실행한다. - 해당 function 은 nats 의 go client 인
nats.go
에서 nc(nats connection) 의InProcessServer
가 nil 이 아닐 경우 호출이 되어 connection(net.Conn
) 을 만들고, 이를 server 의 connection 에 bind 시킨다.
Consumer driven interface of Go
A type implements an interface by implementing its methods. There is no explicit declaration of intent, no "implements" keyword. Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.
Interfaces are implemented implicitly, A Tour of Go
If a type exists only to implement an interface and will never have exported methods beyond that interface, there is no need to export the type itself.
- 해당 interface design 은 Go 에서 흔히 말 하는 consumer defined interface 와 structural typing(duck typing) 을 잘 담고 있어서 해당 주제도 같이 소개해 보려고 한다.
1// nats-go/nats.go
2
3// Options can be used to create a customized connection.
4type Options struct {
5 // Url represents a single NATS server url to which the client
6 // will be connecting. If the Servers option is also set, it
7 // then becomes the first server in the Servers array.
8 Url string
9
10 // InProcessServer represents a NATS server running within the
11 // same process. If this is set then we will attempt to connect
12 // to the server directly rather than using external TCP conns.
13 InProcessServer InProcessConnProvider
14
15 //...
16}
1// nats-go/nats.go
2
3type InProcessConnProvider interface {
4 InProcessConn() (net.Conn, error)
5}
- 다시 코드로 넘어가보자. nats.go client 에서
InProcessServer
option struct field 는InProcessConn
만을 수행하는InProcessConnProvider
interface 로 정의되었다.
1// nats-server/server/server.go
2
3// InProcessConn returns an in-process connection to the server,
4// avoiding the need to use a TCP listener for local connectivity
5// within the same process. This can be used regardless of the
6// state of the DontListen option.
7func (s *Server) InProcessConn() (net.Conn, error) {
8 pl, pr := net.Pipe()
9 if !s.startGoRoutine(func() {
10 s.createClientInProcess(pl)
11 s.grWG.Done()
12 }) {
13 pl.Close()
14 pr.Close()
15 return nil, fmt.Errorf("failed to create connection")
16 }
17 return pr, nil
18}
- 그러나 그것에 들어가는 type 은 nats-server 의
Server
로, InProcessConn 뿐만 아니라 다양한 기능들을 수행하고 있다. - 왜냐면 해당 상황에서의 client 의 관심사는
InProcessConn
이라는 interface 를 제공했느냐 아니냐 뿐이지, 다른 것들은 크게 중요하지 않기 때문이다. - 따라서 nats.go client 는
InProcessConn() (net.Conn, error)
이라는 기능만을 정의한InProcessConnProvider
라는 consumer defined interface 만을 만들어서 사용하고 있다.
Conclusion
- NATS 의 embedded mode 와 그 작동방식, 그리고 NATS 의 code 를 통해 확인할 수 있는 Go 의 consumer defined interface 에 대해 간략하게 다루어 보았다.
- 해당 정보가 위와 같은 목적으로 NATS 를 사용하는 사람들에게 도움이 되길 바라며 이 글을 마치고자 한다.