Vert.x 技术内幕 | Event Bus 源码分析 (集群模式)

上篇文章中我们探索了Local模式下Event Bus的源码,在这篇文章中我们来探索一下Vert.x中的Clustered Event Bus是如何实现的。对应的Vert.x版本为3.3.2

集群模式介绍

我们先来简单地介绍一下集群模式下Event Bus的基本原理。

我们可以通过集群模式下的Event Bus在不同的服务器之间进行通信,其本质为TCP通信。Vert.x集群模式需要一个集群管理器(默认为HazelcastClusterManager)来管理集群的状态,存储元数据。当我们在某个节点A给集群模式的Event Bus绑定一个对应地址addressconsumer的时候,Event Bus会将此节点的ServerID(包含hostport信息)存储至集群管理器的共享Map中,key为绑定的地址address,value为绑定了此地址address的所有结点的ServerID集合(可以看作是具有负载均衡功能的Set)。集群中的所有节点都可以从集群管理器中获取Map记录。并且绑定consumer的同时节点A会建立一个NetServer接收数据。这样,我们再通过另一个结点B向此地址address发送消息的时候,B就会从集群管理器中取出此地址对应的ServerID集合,并根据是点对点发送还是发布,根据相应的策略创建NetClient执行消息分发逻辑。这样,对应的NetServer收到数据后会对其进行解码然后在本地进行消息的处理。

集群模式下我们还需要注意几个问题:

  • 某个节点挂了怎么办?
  • 如何确保结点的高可用性?

当某个节点挂掉的时候,其连接将会不可用,集群管理器就会将此节点的信息从集群中移除,并且传播到所有的节点删除对应缓存的信息,这样发消息的时候就不会发送到挂掉的无效节点处。至于高可用性,Vert.x提供了高可用管理器HAManager用于实现高可用性,在发生故障时能够快速failover,详情可见官方文档

好了,下面我们就来分析一下Clustered Event Bus的源码。集群模式下Event Bus的类型为ClusteredEventBus,它继承了单机模式的EventBusImpl类。其初始化过程与Local模式大同小异,因此这里就直接分析发送和接受消息相关的逻辑了。

绑定MessageConsumer

我们还是先来看consumer方法的逻辑。前面的调用逻辑都和Local模式下相同,可以参考之前的文章。不同之处在添加记录的地方。Cluster模式下Event Bus需要将当前机器的位置存储至Map中并且传播至集群内的所有节点,因此ClusteredEventBus重写了四个参数版本的addRegistration方法(之前在EventBusImpl类中这个版本的方法用处不大,这里用处就大了):

1
2
3
4
5
6
7
8
9
10
11
@Override
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
// Propagate the information
subs.add(address, serverID, completionHandler);
} else {
completionHandler.handle(Future.succeededFuture());
}
}

如果要绑定MessageConsumer对应的地址在本地中没有注册过,并且不是Event Bus自动生成的reply consumer,并且允许在集群范围内传播的话,Event Bus就会将当前机器的位置添加到集群内的记录subs中。subs的类型为AsyncMultiMap:

1
private AsyncMultiMap<String, ServerID> subs;

ClusteredEventBus启动时会对其进行初始化:

1
2
3
4
5
6
7
8
clusterManager.<String, ServerID>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> {
if (ar2.succeeded()) {
subs = ar2.result();
// 其他代码暂略。。。
} else {
// 代码略。。。
}
});

从名字就可以看出来,AsyncMultiMap允许一键多值,并且其变动可以在集群范围内传播。由于AsyncMultiMap集群范围内的,因此对其操作都是异步的。在这里我们可以简单地把它看作是一个Map<String, ChoosableIterable<ServerID>>类型的键值对,其中ChoosableIterable与之前见到过的Handlers类似,属于可以通过轮询算法获取某一元素的集合。subs的key为绑定的地址,value为绑定此地址的机器位置的集合。机器的位置用ServerID表示,里面包含了该机器的hostport。这样,每当我们向某个地址绑定一个MessageConsumer的时候,绑定consumer的ServerID就会被记录到集群中并与地址相对应,其它机器在向此地址发送(或发布)消息的时候,Event Bus就可以从集群中获取在此地址上绑定了consumer的所有ServerID,再根据相应的策略选出合适的ServerID建立TCP通信将数据发送至对应机器中,对应机器收到消息后解码并在本地对其进行处理。

这里面还需要注意一点:我们可以在EventBusOptions中指定ServerIDporthost,若不指定则port将随机分配(NetServer的特性)。

剩下的过程也就大同小异了。至于unregister方法,无非就是将底层的removeRegistration方法重写,从subs中删除对应的ServerID并传播至其它节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
protected <T> void removeRegistration(HandlerHolder lastHolder, String address,
Handler<AsyncResult<Void>> completionHandler) {
if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) {
removeSub(address, serverID, completionHandler);
} else {
callCompletionHandlerAsync(completionHandler);
}
}
private void removeSub(String subName, ServerID theServerID, Handler<AsyncResult<Void>> completionHandler) {
subs.remove(subName, theServerID, ar -> {
if (!ar.succeeded()) {
log.error("Failed to remove sub", ar.cause());
} else {
if (ar.result()) {
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
} else {
if (completionHandler != null) {
completionHandler.handle(Future.failedFuture("sub not found"));
}
}
}
});
}

消息的发送/发布

集群模式下的消息与本地模式下的消息不同。集群模式下的消息实体类型为ClusteredMessage,它继承了MessageImpl消息实体类,并且根据远程传输的特性实现了一种Wire Protocol用于远程传输消息,并负责消息的编码和解码。具体的实现就不展开说了,如果有兴趣的话可以阅读ClusteredMessage类中相关方法的实现。

我们上篇文章提到过,Event Bus底层通过createMessage方法创建消息。因此ClusteredEventBus里就对此方法进行了重写,当然改动就是把MessageImpl替换成了ClusteredMessage:

1
2
3
4
5
6
7
8
@Override
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this);
return msg;
}

接下来就是消息的发送逻辑了。ClusteredEventBus重写了sendOrPub方法,此方法存在于SendContextImpl类中的next方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
try {
handler.handle(this);
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
sendOrPub(this);
}
}

我们来看一下ClusteredEventBus是如何进行集群内消息的分发的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected <T> void sendOrPub(SendContextImpl<T> sendContext) {
String address = sendContext.message.address();
Handler<AsyncResult<ChoosableIterable<ServerID>>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
ChoosableIterable<ServerID> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
metrics.messageSent(address, !sendContext.message.send(), true, false);
deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
}
};
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(address, resultHandler);
});
} else {
subs.get(address, resultHandler);
}
}

首先Event Bus需要从传入的sendContext中获取要发送至的地址。接着Event Bus需要从集群管理器中获取在此地址上绑定consumer的所有ServerID,这个过程是异步的,并且需要在Vert.x Context中执行。如果获取记录成功,我们会得到一个可通过轮询算法获取ServerID的集合(类型为ChoosableIterable<ServerID>)。如果集合为空,则代表集群内其它节点没有在此地址绑定consumer(或者由于一致性问题没有同步),Event Bus就将消息通过deliverMessageLocally方法在本地进行相应的分发。deliverMessageLocally方法的逻辑之前我们已经详细讲过了,这里就不再细说了;如果集合不为空,Event Bus就调用sendToSubs方法进行下一步操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private <T> void sendToSubs(ChoosableIterable<ServerID> subs, SendContextImpl<T> sendContext) {
String address = sendContext.message.address();
if (sendContext.message.send()) {
// Choose one
ServerID sid = subs.choose();
if (!sid.equals(serverID)) { //We don't send to this node
metrics.messageSent(address, false, false, true);
sendRemote(sid, sendContext.message);
} else {
metrics.messageSent(address, false, true, false);
deliverMessageLocally(sendContext);
}
} else {
// Publish
boolean local = false;
boolean remote = false;
for (ServerID sid : subs) {
if (!sid.equals(serverID)) { //We don't send to this node
remote = true;
sendRemote(sid, sendContext.message);
} else {
local = true;
}
}
metrics.messageSent(address, true, local, remote);
if (local) {
deliverMessageLocally(sendContext);
}
}
}

这里就到了分sendpublish的时候了。如果发送消息的模式为点对点模式(send),Event Bus会从给的的集合中通过轮询算法获取一个ServerID。然后Event Bus会检查获取到的ServerID是否与本机ServerID相同,如果相同则代表在一个机子上,直接记录metrics信息并且调用deliverMessageLocally方法往本地发送消息即可;如果不相同,Event Bus就会调用sendRemote方法执行真正的远程消息发送逻辑。发布订阅模式的逻辑与其大同小异,只不过需要遍历一下ChoosableIterable<ServerID>集合,然后依次执行之前讲过的逻辑。注意如果要在本地发布消息只需要发一次。

真正的远程消息发送逻辑在sendRemote方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void sendRemote(ServerID theServerID, MessageImpl message) {
ConnectionHolder holder = connections.get(theServerID);
if (holder == null) {
holder = new ConnectionHolder(this, theServerID, options);
ConnectionHolder prevHolder = connections.putIfAbsent(theServerID, holder);
if (prevHolder != null) {
// Another one sneaked in
holder = prevHolder;
} else {
holder.connect();
}
}
holder.writeMessage((ClusteredMessage)message);
}

一开始我们就提到过,节点之间通过Event Bus进行通信的本质是TCP,因此这里需要创建一个NetClient作为TCP服务端,连接到之前获取的ServerID对应的节点然后将消息通过TCP协议发送至接收端。这里Vert.x用一个封装类ConnectionHolderNetClient进行了一些封装。

ClusteredEventBus中维持着一个connections哈希表对用于保存ServerID对应的连接ConnectionHolder。在sendRemote方法中,Event Bus首先会从connections中获取ServerID对应的连接。如果获取不到就创建连接并将其添加至connections记录中并调用对应ConnectionHolderconnect方法建立连接;最后调用writeMessage方法将消息编码后通过TCP发送至对应的接收端。

那么ConnectionHolder是如何实现的呢?我们来看一下其构造函数:

1
2
3
4
5
6
7
8
9
10
ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID, EventBusOptions options) {
this.eventBus = eventBus;
this.serverID = serverID;
this.vertx = eventBus.vertx();
this.metrics = eventBus.getMetrics();
NetClientOptions clientOptions = new NetClientOptions(options.toJson());
ClusteredEventBus.setCertOptions(clientOptions, options.getKeyCertOptions());
ClusteredEventBus.setTrustOptions(clientOptions, options.getTrustOptions());
client = new NetClientImpl(eventBus.vertx(), clientOptions, false);
}

可以看到ConnectionHolder初始化的时候会创建一个NetClient作为TCP请求端,而请求的对象就是接收端的NetServer(后边会讲),客户端配置已经在EventBusOptions中事先配置好了。我们来看看connect方法是如何建立连接的:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void connect() {
if (connected) {
throw new IllegalStateException("Already connected");
}
client.connect(serverID.port, serverID.host, res -> {
if (res.succeeded()) {
connected(res.result());
} else {
close();
}
});
}

可以看到这里很简单地调用了NetClient#connect方法建立TCP连接,如果建立连接成功的话会得到一个NetSocket对象。Event Bus接着将其传至connected方法中进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private synchronized void connected(NetSocket socket) {
this.socket = socket;
connected = true;
socket.exceptionHandler(t -> close());
socket.closeHandler(v -> close());
socket.handler(data -> {
// Got a pong back
vertx.cancelTimer(timeoutID);
schedulePing();
});
// Start a pinger
schedulePing();
for (ClusteredMessage message : pending) {
Buffer data = message.encodeToWire();
metrics.messageWritten(message.address(), data.length());
socket.write(data);
}
pending.clear();
}

首先Event Bus通过exceptionHandlercloseHandler方法给连接对应的NetSocket绑定异常回调和连接关闭回调,触发的时候都调用close方法关闭连接;为了保证不丢失连接,消息发送方每隔一段时间就需要对消息接收方发送一次心跳包(PING),如果消息接收方在一定时间内没有回复,那么就认为连接丢失,调用close方法关闭连接。心跳检测的逻辑在schedulePing方法中,比较清晰,这里就不详细说了。大家会发现ConnectionHolder里也有个消息队列(缓冲区)pending,并且这里会将队列中的消息依次通过TCP发送至接收端。为什么需要这样呢?其实,这要从创建TCP客户端说起。创建TCP客户端这个过程应该是异步的,需要消耗一定时间,而ConnectionHolder中封装的connect方法却是同步式的。前面我们刚刚看过,通过connect方法建立连接以后会接着调用writeMessage方法发送消息,而这时候客户端连接可能还没建立,因此需要这么个缓冲区先存着,等着连接建立了再一块发送出去(存疑:为什么不将connect方法直接设计成异步的?)。

至于发送消息的writeMessage方法,其逻辑一目了然:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void writeMessage(ClusteredMessage message) {
if (connected) {
Buffer data = message.encodeToWire();
metrics.messageWritten(message.address(), data.length());
socket.write(data);
} else {
if (pending == null) {
pending = new ArrayDeque<>();
}
pending.add(message);
}
}

如果连接已建立,Event Bus就会调用对应ClusteredMessageencodeToWire方法将其转化为字节流Buffer,然后记录metrics信息,最后通过socketwrite方法将消息写入到Socket中,这样消息就从发送端通过TCP发送到了接收端。如果连接未建立,就如之前讲的那样,先把消息存到消息队列中,等连接建立了再一块发出去。

这样,Clustered Event Bus下消息的发送逻辑就理清楚了。下面我们看一下接收端是如何接收消息并在本地进行消息的处理的。

消息的接收

一开始我们提到过,每个节点的Clustered Event Bus在启动时都会创建一个NetServer作为接收消息的TCP服务端。TCP Server的porthost可以在EventBusOptions中指定,如果不指定的话默认随机分配port,然后Event Bus会根据NetServer的配置来生成当前节点的ServerID

创建TCP Server的逻辑在start方法中,与接受消息有关的逻辑就是这一句:

1
server.connectHandler(getServerHandler());

我们知道,NetServerconnectHandler方法用于绑定对服务端Socket的处理函数,而这里绑定的处理函数是由getServerHandler方法生成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Handler<NetSocket> getServerHandler() {
return socket -> {
RecordParser parser = RecordParser.newFixed(4, null);
Handler<Buffer> handler = new Handler<Buffer>() {
int size = -1;
public void handle(Buffer buff) {
if (size == -1) {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage();
received.readFromWire(buff, codecManager);
metrics.messageRead(received.address(), buff.length());
parser.fixedSizeMode(4);
size = -1;
if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
// Just send back pong directly on connection
socket.write(PONG);
} else {
deliverMessageLocally(received);
}
}
}
};
parser.setOutput(handler);
socket.handler(parser);
};
}

逻辑非常清晰。这里Event Bus使用了RecordParser来获取发送过来的对应长度的Buffer,并将其绑定在NetServer的Socket上。真正的解析Buffer并处理的逻辑在内部的handler中。之前ClusteredMessage中的Wire Protocol规定Buffer的首部第一个int值为要发送Buffer的长度(逻辑见ClusteredMessage#encodeToWire方法),因此这里首先获取长度,然后给parser设定正确的fixed size,这样parser就可以截取正确长度的Buffer流了。下面Event Bus会创建一个空的ClusteredMessage,然后调用其readFromWire方法从Buffer中重建消息。当然这里还要记录消息已经读取的metrics信息。接着检测收到的消息实体类型是否为心跳检测包(PING),如果是的话就发送回ACK消息(PONG);如果不是心跳包,则代表是正常的消息,Event Bus就调用我们熟悉的deliverMessageLocally函数在本地进行分发处理,接下来的过程就和Local模式一样了。

文章目录
  1. 1. 集群模式介绍
  2. 2. 绑定MessageConsumer
  3. 3. 消息的发送/发布
  4. 4. 消息的接收