Event Bus是Vert.x的“神经系统”,是最为关键的几个部分之一。今天我们就来探索一下Event Bus的实现原理。本篇分析的是Local模式的Event Bus,对应的Vert.x版本为3.3.2。
本文假定读者有一定的并发编程基础以及Vert.x使用基础,并且对Vert.x的线程模型以及back-pressure有所了解。
Local Event Bus的创建
一般情况下,我们通过Vertx
的eventBus
方法来创建或获取一个EventBus
实例:
|
|
eventBus
方法定义于Vertx
接口中,我们来看一下其位于VertxImpl
类中的实现:
|
|
可以看到此方法返回VertxImpl
实例中的eventBus
成员,同时需要注意并发可见性问题。那么eventBus
成员是何时初始化的呢?答案在VertxImpl
类的构造函数中。这里截取相关逻辑:
|
|
可以看到VertxImpl
内部是通过createAndStartEventBus
方法来初始化eventBus
的。我们来看一下其逻辑:
|
|
可以看到此方法通过eventBus = new EventBusImpl(this)
将eventBus
进行了初始化(Local模式为EventBusImpl
),并且调用eventBus
的start
方法对其进行一些额外的初始化工作。我们来看一下EventBusImpl
类的start
方法:
|
|
首先初始化过程需要防止race condition,因此方法为synchronized
的。该方法仅仅将EventBusImpl
类中的一个started
标志位设为true
来代表Event Bus已启动。注意started
标志位为volatile
的,这样可以保证其可见性,确保其它线程通过checkStarted
方法读到的started
结果总是最新的。设置完started
标志位后,Vert.x会接着调用传入的completionHandler
处理函数,也就是上面我们在createAndStartEventBus
方法中看到的 —— 调用metrics
成员的eventBusInitialized
方法以便Metrics类可以在Event Bus初始化完毕后使用它(不过默认情况下此方法的逻辑为空)。
可以看到初始化过程还是比较简单的,我们接下来先来看看订阅消息 —— consumer
方法的逻辑。
consume
我们来看一下consumer
方法的逻辑,其原型位于EventBus
接口中:
|
|
其实现位于EventBusImpl
类中:
|
|
首先要确保传入的handler
不为空,然后Vert.x会调用只接受一个address
参数的consumer
方法获取对应的MessageConsumer
,最后给获取到的MessageConsumer
绑定上传入的handler
。我们首先来看一下另一个consumer
方法的实现:
|
|
首先Vert.x会检查Event Bus是否已经启动,并且确保传入的地址不为空。然后Vert.x会传入一大堆参数创建一个新的HandlerRegistration
类型的实例,并返回。可以推测HandlerRegistration
是MessageConsumer
接口的具体实现,它一定非常重要。所以我们来看一看HandlerRegistration
类是个啥玩意。首先看一下HandlerRegistration
的类体系结构:
可以看到HandlerRegistration
类同时继承了MessageConsumer<T>
以及Handler<Message<T>>
接口,从其类名可以看出它相当于一个”Handler注册记录”,是非常重要的一个类。它有一堆的成员变量,构造函数对vertx
, metrics
, eventBus
, address
(发送地址), repliedAddress
(回复地址), localOnly
(是否在集群内传播), asyncResultHandler
等几个成员变量进行初始化,并且检查超时时间timeout
,如果设定了超时时间那么设定并保存超时计时器(仅用于reply handler中),如果计时器时间到,代表回复超时。因为有一些函数还没介绍,超时的逻辑我们后边再讲。
Note: 由于
MessageConsumer
接口继承了ReadStream
接口,因此它支持back-pressure,其实现就在HandlerRegistration
类中。我们将稍后解析back-pressure的实现。
现在回到consumer
方法中来。创建了MessageConsumer
实例后,我们接着调用它的handler
方法绑定上对应的消息处理函数。handler
方法的实现位于HandlerRegistration
类中:
|
|
首先,handler
方法将此HandlerRegistration
中的handler
成员设置为传入的消息处理函数。HandlerRegistration
类中有一个registered
标志位代表是否已绑定消息处理函数。handler
方法会检查传入的handler
是否为空且是否已绑定消息处理函数。如果不为空且未绑定,Vert.x就会将registered
标志位设为true
并且调用eventBus
的addRegistration
方法将此consumer注册至Event Bus上;如果handler
为空且已绑定消息处理函数,我们就调用unregister
方法注销当前的consumer。我们稍后会分析unregister
方法的实现。
前面提到过注册consumer的逻辑位于Event Bus的addRegistration
方法中,因此我们来分析一下它的实现:
|
|
addRegistration
方法接受四个参数:发送地址address
、传入的consumer registration
、代表是否为reply handler的标志位replyHandler
以及代表是否在集群范围内传播的标志位localOnly
。首先确保传入的HandlerRegistration
不为空。然后Vert.x会调用addLocalRegistration
方法将此consumer注册至Event Bus上:
|
|
首先该方法要确保地址address
不为空,接着它会获取当前线程下对应的Vert.x Context,如果获取不到则表明当前不在Verticle
中(即Embedded),需要调用vertx.getOrCreateContext()
来获取Context
;然后将获取到的Context
赋值给registration
内部的handlerContext
(代表消息处理对应的Vert.x Context)。
下面就要将给定的registration
注册至Event Bus上了。这里Vert.x用一个HandlerHolder
类来包装registration
和context
。接着Vert.x会从存储消息处理Handler
的哈希表handlerMap
中获取给定地址对应的Handlers
,哈希表的类型为ConcurrentMap<String, Handlers>
,key为地址,value为对应的HandlerHolder
集合。注意这里的Handlers
类代表一些Handler
的集合,它内部维护着一个列表list
用于存储每个HandlerHolder
。Handlers
类中只有一个choose
函数,此函数根据轮询算法从HandlerHolder
集合中选定一个HandlerHolder
,这即是Event Bus发送消息时实现load-balancing的方法:
|
|
获取到对应的handlers
以后,Vert.x首先需要检查其是否为空,如果为空代表此地址还没有注册消息处理Handler
,Vert.x就会创建一个Handlers
并且将其置入handlerMap
中,将newAddress
标志位设为true
代表这是一个新注册的地址,然后将其赋值给handlers
。接着我们向handlers
中的HandlerHolder
列表list
中添加刚刚创建的HandlerHolder
实例,这样就将registration
注册至Event Bus中了。
前面判断当前线程是否在Vert.x Context的标志位hasContext
还有一个用途:如果当前线程在Vert.x Context下(比如在Verticle中),Vert.x会通过addCloseHook
方法给当前的context
添加一个钩子函数用于注销当前绑定的registration
。当对应的Verticle
被undeploy的时候,此Verticle绑定的所有消息处理Handler
都会被unregister。Hook的类型为HandlerEntry<T>
,它继承了Closeable
接口,对应的逻辑在close
函数中实现:
|
|
可以看到close
函数会将绑定的registration
从Event Bus的handlerMap
中移除并执行completionHandler
中的逻辑,completionHandler
可由用户指定。
那么在哪里调用这些绑定的hook呢?答案是在DeploymentManager
类中的doUndeploy
方法中,通过context
的runCloseHooks
方法执行绑定的hook函数。相关代码如下(只截取相关逻辑):
|
|
再回到addRegistration
方法中。刚才addLocalRegistration
方法的返回值newAddress
代表对应的地址是否为新注册的。接着我们调用另一个版本的addRegistration
方法,传入了一大堆参数:
|
|
好吧,传入的前几个参数没用到。。。最后一个参数completionHandler
传入的是registration::setResult
方法引用,也就是说这个方法调用了对应registration
的setResult
方法。其实现位于HandlerRegistration
类中:
|
|
首先先设置registration
内部的result
成员(正常情况下为Future.succeededFuture()
)。接着Vert.x会判断registration
是否绑定了completionHandler
(与之前的completionHandler
不同,这里的completionHandler
是MessageConsumer
注册成功时调用的Handler
),若绑定则记录Metrics信息(handlerRegistered
)并在Vert.x Context内调用completionHandler
的逻辑;若未绑定completionHandler
则仅记录Metrics信息。
到此为止,consumer
方法的逻辑就分析完了。在分析send
和publish
方法的逻辑之前,我们先来看一下如何注销绑定的MessageConsumer
。
unregister
我们通过调用MessageConsumer
的unregister
方法实现注销操作。Vert.x提供了两个版本的unregister
方法:
|
|
其中第二个版本的unregister
方法会在注销操作完成时调用传入的completionHandler
。比如在cluster范围内注销consumer需要消耗一定的时间在集群内传播,因此第二个版本的方法就会派上用场。我们来看一下其实现,它们最后都是调用了HandlerRegistration
类的doUnregister
方法:
|
|
如果设定了超时定时器(timeoutID
合法),那么Vert.x会首先将定时器关闭。接着Vert.x会判断是否需要调用endHandler
。那么endHandler
又是什么呢?前面我们提到过MessageConsumer
接口继承了ReadStream
接口,而ReadStream
接口定义了一个endHandler
方法用于绑定一个endHandler
,当stream中的数据读取完毕时会调用。而在Event Bus中,消息源源不断地从一处发送至另一处,因此只有在某个consumer
被unregister的时候,其对应的stream才可以叫“读取完毕”,因此Vert.x选择在doUnregister
方法中调用endHandler
。
接着Vert.x会判断此consumer是否已注册消息处理函数Handler
(通过检查registered
标志位),若已注册则将对应的Handler
从Event Bus中的handlerMap
中移除并将registered
设为false
;若还未注册Handler
且提供了注销结束时的回调completionHandler
(注意不是HandlerRegistration
类的成员变量completionHandler
,而是之前第二个版本的unregister
中传入的Handler
,用同样的名字很容易混。。。),则通过callCompletionHandlerAsync
方法调用回调函数。
从Event Bus中移除Handler
的逻辑位于EventBusImpl
类的removeRegistration
方法中:
|
|
其真正的unregister
逻辑位于removeLocalRegistration
方法中。首先需要从handlerMap
中获取地址对应的Handlers
实例handlers
,如果handlers
不为空,为了防止并发问题,Vert.x需要对其加锁后再进行操作。Vert.x需要遍历handlers
中的列表,遇到与传入的HandlerRegistration
相匹配的HandlerHolder
就将其从列表中移除,然后调用对应holder
的setRemoved
方法标记其为已注销并记录Metrics数据(handlerUnregistered
)。如果移除此HandlerHolder
后handlers
没有任何注册的Handler
了,就将该地址对应的Handlers
实例从handlerMap
中移除并保存刚刚移除的HandlerHolder
。另外,由于已经将此consumer注销,在undeploy verticle的时候不需要再进行unregister,因此这里还要将之前注册到context的hook移除。
调用完removeLocalRegistration
方法以后,Vert.x会调用另一个版本的removeRegistration
方法,调用completionHandler
(用户在第二个版本的unregister
方法中传入的处理函数)对应的逻辑,其它的参数都没什么用。。。
这就是MessageConsumer
注销的逻辑实现。下面就到了本文的另一重头戏了 —— 发送消息相关的函数send
和publish
。
send & publish
send
和publish
的逻辑相近,只不过一个是发送至目标地址的某一消费者,一个是发布至目标地址的所有消费者。Vert.x使用一个标志位send
来代表是否为点对点发送模式。
几个版本的send
和publish
最终都归结于生成消息对象然后调用sendOrPubInternal
方法执行逻辑,只不过send
标志位不同:
|
|
两个方法中都是通过createMessage
方法来生成对应的消息对象的:
|
|
createMessage
方法接受5个参数:send
即上面提到的标志位,address
为发送目标地址,headers
为设置的header,body
代表发送的对象,codecName
代表对应的Codec(消息编码解码器)名称。createMessage
方法首先会确保地址不为空,然后通过codecManager
来获取对应的MessageCodec
。如果没有提供Codec(即codecName
为空),那么codecManager
会根据发送对象body
的类型来提供内置的Codec实现(具体逻辑请见此处)。准备好MessageCodec
后,createMessage
方法就会创建一个MessageImpl
实例并且返回。
这里我们还需要了解一下MessageImpl
的构造函数:
|
|
createMessage
方法并没有设置回复地址replyAddress
。如果用户指定了replyHandler
的话,后边sendOrPubInternal
方法会对此消息实体进行加工,设置replyAddress
并生成回复逻辑对应的HandlerRegistration
。
我们看一下sendOrPubInternal
方法的源码:
|
|
它接受三个参数:要发送的消息message
,发送配置选项options
以及回复处理函数replyHandler
。首先sendOrPubInternal
方法要检查Event Bus是否已启动,接着如果绑定了回复处理函数,Vert.x就会调用createReplyHandlerRegistration
方法给消息实体message
包装上回复地址,并且生成对应的reply consumer。接着Vert.x创建了一个包装消息的SendContextImpl
实例并调用了其next
方法。
我们一步一步来解释。首先是createReplyHandlerRegistration
方法:
|
|
createReplyHandlerRegistration
方法首先检查传入的replyHandler
是否为空(是否绑定了replyHandler
,回复处理函数),如果为空则代表不需要处理回复,直接返回null
;若replyHandler
不为空,createReplyHandlerRegistration
方法就会从配置中获取reply的最大超时时长(默认30s),然后调用generateReplyAddress
方法生成对应的回复地址replyAddress
:
|
|
生成回复地址的逻辑有点简单。。。。EventBusImpl
实例中维护着一个AtomicLong
类型的replySequence
成员变量代表对应的回复地址。每次生成的时候都会使其自增,然后转化为String。也就是说生成的replyAddress
都类似于”1”、”5”这样,而不是我们想象中的直接回复至sender的地址。。。
生成完毕以后,createReplyHandlerRegistration
方法会将生成的replyAddress
设定到消息对象message
中。接着Vert.x会通过convertHandler
方法对replyHandler
进行包装处理并生成类型简化为Handler<Message<T>>
的simpleReplyHandler
,它用于绑定至后面创建的reply consumer上。接着Vert.x会创建对应的reply consumer。关于reply
操作的实现,我们后边会详细讲述。下面Vert.x就通过handler
方法将生成的回复处理函数simpleReplyHandler
绑定至创建好的reply consumer中,其底层实现我们之前已经分析过了,这里就不再赘述。最后此方法返回生成的registration
,即对应的reply consumer。注意这个reply consumer是一次性的,也就是说Vert.x会在其接收到回复或超时的时候自动对其进行注销。
OK,现在回到sendOrPubInternal
方法中来。下面Vert.x会创建一个SendContextImpl
实例并调用其next
方法。SendContextImpl
类实现了SendContext
接口,它相当于一个消息的封装体,并且可以与Event Bus中的interceptors
(拦截器)结合使用。
SendContext
接口定义了三个方法:
message
: 获取当前SendContext
包装的消息实体next
: 调用下一个消息拦截器send
: 代表消息的发送模式是否为点对点模式
在Event Bus中,消息拦截器本质上是一个Handler<SendContext>
类型的处理函数。Event Bus内部存储着一个interceptors
列表用于存储绑定的消息拦截器。我们可以通过addInterceptor
和removeInterceptor
方法进行消息拦截器的添加和删除操作。如果要进行链式拦截,则在每个拦截器中都应该调用对应SendContext
的next
方法,比如:
|
|
我们来看一下SendContextImpl
类中next
方法的实现:
|
|
我们可以看到,SendContextImpl
类中维护了一个拦截器列表对应的迭代器。每次调用next
方法时,如果迭代器中存在拦截器,就将下个拦截器取出并进行相关调用。如果迭代器为空,则代表拦截器都已经调用完毕,Vert.x就会调用EventBusImpl
类下的sendOrPub
方法进行消息的发送操作。
sendOrPub
方法仅仅在metrics模块中记录相关数据(messageSent
),最后调用deliverMessageLocally(SendContextImpl<T>)
方法执行消息的发送逻辑:
|
|
这里面又套了一层。。。它最后其实是调用了deliverMessageLocally(MessageImpl)
方法。此方法返回值代表发送消息的目标地址是否注册有MessageConsumer
,如果没有(false
)则记录错误并调用sendContext
中保存的回复处理函数处理错误(如果绑定了replyHandler
的话)。
deliverMessageLocally(MessageImpl)
方法是真正区分send
和publish
的地方,我们来看一下其实现:
|
|
首先Vert.x需要从handlerMap
中获取目标地址对应的处理函数集合handlers
。接着,如果handlers
不为空的话,Vert.x就会判断消息实体的send
标志位。如果send
标志位为true
则代表以点对点模式发送,Vert.x就会通过handlers
的choose
方法(之前提到过),按照轮询算法来获取其中的某一个HandlerHolder
。获取到HandlerHolder
之后,Vert.x会通过deliverToHandler
方法将消息分发至HandlerHolder
中进行处理;如果send
标志位为false
则代表向所有消费者发布消息,Vert.x就会对handlers
中的每一个HandlerHolder
依次调用deliverToHandler
方法,以便将消息分发至所有注册到此地址的Handler
中进行处理。
消息处理的真正逻辑就在deliverToHandler
方法中。我们来看一下它的源码:
|
|
首先deliverToHandler
方法会复制一份要发送的消息,然后deliverToHandler
方法会调用metrics
的scheduleMessage
方法记录对应的Metrics信息(计划对消息进行处理。此函数修复了Issue 1480)。接着deliverToHandler
方法会从传入的HandlerHolder
中获取对应的Vert.x Context,然后调用runOnContext
方法以便可以让消息处理逻辑在Vert.x Context中执行。为防止对应的handler在处理之前被移除,这里还需要检查一下holder
的isRemoved
属性。如果没有移除,那么就从holder
中获取对应的handler
并调用其handle
方法执行消息的处理逻辑。注意这里获取的handler
实际上是一个HandlerRegistration
。前面提到过HandlerRegistration
类同时实现了MessageConsumer
接口和Handler
接口,因此它兼具这两个接口所期望的功能。另外,之前我们提到过Vert.x会自动注销接收过回复的reply consumer,其逻辑就在这个finally块中。Vert.x会检查holder
中的handler
是否为reply handler(reply consumer),如果是的话就调用其unregister
方法将其注销,来确保reply consumer为一次性的。
之前我们提到过MessageConsumer
继承了ReadStream
接口,因此HandlerRegistration
需要实现flow control(back-pressure)的相关逻辑。那么如何实现呢?我们看到,HandlerRegistration
类中有一个paused
标志位代表是否还继续处理消息。ReadStream
接口中定义了两个函数用于控制stream的通断:当处理速度小于读取速度(发生拥塞)的时候我们可以通过pause
方法暂停消息的传递,将积压的消息暂存于内部的消息队列(缓冲区)pending
中;当相对速度正常的时候,我们可以通过resume
方法恢复消息的传递和处理。
我们看一下HandlerRegistration
中handle
方法的实现:
|
|
果然。。。handle
方法在处理消息的基础上实现了拥塞控制的功能。为了防止资源争用,需要对自身进行加锁;首先handle
方法会判断当前的consumer
是否为paused
状态,如果为paused
状态,handle
方法会检查当前缓冲区大小是否已经超过给定的最大缓冲区大小maxBufferedMessages
,如果没超过,就将收到的消息push到缓冲区中;如果大于或等于阈值,Vert.x就需要丢弃超出的那部分消息。如果当前的consumer
为正常状态,则如果缓冲区不为空,就将收到的消息push到缓冲区中并从缓冲区中pull队列首端的消息,然后调用deliver
方法执行真正的消息处理逻辑。注意这里是在锁之外执行deliver
方法的,这是为了保证在multithreaded worker context下可以并发传递消息(见Bug 473714
)。由于multithreaded worker context允许在不同线程并发执行逻辑(见官方文档),如果将deliver
方法置于synchronized
块之内,其他线程必须等待当前锁被释放才能进行消息的传递逻辑,因而不能做到“delivery concurrently”。
deliver
方法是真正执行“消息处理”逻辑的地方:
|
|
首先Vert.x会调用checkNextTick
方法来检查消息队列(缓冲区)中是否存在更多的消息等待被处理,如果有的话就取出队列首端的消息并调用deliver
方法将其传递给handler
进行处理。这里仍需要注意并发问题,相关实现:
|
|
检查完消息队列以后,Vert.x会接着根据message
判断消息是否仅在本地进行处理并给local
标志位赋值,local
标志位将在记录Metrics数据时用到。
接下来我们看到Vert.x从消息的headers
中获取了一个地址creditsAddress
,如果creditsAddress
存在就向此地址发送一条消息,body为1
。那么这个creditsAddress
又是啥呢?其实,它与flow control有关,我们会在下面详细分析。发送完credit
消息以后,接下来就到了调用handler
处理消息的时刻了。在处理消息之前需要调用metrics
的beginHandleMessage
方法记录消息开始处理的metrics数据,在处理完消息以后需要调用endHandleMessage
方法记录消息处理结束的metrics数据。
嗯。。。到此为止,消息的发送和处理过程就已经一目了然了。下面我们讲一讲之前代码中出现的creditsAddress
到底是啥玩意~
MessageProducer
之前我们提到过,Vert.x定义了两个接口作为 flow control aware object 的规范:WriteStream
以及ReadStream
。对于ReadStream
我们已经不再陌生了,MessageConsumer
就继承了它;那么大家应该可以想象到,有MessageConsumer
就必有MessageProducer
。不错,Vert.x中的MessageProducer
接口对应某个address
上的消息生产者,同时它继承了WriteStream
接口,因此MessageProducer
的实现类MessageProducerImpl
同样具有flow control的能力。我们可以把MessageProducer
看做是一个具有flow control功能的增强版的EventBus
。我们可以通过EventBus
接口的publisher
方法创建一个MessageProducer
。
对MessageProducer
有了初步了解之后,我们就可以解释前面deliver
方法中的creditsAddress
了。MessageProducer
接口的实现类 —— MessageProducerImpl
类的流量控制功能是基于credit
的,其内部会维护一个credit
值代表“发送消息的能力”,其默认值等于DEFAULT_WRITE_QUEUE_MAX_SIZE
:
|
|
在采用点对点模式发送消息的时候,MessageProducer
底层会调用doSend
方法进行消息的发送。发送依然利用Event Bus的send
方法,只不过doSend
方法中添加了flow control的相关逻辑:
|
|
与MessageConsumer
类似,MessageProducer
内部同样保存着一个消息队列(缓冲区)用于暂存堆积的消息。当credits
大于0的时候代表可以发送消息(没有出现拥塞),Vert.x就会调用Event Bus的send
方法进行消息的发送,同时credits
要减1;如果credits
小于等于0,则代表此时消息发送的速度太快,出现了拥塞,需要暂缓发送,因此将要发送的对象暂存于缓冲区中。大家可能会问,credits
值不断减小,那么恢复消息发送能力(增大credits
)的逻辑在哪呢?这就要提到creditsAddress
了。我们看一下MessageProducerImpl
类的构造函数:
|
|
MessageProducerImpl
的构造函数中生成了一个creditAddress
,然后给该地址绑定了一个Handler
,当收到消息时调用doReceiveCredit
方法执行解除拥塞,恢复消息发送的逻辑。MessageProducerImpl
会将此MessageConsumer
保存,以便在关闭消息生产者流的时候将其注销。接着构造函数会往options
的headers
中添加一条记录,保存对应的creditAddress
,这也就是上面我们在deliver
函数中获取的creditAddress
:
|
|
这样,发送消息到creditsAddress
的逻辑也就好理解了。由于deliver
函数的逻辑就是处理消息,因此这里向creditsAddress
发送一个 1 其实就是将对应的credits
值加1。恢复消息发送的逻辑位于MessageProducerImpl
类的doReceiveCredit
方法中:
|
|
逻辑一目了然。首先给credits
加上发送过来的值(正常情况下为1),然后恢复发送能力,将缓冲区的数据依次取出、发送然后减小credits
。同时如果MessageProducer
绑定了drainHandler
(消息流不拥塞的时候调用的逻辑,详见官方文档),并且MessageProducer
发送的消息不再拥塞(credits >= maxSize / 2
),那么就在Vert.x Context中执行drainHandler
中的逻辑。
怎么样,体会到Vert.x中flow control的强大之处了吧!官方文档中MessageProducer
的篇幅几乎没有,只在介绍WriteStream
的时候提了提,因此这部分也可以作为MessageProducer
的参考。
reply
最后就是消息的回复逻辑 —— reply
方法了。reply
方法的实现位于MessageImpl
类中,最终调用的是reply(Object, DeliveryOptions, Handler<AsyncResult<Message<R>>>)
这个版本:
|
|
这里reply
方法同样调用EventBus
的createMessage
方法创建要回复的消息实体,传入的replyAddress
即为之前讲过的生成的非常简单的回复地址。然后再将消息实体、配置以及对应的replyHandler
(如果有的话)传入sendReply
方法进行消息的回复。最后其实是调用了Event Bus中的四个参数的sendReply
方法,它的逻辑与之前讲过的sendOrPubInternal
非常相似:
|
|
参数中replyMessage
代表回复消息实体,replierMessage
则代表回复者自身的消息实体(sender)。
如果地址为空则抛出异常;如果地址不为空,则先调用createReplyHandlerRegistration
方法创建对应的replyHandlerRegistration
。createReplyHandlerRegistration
方法的实现之前已经讲过了。注意这里的createReplyHandlerRegistration
其实对应的是此replier的回复,因为Vert.x中的 Request-Response 消息模型不限制相互回复(通信)的次数。当然如果没有指定此replier的回复的replyHandler
,那么此处的replyHandlerRegistration
就为空。最后sendReply
方法会创建一个ReplySendContextImpl
并调用其next
方法。
ReplySendContextImpl
类同样是SendContext
接口的一个实现(继承了SendContextImpl
类)。ReplySendContextImpl
比起其父类就多保存了一个replierMessage
。next
方法的逻辑与父类逻辑非常相似,只不过将回复的逻辑替换成了另一个版本的sendReply
方法:
|
|
然而。。。sendReply
方法并没有用到传入的replierMessage
,所以这里最终还是调用了sendOrPub
方法(尼玛,封装的ReplySendContextImpl
貌似并没有什么卵用,可能为以后的扩展考虑?)。。。之后的逻辑我们都已经分析过了。
这里再强调一点。当我们发送消息同时指定replyHandler
的时候,其内部为reply创建的reply consumer(类型为HandlerRegistration
)指定了timeout
。这个定时器从HandlerRegistration
创建的时候就开始计时了。我们回顾一下:
|
|
计时器会在超时的时候记录错误并强制注销当前consumer。由于reply consumer是一次性的,当收到reply的时候,Vert.x会自动对reply consumer调用unregister
方法对其进行注销(实现位于EventBusImpl#deliverToHandler
方法中),而在注销逻辑中会关闭定时器(参见前面对doUnregister
方法的解析);如果超时,那么计时器就会触发,Vert.x会调用sendAsyncResultFailure
方法注销当前reply consumer并处理错误。
synchronized的性能问题
大家可能看到为了防止race condition,Vert.x底层大量使用了synchronized
关键字(重量级锁)。这会不会影响性能呢?其实,如果开发者遵循Vert.x的线程模型和开发规范(使用Verticle)的话,有些地方的synchronized
对应的锁会被优化为 偏向锁 或 轻量级锁(因为通常都是同一个Event Loop线程获取对应的锁),这样性能总体开销不会太大。当然如果使用Multi-threaded worker verticles就要格外关注性能问题了。。。
总结
我们来简略地总结一下Event Bus的工作原理。当我们调用consumer
绑定一个MessageConsumer
时,Vert.x会将它保存至Event Bus实例内部的Map中;当我们通过send
或publish
向对应的地址发送消息的时候,Vert.x会遍历Event Bus中存储consumer的Map,获取与地址相对应的consumer集合,然后根据相应的策略传递并处理消息(send
通过轮询策略获取任意一个consumer并将消息传递至consumer中,publish
则会将消息传递至所有注册到对应地址的consumer中)。同时,MessageConsumer
和MessageProducer
这两个接口的实现都具有flow control功能,因此它们也可以用在Pump
中。
Event Bus是Vert.x中最为重要的一部分之一,探索Event Bus的源码可以加深我们对Event Bus工作原理的理解。作为开发者,只会使用框架是不够的,能够理解内部的实现原理和精华,并对其进行改进才是更为重要的。本篇文章分析的是Local模式下的Event Bus,下篇文章我们将来探索一下生产环境中更常用的 Clustered Event Bus 的实现原理,敬请期待!