上篇文章中我们探索了Local模式下Event Bus的源码,在这篇文章中我们来探索一下Vert.x中的Clustered Event Bus是如何实现的。对应的Vert.x版本为3.3.2。
集群模式介绍
我们先来简单地介绍一下集群模式下Event Bus的基本原理。
我们可以通过集群模式下的Event Bus在不同的服务器之间进行通信,其本质为TCP通信。Vert.x集群模式需要一个集群管理器(默认为HazelcastClusterManager)来管理集群的状态,存储元数据。当我们在某个节点A给集群模式的Event Bus绑定一个对应地址address的consumer的时候,Event Bus会将此节点的ServerID(包含host和port信息)存储至集群管理器的共享Map中,key为绑定的地址address,value为绑定了此地址address的所有结点的ServerID集合(可以看作是具有负载均衡功能的Set)。集群中的所有节点都可以从集群管理器中获取Map记录。并且绑定consumer的同时节点A会建立一个NetServer接收数据。这样,我们再通过另一个结点B向此地址address发送消息的时候,B就会从集群管理器中取出此地址对应的ServerID集合,并根据是点对点发送还是发布,根据相应的策略创建NetClient执行消息分发逻辑。这样,对应的NetServer收到数据后会对其进行解码然后在本地进行消息的处理。
集群模式下我们还需要注意几个问题:
当某个节点挂掉的时候,其连接将会不可用,集群管理器就会将此节点的信息从集群中移除,并且传播到所有的节点删除对应缓存的信息,这样发消息的时候就不会发送到挂掉的无效节点处。至于高可用性,Vert.x提供了高可用管理器HAManager用于实现高可用性,在发生故障时能够快速failover,详情可见官方文档。
好了,下面我们就来分析一下Clustered Event Bus的源码。集群模式下Event Bus的类型为ClusteredEventBus,它继承了单机模式的EventBusImpl类。其初始化过程与Local模式大同小异,因此这里就直接分析发送和接受消息相关的逻辑了。
绑定MessageConsumer
我们还是先来看consumer方法的逻辑。前面的调用逻辑都和Local模式下相同,可以参考之前的文章。不同之处在添加记录的地方。Cluster模式下Event Bus需要将当前机器的位置存储至Map中并且传播至集群内的所有节点,因此ClusteredEventBus重写了四个参数版本的addRegistration方法(之前在EventBusImpl类中这个版本的方法用处不大,这里用处就大了):
| 1 2 3 4 5 6 7 8 9 10 11
 | @Override protected <T> void addRegistration(boolean newAddress, String address,                                    boolean replyHandler, boolean localOnly,                                    Handler<AsyncResult<Void>> completionHandler) {   if (newAddress && subs != null && !replyHandler && !localOnly) {          subs.add(address, serverID, completionHandler);   } else {     completionHandler.handle(Future.succeededFuture());   } }
 | 
如果要绑定MessageConsumer对应的地址在本地中没有注册过,并且不是Event Bus自动生成的reply consumer,并且允许在集群范围内传播的话,Event Bus就会将当前机器的位置添加到集群内的记录subs中。subs的类型为AsyncMultiMap:
| 1
 | private AsyncMultiMap<String, ServerID> subs;
 | 
ClusteredEventBus启动时会对其进行初始化:
| 1 2 3 4 5 6 7 8
 | clusterManager.<String, ServerID>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> {   if (ar2.succeeded()) {     subs = ar2.result();        } else {        } });
 | 
从名字就可以看出来,AsyncMultiMap允许一键多值,并且其变动可以在集群范围内传播。由于AsyncMultiMap是集群范围内的,因此对其操作都是异步的。在这里我们可以简单地把它看作是一个Map<String, ChoosableIterable<ServerID>>类型的键值对,其中ChoosableIterable与之前见到过的Handlers类似,属于可以通过轮询算法获取某一元素的集合。subs的key为绑定的地址,value为绑定此地址的机器位置的集合。机器的位置用ServerID表示,里面包含了该机器的host和port。这样,每当我们向某个地址绑定一个MessageConsumer的时候,绑定consumer的ServerID就会被记录到集群中并与地址相对应,其它机器在向此地址发送(或发布)消息的时候,Event Bus就可以从集群中获取在此地址上绑定了consumer的所有ServerID,再根据相应的策略选出合适的ServerID建立TCP通信将数据发送至对应机器中,对应机器收到消息后解码并在本地对其进行处理。
这里面还需要注意一点:我们可以在EventBusOptions中指定ServerID的port和host,若不指定则port将随机分配(NetServer的特性)。
剩下的过程也就大同小异了。至于unregister方法,无非就是将底层的removeRegistration方法重写,从subs中删除对应的ServerID并传播至其它节点:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
 | @Override protected <T> void removeRegistration(HandlerHolder lastHolder, String address,                                       Handler<AsyncResult<Void>> completionHandler) {   if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) {     removeSub(address, serverID, completionHandler);   } else {     callCompletionHandlerAsync(completionHandler);   } } private void removeSub(String subName, ServerID theServerID, Handler<AsyncResult<Void>> completionHandler) {   subs.remove(subName, theServerID, ar -> {     if (!ar.succeeded()) {       log.error("Failed to remove sub", ar.cause());     } else {       if (ar.result()) {         if (completionHandler != null) {           completionHandler.handle(Future.succeededFuture());         }       } else {         if (completionHandler != null) {           completionHandler.handle(Future.failedFuture("sub not found"));         }       }     }   }); }
 | 
消息的发送/发布
集群模式下的消息与本地模式下的消息不同。集群模式下的消息实体类型为ClusteredMessage,它继承了MessageImpl消息实体类,并且根据远程传输的特性实现了一种Wire Protocol用于远程传输消息,并负责消息的编码和解码。具体的实现就不展开说了,如果有兴趣的话可以阅读ClusteredMessage类中相关方法的实现。
我们上篇文章提到过,Event Bus底层通过createMessage方法创建消息。因此ClusteredEventBus里就对此方法进行了重写,当然改动就是把MessageImpl替换成了ClusteredMessage:
| 1 2 3 4 5 6 7 8
 | @Override protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {   Objects.requireNonNull(address, "no null address accepted");   MessageCodec codec = codecManager.lookupCodec(body, codecName);   @SuppressWarnings("unchecked")   ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this);   return msg; }
 | 
接下来就是消息的发送逻辑了。ClusteredEventBus重写了sendOrPub方法,此方法存在于SendContextImpl类中的next方法:
| 1 2 3 4 5 6 7 8 9 10 11 12 13
 | @Override public void next() {   if (iter.hasNext()) {     Handler<SendContext> handler = iter.next();     try {       handler.handle(this);     } catch (Throwable t) {       log.error("Failure in interceptor", t);     }   } else {     sendOrPub(this);   } }
 | 
我们来看一下ClusteredEventBus是如何进行集群内消息的分发的:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
 | @Override protected <T> void sendOrPub(SendContextImpl<T> sendContext) {   String address = sendContext.message.address();   Handler<AsyncResult<ChoosableIterable<ServerID>>> resultHandler = asyncResult -> {     if (asyncResult.succeeded()) {       ChoosableIterable<ServerID> serverIDs = asyncResult.result();       if (serverIDs != null && !serverIDs.isEmpty()) {         sendToSubs(serverIDs, sendContext);       } else {         metrics.messageSent(address, !sendContext.message.send(), true, false);         deliverMessageLocally(sendContext);       }     } else {       log.error("Failed to send message", asyncResult.cause());     }   };   if (Vertx.currentContext() == null) {          sendNoContext.runOnContext(v -> {       subs.get(address, resultHandler);     });   } else {     subs.get(address, resultHandler);   } }
 | 
首先Event Bus需要从传入的sendContext中获取要发送至的地址。接着Event Bus需要从集群管理器中获取在此地址上绑定consumer的所有ServerID,这个过程是异步的,并且需要在Vert.x Context中执行。如果获取记录成功,我们会得到一个可通过轮询算法获取ServerID的集合(类型为ChoosableIterable<ServerID>)。如果集合为空,则代表集群内其它节点没有在此地址绑定consumer(或者由于一致性问题没有同步),Event Bus就将消息通过deliverMessageLocally方法在本地进行相应的分发。deliverMessageLocally方法的逻辑之前我们已经详细讲过了,这里就不再细说了;如果集合不为空,Event Bus就调用sendToSubs方法进行下一步操作:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
 | private <T> void sendToSubs(ChoosableIterable<ServerID> subs, SendContextImpl<T> sendContext) {   String address = sendContext.message.address();   if (sendContext.message.send()) {          ServerID sid = subs.choose();     if (!sid.equals(serverID)) {         metrics.messageSent(address, false, false, true);       sendRemote(sid, sendContext.message);     } else {       metrics.messageSent(address, false, true, false);       deliverMessageLocally(sendContext);     }   } else {          boolean local = false;     boolean remote = false;     for (ServerID sid : subs) {       if (!sid.equals(serverID)) {           remote = true;         sendRemote(sid, sendContext.message);       } else {         local = true;       }     }     metrics.messageSent(address, true, local, remote);     if (local) {       deliverMessageLocally(sendContext);     }   } }
 | 
这里就到了分send和publish的时候了。如果发送消息的模式为点对点模式(send),Event Bus会从给的的集合中通过轮询算法获取一个ServerID。然后Event Bus会检查获取到的ServerID是否与本机ServerID相同,如果相同则代表在一个机子上,直接记录metrics信息并且调用deliverMessageLocally方法往本地发送消息即可;如果不相同,Event Bus就会调用sendRemote方法执行真正的远程消息发送逻辑。发布订阅模式的逻辑与其大同小异,只不过需要遍历一下ChoosableIterable<ServerID>集合,然后依次执行之前讲过的逻辑。注意如果要在本地发布消息只需要发一次。
真正的远程消息发送逻辑在sendRemote方法中:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14
 | private void sendRemote(ServerID theServerID, MessageImpl message) {   ConnectionHolder holder = connections.get(theServerID);   if (holder == null) {     holder = new ConnectionHolder(this, theServerID, options);     ConnectionHolder prevHolder = connections.putIfAbsent(theServerID, holder);     if (prevHolder != null) {              holder = prevHolder;     } else {       holder.connect();     }   }   holder.writeMessage((ClusteredMessage)message); }
 | 
一开始我们就提到过,节点之间通过Event Bus进行通信的本质是TCP,因此这里需要创建一个NetClient作为TCP服务端,连接到之前获取的ServerID对应的节点然后将消息通过TCP协议发送至接收端。这里Vert.x用一个封装类ConnectionHolder对NetClient进行了一些封装。
ClusteredEventBus中维持着一个connections哈希表对用于保存ServerID对应的连接ConnectionHolder。在sendRemote方法中,Event Bus首先会从connections中获取ServerID对应的连接。如果获取不到就创建连接并将其添加至connections记录中并调用对应ConnectionHolder的connect方法建立连接;最后调用writeMessage方法将消息编码后通过TCP发送至对应的接收端。
那么ConnectionHolder是如何实现的呢?我们来看一下其构造函数:
| 1 2 3 4 5 6 7 8 9 10
 | ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID, EventBusOptions options) {   this.eventBus = eventBus;   this.serverID = serverID;   this.vertx = eventBus.vertx();   this.metrics = eventBus.getMetrics();   NetClientOptions clientOptions = new NetClientOptions(options.toJson());   ClusteredEventBus.setCertOptions(clientOptions, options.getKeyCertOptions());   ClusteredEventBus.setTrustOptions(clientOptions, options.getTrustOptions());   client = new NetClientImpl(eventBus.vertx(), clientOptions, false); }
 | 
可以看到ConnectionHolder初始化的时候会创建一个NetClient作为TCP请求端,而请求的对象就是接收端的NetServer(后边会讲),客户端配置已经在EventBusOptions中事先配置好了。我们来看看connect方法是如何建立连接的:
| 1 2 3 4 5 6 7 8 9 10 11 12
 | synchronized void connect() {   if (connected) {     throw new IllegalStateException("Already connected");   }   client.connect(serverID.port, serverID.host, res -> {     if (res.succeeded()) {       connected(res.result());     } else {       close();     }   }); }
 | 
可以看到这里很简单地调用了NetClient#connect方法建立TCP连接,如果建立连接成功的话会得到一个NetSocket对象。Event Bus接着将其传至connected方法中进行处理:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
 | private synchronized void connected(NetSocket socket) {   this.socket = socket;   connected = true;   socket.exceptionHandler(t -> close());   socket.closeHandler(v -> close());   socket.handler(data -> {          vertx.cancelTimer(timeoutID);     schedulePing();   });      schedulePing();   for (ClusteredMessage message : pending) {     Buffer data = message.encodeToWire();     metrics.messageWritten(message.address(), data.length());     socket.write(data);   }   pending.clear(); }
 | 
首先Event Bus通过exceptionHandler和closeHandler方法给连接对应的NetSocket绑定异常回调和连接关闭回调,触发的时候都调用close方法关闭连接;为了保证不丢失连接,消息发送方每隔一段时间就需要对消息接收方发送一次心跳包(PING),如果消息接收方在一定时间内没有回复,那么就认为连接丢失,调用close方法关闭连接。心跳检测的逻辑在schedulePing方法中,比较清晰,这里就不详细说了。大家会发现ConnectionHolder里也有个消息队列(缓冲区)pending,并且这里会将队列中的消息依次通过TCP发送至接收端。为什么需要这样呢?其实,这要从创建TCP客户端说起。创建TCP客户端这个过程应该是异步的,需要消耗一定时间,而ConnectionHolder中封装的connect方法却是同步式的。前面我们刚刚看过,通过connect方法建立连接以后会接着调用writeMessage方法发送消息,而这时候客户端连接可能还没建立,因此需要这么个缓冲区先存着,等着连接建立了再一块发送出去(存疑:为什么不将connect方法直接设计成异步的?)。
至于发送消息的writeMessage方法,其逻辑一目了然:
| 1 2 3 4 5 6 7 8 9 10 11 12
 | synchronized void writeMessage(ClusteredMessage message) {   if (connected) {     Buffer data = message.encodeToWire();     metrics.messageWritten(message.address(), data.length());     socket.write(data);   } else {     if (pending == null) {       pending = new ArrayDeque<>();     }     pending.add(message);   } }
 | 
如果连接已建立,Event Bus就会调用对应ClusteredMessage的encodeToWire方法将其转化为字节流Buffer,然后记录metrics信息,最后通过socket的write方法将消息写入到Socket中,这样消息就从发送端通过TCP发送到了接收端。如果连接未建立,就如之前讲的那样,先把消息存到消息队列中,等连接建立了再一块发出去。
这样,Clustered Event Bus下消息的发送逻辑就理清楚了。下面我们看一下接收端是如何接收消息并在本地进行消息的处理的。
消息的接收
一开始我们提到过,每个节点的Clustered Event Bus在启动时都会创建一个NetServer作为接收消息的TCP服务端。TCP Server的port和host可以在EventBusOptions中指定,如果不指定的话默认随机分配port,然后Event Bus会根据NetServer的配置来生成当前节点的ServerID。
创建TCP Server的逻辑在start方法中,与接受消息有关的逻辑就是这一句:
| 1
 | server.connectHandler(getServerHandler());
 | 
我们知道,NetServer的connectHandler方法用于绑定对服务端Socket的处理函数,而这里绑定的处理函数是由getServerHandler方法生成的:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
 | private Handler<NetSocket> getServerHandler() {   return socket -> {     RecordParser parser = RecordParser.newFixed(4, null);     Handler<Buffer> handler = new Handler<Buffer>() {       int size = -1;       public void handle(Buffer buff) {         if (size == -1) {           size = buff.getInt(0);           parser.fixedSizeMode(size);         } else {           ClusteredMessage received = new ClusteredMessage();           received.readFromWire(buff, codecManager);           metrics.messageRead(received.address(), buff.length());           parser.fixedSizeMode(4);           size = -1;           if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {                          socket.write(PONG);           } else {             deliverMessageLocally(received);           }         }       }     };     parser.setOutput(handler);     socket.handler(parser);   }; }
 | 
逻辑非常清晰。这里Event Bus使用了RecordParser来获取发送过来的对应长度的Buffer,并将其绑定在NetServer的Socket上。真正的解析Buffer并处理的逻辑在内部的handler中。之前ClusteredMessage中的Wire Protocol规定Buffer的首部第一个int值为要发送Buffer的长度(逻辑见ClusteredMessage#encodeToWire方法),因此这里首先获取长度,然后给parser设定正确的fixed size,这样parser就可以截取正确长度的Buffer流了。下面Event Bus会创建一个空的ClusteredMessage,然后调用其readFromWire方法从Buffer中重建消息。当然这里还要记录消息已经读取的metrics信息。接着检测收到的消息实体类型是否为心跳检测包(PING),如果是的话就发送回ACK消息(PONG);如果不是心跳包,则代表是正常的消息,Event Bus就调用我们熟悉的deliverMessageLocally函数在本地进行分发处理,接下来的过程就和Local模式一样了。