事件驱动与消息系统
在云原生架构中,解耦是一件非常普遍却重要的事情,通过解耦功能,应用的职责变得单纯,更容易管理,也更容易水平扩展。
在由单体应用向分布式应用转换时,应用间通信就变得尤为重要且有难度。实例在变化,流量也在变化,当上游服务实例变化,或者性能波动时,很容易引起下游服务不稳定,同时下游服务流量的激增,也会给上游服务带去很大的压力甚至直接瘫痪上游服务。
请求驱动:
# A, B 下游服务(如登录系统)
# S 上游服务(如邮件发送服务)
# 如果 S1 出现区域性故障,会发生什么?
A1 ----| |---- S1
| |
A2 ----| <--> 负载均衡 <--> |---- S2
| |
B1 ----| |---- S3
事件驱动是解决上述问题的一种思路,通过事件驱动,将同步调用变为异步调用,不仅能够很好解决服务耦合的问题(虽然是两个服务,但上游服务出现任何异常下游服务就不可用,相当于还是耦合),还能够提高服务可扩展性和可观测性,在队列中的事件可以被有条不紊地消化,也可以方便地被系统的其它部分所使用。消息系统因此成为云原生架构中关键的中间件之一。
事件驱动:
# A, B 事件生产(如登录系统)
# S 事件消费(如邮件发送服务)
# 如果 S1 出现区域性故障,会发生什么?
A1 ----| |---- S1
| |
A2 ----| --> 消息系统 --> |---- S2
| |
B1 ----| |---- S3
NATS
NATS (NATS - Open Source Messaging System) 是一个简单,安全,高性能的开源分布式消息通信系统。
我们注意到 NATS,不仅因为它当前(09/08/2020)是 CNCF 孵化中项目 (CNCF to host NATS | Cloud Native Computing Foundation),更因为它的简单和高性能,云原生架构本身就很复杂,而 NATS 的协议设计极为简洁,关于 NATS 的性能,可以看这篇文章: Dissecting Message Queues
NATS 服务端使用 Go 语言编写,并提供 Go, Java, JavaScript 等等多种语言 SDK。官方代码主要由 Synadia 团队维护,同时有很多相关的社区项目。Github 主页:NATS - The Cloud Native Messaging System · GitHub。
连接
在介绍 NATS 的特性之前,我们先了解如何连接 NATS 服务端,然后结合代码讲解吧。
用 Go 语言示范,先写好连接方法:
package demo
import (
"github.com/nats-io/nats.go"
"log"
)
// NewConn creates a new connection to NATS server,
// closedCh will be closed when connection is closed.
func NewConn() (conn *nats.Conn, closedCh chan interface{}) {
server := "nats://demo.nats.io"
closedCh = make(chan interface{})
conn, err := nats.Connect(
server,
nats.DisconnectErrHandler(func(c *nats.Conn, err error) {
if err != nil {
log.Printf("Disconnected due to: %s, trying to reconnect", err)
} else {
log.Print("Disconnected normally")
}
}),
nats.ReconnectHandler(func(c *nats.Conn) {
log.Printf("Reconnected [%s]", c.ConnectedUrl())
}),
nats.ClosedHandler(func(c *nats.Conn) {
if err := c.LastError(); err != nil {
log.Fatalf("Connection closed due to: %s", c.LastError())
} else {
log.Print("Connection closed normally")
}
close(closedCh)
}),
)
if err != nil {
log.Fatal(err)
}
return conn, closedCh
}
其中 "nats://demo.nats.io" 是 NATS 官方提供的一个测试服务器,也可以在本地启动服务器:Installing - NATS Docs,这个地址可能是一个服务器,也可能是一个集群,另外,除了专用协议 nats 外,NATS 也支持 tls 协议。
在连接集群时,server 最好应该指定为集群中多个(最好但不一定是全部)节点的高可用链接,比如 "nats://127.0.0.1:4222,nats://127.0.0.1:5222",SDK 会负责连接可用的节点,同时获取到所有节点的信息,在连接断开时,就可以根据集群信息选择其他可用的节点重连。
然后使用以下代码连接到 NATS 服务器:
conn, closedCh := demo.NewConn()
连接上服务器后,就可以发布或者订阅消息了,我们待会再继续写代码。
基于主题的消息通信
NATS 使用主题 (subject) 的概念组织消息 (message),主题就是一串字符,发布者 (publisher) 和订阅者 (subscriber) 根据主题来找到要操作的内容。
Publisher --> Subject("time.us") --> Subscriber
主题可以包含点 ".",通过这个特殊符号,订阅者可以使用通配符同时订阅多个主题:
"*": 单个片断通配符,"time.*" 可以匹配 "time.us", "time.cn",但不能匹配 "time.us.east"。
">": 多个片断通配符,"time.>" 可以匹配 "time.us", "time.cn", 也可以匹配 "time.us.east",甚至 "time.us.east.newyork"。
"*" 和 ">" 还可以组合使用,比如用 "time.*.east.>" 匹配 "time.us.east.newyork" 这样。
发布与订阅
有了主题的概念,我们可以在代码中添加发布和订阅逻辑:
订阅方:
package main
import (
"demo"
"github.com/nats-io/nats.go"
"log"
"os"
"os/signal"
)
func main() {
conn, closedCh := demo.NewConn()
subject := "foo"
counter := 0
_, err := conn.Subscribe(subject, func(msg *nats.Msg) {
counter += 1
log.Printf("[#%d] Received on [%s]: '%s'",
counter, msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatalf("subscribe error: %s", err)
}
log.Printf("Listening on [%s]", subject)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c // blocked, wait for interruption
log.Print("Interrupted, draining connection")
if err := conn.Drain(); err != nil {
log.Fatalf("drain error: %s", err)
}
<-closedCh // blocked, wait for connection to close
}
Drain() 的作用是等待消息处理完成。
测试:将 Drain 改为 Close,会造成什么后果?可以在回调中添加 time.Sleep 方便观察。参考:Draining Messages Before Disconnect - NATS Docs。
发布方:
package main
import (
"demo"
"log"
)
func main() {
conn, closedCh := demo.NewConn()
subject := "foo"
if err := conn.Publish(subject, []byte("Hello")); err != nil {
log.Fatalf("publish error: %s", err)
}
log.Print("Publish successfully")
if err := conn.Flush(); err != nil {
log.Fatalf("flush error: %s", err)
}
conn.Close()
<-closedCh // blocked, wait for connection to close
}
Flush() 的作用是等待消息发送完成,参考:Caches, Flush and Ping - NATS Docs。
先运行订阅者,再运行发布者,就可以看到消息通过 NATS 服务器实现了分发:
订阅方:
❯ go run ./sub
2020/09/09 13:54:25 Listening on [foo]
2020/09/09 13:54:27 [#1] Received on [foo]: 'Hello'
发布方:
❯ go run ./pub
2020/09/09 13:54:27 Publish successfully
2020/09/09 13:54:27 Disconnected normally
2020/09/09 13:54:27 Connection closed normally
测试:重启订阅者,之前的消息还能收到吗?答案是不能,NATS 默认提供的服务质量等级 (QoS) 是最多一次 (at most once),意味着一条消息一经发送就不再重发,如果订阅服务故障或者重启,中间的消息就丢失了,要实现更高等级的服务质量,就要借助更复杂的设计。
请求与回复
请求与回复是提高服务质量的方式之一,请求方可以根据回复判断消息是否送达。NATS 官方给出了在 NATS 中基于发布订阅的请求与回复实现: Request-Reply - NATS Docs,基本原理是在发布时设置一个临时且唯一的收件箱 (Inbox) 主题,发布之后反过来订阅这个主题,而订阅者收到消息后向它的收件箱发布回复,本质上是一个双向发布-订阅,可以说非常简洁了。
Go SDK 中提供了 Request/Respond 接口:
回复方,在订阅逻辑的基础上增加了回复 (Respond):
func main() {
// same as subscriber...
_, err := conn.Subscribe(subject, func(msg *nats.Msg) {
counter += 1
log.Printf("[#%d] Received on [%s]: '%s'",
counter, msg.Subject, string(msg.Data))
if err := msg.Respond([]byte("I'm fine")); err != nil {
log.Fatalf("respond error: %s", err)
}
})
// same as subscriber...
}
请求方,在发布逻辑的基础上将发布 (Publish) 改为了请求 (Request):
func main() {
// same as publisher...
msg, err := conn.Request(subject, []byte("Hello"), 3*time.Second)
if err != nil {
log.Fatalf("request error: %s", err)
}
log.Printf("Reply received on [%s]: %s", msg.Subject, msg.Data)
// same as publisher...
}
查看源码,Request 与 Publish 的区别在于设置了收件箱主题 "_INBOX." 加随机串,另一边 Respond 就是简单地向收到的信息的收件箱主题进行 Publish。
队列组
NATS 天然支持队列组 (queue group),即多个订阅者消费同一个主题队列,主题中的消息会被均匀发送给队列中的每一个订阅者,实现订阅者的透明扩展。
Go SDK 提供了 QueueSubscribe 方法实现队列组:
func main() {
// same as subscriber...
queue := "bar"
_, err := conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
counter += 1
log.Printf("[#%d] Received on [%s]: '%s'",
counter, msg.Subject, string(msg.Data))
})
// same as subscriber...
}
测试:启动多个订阅者(使用 Subscribe),发布消息,谁会收到消息?再启动多个相同组的队列订阅者(使用 QueueSubscribe),发布消息,谁会收到消息?
身份验证与鉴权
安全是 NATS 很重要的特性,NATS 提供用户名密码,token,NKey 等多种方式验证用户身份,也支持对不同用户分别配置特定主题的权限。
NATS 集群
NATS 服务器支持以集群模式运行,在集群模式下,节点分为路由节点和非路由节点,所有节点都可以提供服务,同时根据路由节点(一个或多个)维护整个集群的状态,客户端在连接集群时,可以连接集群中任一节点,同时获取到所有节点的信息,在连接断开时,就可以根据集群信息选择其它可用的节点进行重连。
有关 NATS 集群,可以参考:Clustering - NATS Docs。
NATS Streaming
NATS streaming 是建立在 NATS 基础上的消息系统,提供持久化,限速,更高的服务质量等等高级功能。
连接
NATS streaming 服务器本质上是 NATS 服务器的一个客户端。在 NATS 中,客户端与服务器直接建立 TCP 连接,TCP 连接状态就是 NATS 连接状态:
NATS client ---- NATS server
但在 NATS streaming 中,客户端与 NATS 服务器建立连接,NATS streaming 服务器也与 NATS 服务器建立连接,客户端和 NATS streaming 服务器并没有直接建立连接:
[ NATS streaming client ---- NATS client ] ---- |
| NATS server
[ NATS streaming server ---- NATS client ] ---- |
这种情况下 NATS streaming 服务器通过客户端 ID (client ID, 创建连接到 streaming 服务器时必须指定且唯一) 区分客户端,同时用心跳包的形式判断客户端连接状态。
看一下代码实现:
package demo
import (
"github.com/nats-io/stan.go"
"log"
)
func NewStreamingConn(clientID string) stan.Conn {
clusterID := "test-cluster"
server := "nats://demo.nats.io"
conn, err := stan.Connect(clusterID, clientID,
stan.NatsURL(server),
stan.SetConnectionLostHandler(func(_ stan.Conn, err error) {
log.Fatalf("Connection lost due to: %s", err)
}),
)
if err != nil {
log.Fatalf("Connect to NATS streaming server error: %s", err)
}
return conn
}
建立 NATS streaming 连接需要提供一个唯一的 client ID。
NATS streaming 没有官方测试服务器,所以需要在本地启动服务器:Installing - NATS Docs。
根据前面说的,streaming 本质上是 NATS 的客户端,因此在启动 streaming 服务器时需要指定 NATS 服务器地址。如果不指定,nats-streaming-server 会默认启动一个内置的 NATS server,单实例运行时没有问题,多实例(集群模式或者容错模式)时会出现冲突。
频道
不同于 NATS,NATS streaming 使用频道 (channel) 组织消息。频道和主题 (subject) 基本是同样的概念,但频道不支持使用通配符,一个订阅 (subscription) 只能订阅一个频道,同时频道是有状态的,频道中的消息会保存在频道对应的消息日志 (message log) 中,在订阅时可以根据需求从消息日志中找到历史消息。
订阅
相比 NATS,NATS streaming 中,订阅有更多种形式,并且不管是哪种订阅方式,都可以指定订阅的消息范围,包括:新消息,所有消息,从最后一条消息开始,某个序列号往后的消息,某个时间往后的消息。可选的订阅方式有:
Regular
连接关闭后订阅就被删除,无法继续订阅。
Durable
可以继续订阅,需要主动取消订阅才会被删除,根据持久名称 (durable name) 和 client ID 确定唯一性。
在继续订阅时,自动从上次中断处开始接收消息,就不可以指定消息范围了。
Queue Group
和 NATS 一样,NATS streaming 也支持队列组,并行处理同一个频道的消息,通过队列名和 client ID 确定唯一性。
Queue group 还可以和 durable 结合使用:Queue Group - NATS Docs。
示例代码:
订阅方:
package main
import (
"demo"
"github.com/nats-io/stan.go"
"log"
"os"
"os/signal"
)
func main() {
clientID := "stan-sub-demo"
subject := "foo"
conn := demo.NewStreamingConn(clientID)
log.Printf("Connected with clientID: %s", clientID)
counter := 0
_, err := conn.Subscribe(subject, func(msg *stan.Msg) {
counter++
log.Printf("[#%d] Received: %s", counter, msg)
})
if err != nil {
log.Fatalf("subscribe error: %s", err)
}
log.Printf("Listening on [%s]", subject)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c // blocked, wait for interruption
log.Print("Interrupted, closing connection")
if err := conn.Close(); err != nil {
log.Fatalf("close NATS streaming connection error: %s", err)
}
}
使用默认参数进行的订阅是 Regular 类型,要使用其它方式,需要在订阅时指定。
发布方:
package main
import (
"demo"
"log"
)
func main() {
clientID := "stan-pub-demo"
subject := "foo"
msg := []byte("Hello")
conn := demo.NewStreamingConn(clientID)
log.Printf("Connected with clientID: %s", clientID)
if err := conn.Publish(subject, msg); err != nil {
log.Fatalf("publish error: %s", err)
}
log.Printf("Published [%s]: %s", subject, msg)
if err := conn.Close(); err != nil {
log.Fatalf("close NATS streaming connection error: %s", err)
}
}
测试:根据前面说的,NATS streaming 服务器与客户端没有直接连接,必须通过心跳包确认对方存在,在启动订阅者后,尝试中止 NATS streaming 服务器并短时间内再启动,客户端会意识到有问题发生吗?中止 NATS 服务器呢?
服务质量
前面提到过 NATS 默认提供最多一次的服务质量,但 NATS streaming 默认提供最少一次 (at least once) 的服务质量,对发送的每条消息,streaming 服务器都期望获得确认 (ACK) 响应,在等待超时后,streaming 服务器会尝试重新发送消息。
持久化
既然频道是有状态的,就会有持久化的问题,NATS streaming 支持许多类型的持久化方式:nats-streaming-server/stores at master · nats-io/nats-streaming-server · GitHub,如果有必要还可以自己实现,只需要实现相应的接口即可。
高可用:集群模式与容错模式
NATS streaming 支持两种高可用模式:集群模式 (clustering) 和容错模式 (fault tolerance)。
集群模式
集群模式下多个相同的 streaming 服务器同时运行,它们各自维护状态,并不互相关心,只通过 RAFT 算法协商一个主服务器提供服务。使用集群模式时,每个服务器的持久化存储都是独立的。
容错模式
容错模式同样有多个 streaming 服务器同时运行,但它们共享一份持久化存储,通过排他锁保证同一时间只有一个服务器承载流量。当前承载流量的服务器称为活动服务器,其它服务器称为待命服务器,所有服务器共享一份状态。
Cloudevents
NATS,作为消息系统,能够很好地完成消息传递,但没有对消息本身作出任何规范与限制。要将 NATS 作为多个异构系统的消息中心,就必须规定好消息的数据结构。
当前社区和主流云平台同时提供着多种消息格式,比如:spec/primer.md at v1.0 · cloudevents/spec · GitHub,cloudevents 也是其中之一。但 cloudevents 作为中立的社区项目更吸引我们采纳,同时 cloudevents 当前 (09/09/2020) 同样是 CNCF 孵化中项目,与其采纳其他厂商主导的格式,或者约定自己的格式,使用 cloudevents 无疑是更好的选择。
术语
发生
发生 (occurrence) 泛指一切对状态变化的感知。包括人为操作,或者自动产生的信号,也包括计时结束等一切状态变化。
事件
事件 (event) 是对一次发生的事情的描述。事件应该包含事件数据和上下文数据两部分内容,但不包含对任何分发和处理过程的描述。
生产者
生产者 (producer) 用于生成事件,可能是产生事件的来源,也可能是一个观察者,可以是一个进程,也可以是一个设备。
来源
来源 (source) 是事件产生的地方,或者说上下文。
消费者
消费者 (consumer) 是消息的接收方,负责处理事件,也包含对新事件的触发。
媒介
媒介 (intermediary) 是在生产者和消费者之间,用于传递消息的存在,传递消息过程中可能涉及复杂的路由策略。
上下文
上下文 (context) 是描述事件来源的内容,用于判断事件来源,以及事件与其它事件的关系。Cloudevents 中规定了一些必须或非必须的上下文属性 (attributes)。
数据
数据 (data) 是对发生的事情的描述,比如具体从什么状态变化为什么状态。
事件格式
此事件格式 (event format) 不是指 cloudevents 本身这样的 "格式",而是事件编码为字节的格式,典型的比如 JSON 格式。
消息
消息 (message) 在 cloudevents 中指代媒介中传输的事件单元,分为两种:
结构化模式消息
在结构化模式消息 (structured-mode message) 中,事件被完全编码为字节流并在消息当中作为专门的消息体传递。我们即将讲的 NATS 协议中,就是使用的结构化模式消息传递事件。
二进制模式消息
在二进制模式消息 (binary-mode message) 中,事件数据(特指 data 部分)被放在消息体中,而事件上下文作为某种形式的元数据传递。典型的比如 HTTP 协议,HTTP 协议中上下文在头 (headers) 中传递。
协议
Cloudevents 中的协议 (protocol) 就是普遍意义上的协议,包括 HTTP,NATS 等等。要注意协议与协议绑定 (protocol binding) 的区别。
协议绑定
协议绑定 (protocol binding) 是 cloudevents 特有的术语,描述事件如何使用特定协议传输,即:事件如何映射为该协议中的消息。
上下文属性
Cloudevents 中的上下文属性有很多,我们看所有必须 (required) 的属性:
id
事件识别号,对每个生产者来说必须唯一。
source
事件来源,建议 (RECOMMEND) 是一个包含绝对路径的 URI。
specversion
Cloudevents 规范版本,当前 (09/09/2020) 最新的是 1.0。
type
事件类型,应该 (SHOULD) 有一个反向 DNS 前缀,比如说 "com.github.pull.create"。
SDK
Cloudevent 规范了很多内容,使用 SDK 可以更好地接入。
NATS 协议绑定
Cloudevents 事件在 NATS 中只支持以结构化模式消息传递,所以 NATS 协议绑定非常简单,直接发布消息内容就可以了,官方对 NATS 协议绑定也作出了一定规范,建议遵守:spec/nats-protocol-binding.md at v1.0 · cloudevents/spec · GitHub。
NATS 与 Cloudevents 的关系
NATS 是 cloudevents 的事件媒体,cloudevents 是 NATS 的消息载体。Cloudevents 负载描述事件,NATS 负责分发事件。