上篇文章中我们探索了Local模式下Event Bus的源码,在这篇文章中我们来探索一下Vert.x中的Clustered Event Bus是如何实现的。对应的Vert.x版本为3.3.2。
集群模式介绍
我们先来简单地介绍一下集群模式下Event Bus的基本原理。
我们可以通过集群模式下的Event Bus在不同的服务器之间进行通信,其本质为TCP通信。Vert.x集群模式需要一个集群管理器(默认为HazelcastClusterManager
)来管理集群的状态,存储元数据。当我们在某个节点A给集群模式的Event Bus绑定一个对应地址address
的consumer
的时候,Event Bus会将此节点的ServerID
(包含host
和port
信息)存储至集群管理器的共享Map中,key
为绑定的地址address
,value为绑定了此地址address
的所有结点的ServerID
集合(可以看作是具有负载均衡功能的Set
)。集群中的所有节点都可以从集群管理器中获取Map记录。并且绑定consumer的同时节点A会建立一个NetServer
接收数据。这样,我们再通过另一个结点B向此地址address
发送消息的时候,B就会从集群管理器中取出此地址对应的ServerID
集合,并根据是点对点发送还是发布,根据相应的策略创建NetClient
执行消息分发逻辑。这样,对应的NetServer
收到数据后会对其进行解码然后在本地进行消息的处理。
集群模式下我们还需要注意几个问题:
当某个节点挂掉的时候,其连接将会不可用,集群管理器就会将此节点的信息从集群中移除,并且传播到所有的节点删除对应缓存的信息,这样发消息的时候就不会发送到挂掉的无效节点处。至于高可用性,Vert.x提供了高可用管理器HAManager
用于实现高可用性,在发生故障时能够快速failover,详情可见官方文档。
好了,下面我们就来分析一下Clustered Event Bus的源码。集群模式下Event Bus的类型为ClusteredEventBus
,它继承了单机模式的EventBusImpl
类。其初始化过程与Local模式大同小异,因此这里就直接分析发送和接受消息相关的逻辑了。
绑定MessageConsumer
我们还是先来看consumer
方法的逻辑。前面的调用逻辑都和Local模式下相同,可以参考之前的文章。不同之处在添加记录的地方。Cluster模式下Event Bus需要将当前机器的位置存储至Map中并且传播至集群内的所有节点,因此ClusteredEventBus
重写了四个参数版本的addRegistration
方法(之前在EventBusImpl
类中这个版本的方法用处不大,这里用处就大了):
1 2 3 4 5 6 7 8 9 10 11
| @Override protected <T> void addRegistration(boolean newAddress, String address, boolean replyHandler, boolean localOnly, Handler<AsyncResult<Void>> completionHandler) { if (newAddress && subs != null && !replyHandler && !localOnly) { subs.add(address, serverID, completionHandler); } else { completionHandler.handle(Future.succeededFuture()); } }
|
如果要绑定MessageConsumer
对应的地址在本地中没有注册过,并且不是Event Bus自动生成的reply consumer,并且允许在集群范围内传播的话,Event Bus就会将当前机器的位置添加到集群内的记录subs
中。subs
的类型为AsyncMultiMap
:
1
| private AsyncMultiMap<String, ServerID> subs;
|
ClusteredEventBus启动时会对其进行初始化:
1 2 3 4 5 6 7 8
| clusterManager.<String, ServerID>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> { if (ar2.succeeded()) { subs = ar2.result(); } else { } });
|
从名字就可以看出来,AsyncMultiMap
允许一键多值,并且其变动可以在集群范围内传播。由于AsyncMultiMap
是集群范围内的,因此对其操作都是异步的。在这里我们可以简单地把它看作是一个Map<String, ChoosableIterable<ServerID>>
类型的键值对,其中ChoosableIterable
与之前见到过的Handlers
类似,属于可以通过轮询算法获取某一元素的集合。subs
的key为绑定的地址,value为绑定此地址的机器位置的集合。机器的位置用ServerID
表示,里面包含了该机器的host
和port
。这样,每当我们向某个地址绑定一个MessageConsumer
的时候,绑定consumer的ServerID
就会被记录到集群中并与地址相对应,其它机器在向此地址发送(或发布)消息的时候,Event Bus就可以从集群中获取在此地址上绑定了consumer的所有ServerID
,再根据相应的策略选出合适的ServerID
建立TCP通信将数据发送至对应机器中,对应机器收到消息后解码并在本地对其进行处理。
这里面还需要注意一点:我们可以在EventBusOptions
中指定ServerID
的port
和host
,若不指定则port
将随机分配(NetServer
的特性)。
剩下的过程也就大同小异了。至于unregister
方法,无非就是将底层的removeRegistration
方法重写,从subs
中删除对应的ServerID
并传播至其它节点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Override protected <T> void removeRegistration(HandlerHolder lastHolder, String address, Handler<AsyncResult<Void>> completionHandler) { if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) { removeSub(address, serverID, completionHandler); } else { callCompletionHandlerAsync(completionHandler); } } private void removeSub(String subName, ServerID theServerID, Handler<AsyncResult<Void>> completionHandler) { subs.remove(subName, theServerID, ar -> { if (!ar.succeeded()) { log.error("Failed to remove sub", ar.cause()); } else { if (ar.result()) { if (completionHandler != null) { completionHandler.handle(Future.succeededFuture()); } } else { if (completionHandler != null) { completionHandler.handle(Future.failedFuture("sub not found")); } } } }); }
|
消息的发送/发布
集群模式下的消息与本地模式下的消息不同。集群模式下的消息实体类型为ClusteredMessage
,它继承了MessageImpl
消息实体类,并且根据远程传输的特性实现了一种Wire Protocol用于远程传输消息,并负责消息的编码和解码。具体的实现就不展开说了,如果有兴趣的话可以阅读ClusteredMessage
类中相关方法的实现。
我们上篇文章提到过,Event Bus底层通过createMessage
方法创建消息。因此ClusteredEventBus
里就对此方法进行了重写,当然改动就是把MessageImpl
替换成了ClusteredMessage
:
1 2 3 4 5 6 7 8
| @Override protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) { Objects.requireNonNull(address, "no null address accepted"); MessageCodec codec = codecManager.lookupCodec(body, codecName); @SuppressWarnings("unchecked") ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this); return msg; }
|
接下来就是消息的发送逻辑了。ClusteredEventBus
重写了sendOrPub
方法,此方法存在于SendContextImpl
类中的next
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void next() { if (iter.hasNext()) { Handler<SendContext> handler = iter.next(); try { handler.handle(this); } catch (Throwable t) { log.error("Failure in interceptor", t); } } else { sendOrPub(this); } }
|
我们来看一下ClusteredEventBus
是如何进行集群内消息的分发的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Override protected <T> void sendOrPub(SendContextImpl<T> sendContext) { String address = sendContext.message.address(); Handler<AsyncResult<ChoosableIterable<ServerID>>> resultHandler = asyncResult -> { if (asyncResult.succeeded()) { ChoosableIterable<ServerID> serverIDs = asyncResult.result(); if (serverIDs != null && !serverIDs.isEmpty()) { sendToSubs(serverIDs, sendContext); } else { metrics.messageSent(address, !sendContext.message.send(), true, false); deliverMessageLocally(sendContext); } } else { log.error("Failed to send message", asyncResult.cause()); } }; if (Vertx.currentContext() == null) { sendNoContext.runOnContext(v -> { subs.get(address, resultHandler); }); } else { subs.get(address, resultHandler); } }
|
首先Event Bus需要从传入的sendContext
中获取要发送至的地址。接着Event Bus需要从集群管理器中获取在此地址上绑定consumer的所有ServerID
,这个过程是异步的,并且需要在Vert.x Context中执行。如果获取记录成功,我们会得到一个可通过轮询算法获取ServerID
的集合(类型为ChoosableIterable<ServerID>
)。如果集合为空,则代表集群内其它节点没有在此地址绑定consumer(或者由于一致性问题没有同步),Event Bus就将消息通过deliverMessageLocally
方法在本地进行相应的分发。deliverMessageLocally
方法的逻辑之前我们已经详细讲过了,这里就不再细说了;如果集合不为空,Event Bus就调用sendToSubs
方法进行下一步操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| private <T> void sendToSubs(ChoosableIterable<ServerID> subs, SendContextImpl<T> sendContext) { String address = sendContext.message.address(); if (sendContext.message.send()) { ServerID sid = subs.choose(); if (!sid.equals(serverID)) { metrics.messageSent(address, false, false, true); sendRemote(sid, sendContext.message); } else { metrics.messageSent(address, false, true, false); deliverMessageLocally(sendContext); } } else { boolean local = false; boolean remote = false; for (ServerID sid : subs) { if (!sid.equals(serverID)) { remote = true; sendRemote(sid, sendContext.message); } else { local = true; } } metrics.messageSent(address, true, local, remote); if (local) { deliverMessageLocally(sendContext); } } }
|
这里就到了分send
和publish
的时候了。如果发送消息的模式为点对点模式(send
),Event Bus会从给的的集合中通过轮询算法获取一个ServerID
。然后Event Bus会检查获取到的ServerID
是否与本机ServerID
相同,如果相同则代表在一个机子上,直接记录metrics信息并且调用deliverMessageLocally
方法往本地发送消息即可;如果不相同,Event Bus就会调用sendRemote
方法执行真正的远程消息发送逻辑。发布订阅模式的逻辑与其大同小异,只不过需要遍历一下ChoosableIterable<ServerID>
集合,然后依次执行之前讲过的逻辑。注意如果要在本地发布消息只需要发一次。
真正的远程消息发送逻辑在sendRemote
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private void sendRemote(ServerID theServerID, MessageImpl message) { ConnectionHolder holder = connections.get(theServerID); if (holder == null) { holder = new ConnectionHolder(this, theServerID, options); ConnectionHolder prevHolder = connections.putIfAbsent(theServerID, holder); if (prevHolder != null) { holder = prevHolder; } else { holder.connect(); } } holder.writeMessage((ClusteredMessage)message); }
|
一开始我们就提到过,节点之间通过Event Bus进行通信的本质是TCP,因此这里需要创建一个NetClient
作为TCP服务端,连接到之前获取的ServerID
对应的节点然后将消息通过TCP协议发送至接收端。这里Vert.x用一个封装类ConnectionHolder
对NetClient
进行了一些封装。
ClusteredEventBus
中维持着一个connections
哈希表对用于保存ServerID
对应的连接ConnectionHolder
。在sendRemote
方法中,Event Bus首先会从connections
中获取ServerID
对应的连接。如果获取不到就创建连接并将其添加至connections
记录中并调用对应ConnectionHolder
的connect
方法建立连接;最后调用writeMessage
方法将消息编码后通过TCP发送至对应的接收端。
那么ConnectionHolder
是如何实现的呢?我们来看一下其构造函数:
1 2 3 4 5 6 7 8 9 10
| ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID, EventBusOptions options) { this.eventBus = eventBus; this.serverID = serverID; this.vertx = eventBus.vertx(); this.metrics = eventBus.getMetrics(); NetClientOptions clientOptions = new NetClientOptions(options.toJson()); ClusteredEventBus.setCertOptions(clientOptions, options.getKeyCertOptions()); ClusteredEventBus.setTrustOptions(clientOptions, options.getTrustOptions()); client = new NetClientImpl(eventBus.vertx(), clientOptions, false); }
|
可以看到ConnectionHolder
初始化的时候会创建一个NetClient
作为TCP请求端,而请求的对象就是接收端的NetServer
(后边会讲),客户端配置已经在EventBusOptions
中事先配置好了。我们来看看connect
方法是如何建立连接的:
1 2 3 4 5 6 7 8 9 10 11 12
| synchronized void connect() { if (connected) { throw new IllegalStateException("Already connected"); } client.connect(serverID.port, serverID.host, res -> { if (res.succeeded()) { connected(res.result()); } else { close(); } }); }
|
可以看到这里很简单地调用了NetClient#connect
方法建立TCP连接,如果建立连接成功的话会得到一个NetSocket
对象。Event Bus接着将其传至connected
方法中进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| private synchronized void connected(NetSocket socket) { this.socket = socket; connected = true; socket.exceptionHandler(t -> close()); socket.closeHandler(v -> close()); socket.handler(data -> { vertx.cancelTimer(timeoutID); schedulePing(); }); schedulePing(); for (ClusteredMessage message : pending) { Buffer data = message.encodeToWire(); metrics.messageWritten(message.address(), data.length()); socket.write(data); } pending.clear(); }
|
首先Event Bus通过exceptionHandler
和closeHandler
方法给连接对应的NetSocket
绑定异常回调和连接关闭回调,触发的时候都调用close
方法关闭连接;为了保证不丢失连接,消息发送方每隔一段时间就需要对消息接收方发送一次心跳包(PING
),如果消息接收方在一定时间内没有回复,那么就认为连接丢失,调用close
方法关闭连接。心跳检测的逻辑在schedulePing
方法中,比较清晰,这里就不详细说了。大家会发现ConnectionHolder
里也有个消息队列(缓冲区)pending
,并且这里会将队列中的消息依次通过TCP发送至接收端。为什么需要这样呢?其实,这要从创建TCP客户端说起。创建TCP客户端这个过程应该是异步的,需要消耗一定时间,而ConnectionHolder
中封装的connect
方法却是同步式的。前面我们刚刚看过,通过connect
方法建立连接以后会接着调用writeMessage
方法发送消息,而这时候客户端连接可能还没建立,因此需要这么个缓冲区先存着,等着连接建立了再一块发送出去(存疑:为什么不将connect
方法直接设计成异步的?)。
至于发送消息的writeMessage
方法,其逻辑一目了然:
1 2 3 4 5 6 7 8 9 10 11 12
| synchronized void writeMessage(ClusteredMessage message) { if (connected) { Buffer data = message.encodeToWire(); metrics.messageWritten(message.address(), data.length()); socket.write(data); } else { if (pending == null) { pending = new ArrayDeque<>(); } pending.add(message); } }
|
如果连接已建立,Event Bus就会调用对应ClusteredMessage
的encodeToWire
方法将其转化为字节流Buffer
,然后记录metrics信息,最后通过socket
的write
方法将消息写入到Socket中,这样消息就从发送端通过TCP发送到了接收端。如果连接未建立,就如之前讲的那样,先把消息存到消息队列中,等连接建立了再一块发出去。
这样,Clustered Event Bus下消息的发送逻辑就理清楚了。下面我们看一下接收端是如何接收消息并在本地进行消息的处理的。
消息的接收
一开始我们提到过,每个节点的Clustered Event Bus在启动时都会创建一个NetServer
作为接收消息的TCP服务端。TCP Server的port
和host
可以在EventBusOptions
中指定,如果不指定的话默认随机分配port
,然后Event Bus会根据NetServer
的配置来生成当前节点的ServerID
。
创建TCP Server的逻辑在start
方法中,与接受消息有关的逻辑就是这一句:
1
| server.connectHandler(getServerHandler());
|
我们知道,NetServer
的connectHandler
方法用于绑定对服务端Socket的处理函数,而这里绑定的处理函数是由getServerHandler
方法生成的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private Handler<NetSocket> getServerHandler() { return socket -> { RecordParser parser = RecordParser.newFixed(4, null); Handler<Buffer> handler = new Handler<Buffer>() { int size = -1; public void handle(Buffer buff) { if (size == -1) { size = buff.getInt(0); parser.fixedSizeMode(size); } else { ClusteredMessage received = new ClusteredMessage(); received.readFromWire(buff, codecManager); metrics.messageRead(received.address(), buff.length()); parser.fixedSizeMode(4); size = -1; if (received.codec() == CodecManager.PING_MESSAGE_CODEC) { socket.write(PONG); } else { deliverMessageLocally(received); } } } }; parser.setOutput(handler); socket.handler(parser); }; }
|
逻辑非常清晰。这里Event Bus使用了RecordParser
来获取发送过来的对应长度的Buffer
,并将其绑定在NetServer
的Socket上。真正的解析Buffer
并处理的逻辑在内部的handler
中。之前ClusteredMessage
中的Wire Protocol规定Buffer
的首部第一个int
值为要发送Buffer
的长度(逻辑见ClusteredMessage#encodeToWire
方法),因此这里首先获取长度,然后给parser
设定正确的fixed size,这样parser
就可以截取正确长度的Buffer
流了。下面Event Bus会创建一个空的ClusteredMessage
,然后调用其readFromWire
方法从Buffer
中重建消息。当然这里还要记录消息已经读取的metrics信息。接着检测收到的消息实体类型是否为心跳检测包(PING
),如果是的话就发送回ACK消息(PONG
);如果不是心跳包,则代表是正常的消息,Event Bus就调用我们熟悉的deliverMessageLocally
函数在本地进行分发处理,接下来的过程就和Local模式一样了。