Vert.x 技术内幕 | 解道 Vert.x 线程模型

线程模型概述

Vert.x 的线程模型设计的非常巧妙。总的来说,Vert.x 中主要有两种线程:Event Loop 线程Worker 线程。其中,Event Loop 线程结合了 Netty 的 EventLoop,用于处理事件。每一个 EventLoop 都与唯一的线程相绑定,这个线程就叫 Event Loop 线程。Event Loop 线程不能被阻塞,否则事件将无法被处理。

Worker 线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞 Event Loop 线程。

如果像 Node.js 一样只有单个 Event Loop 的话就不能充分利用多核 CPU 的性能了。为了充分利用多核 CPU 的性能,Vert.x 中提供了一组 Event Loop 线程。每个 Event Loop 线程都可以处理事件。为了保证线程安全,防止资源争用,Vert.x 保证了某一个 Handler 总是被同一个 Event Loop 线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。所以,只要开发者遵循 Vert.x 的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。

本篇文章将底层的角度来解析 Vert.x 的线程模型。对应的 Vert.x 版本为 3.3.3

Event Loop 线程

首先回顾一下 Event Loop 线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:

Vert.x Event Loop

Vert.x 线程模型中最重要的一点就是:永远不要阻塞 Event Loop 线程。因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。

Vert.x 中内置一种用于检测 Event Loop 是否阻塞的线程:vertx-blocked-thread-checker。一旦 Event Loop 处理某个事件的时间超过一定阈值(默认为 2000 ms)就会警告,如果阻塞的时间过长就会抛出异常。Block Checker 的实现原理比较简单,底层借助了 JUC 的 TimerTask,定时计算每个 Event Loop 线程的处理事件消耗的时间,如果超时就进行相应的警告。

Vert.x Thread

Vert.x 中的 Event Loop 线程及 Worker 线程都用 VertxThread 类表示,并通过 VertxThreadFactory 线程工厂来创建。VertxThreadFactory 创建 Vert.x 线程的过程非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Thread newThread(Runnable runnable) {
VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime);
if (checker != null) {
checker.registerThread(t);
}
addToMap(t);
t.setDaemon(false);
return t;
}

除了创建 VertxThread 线程之外,VertxThreadFactory 还会将此线程注册至 Block Checker 线程中以监视线程的阻塞情况,并且将此线程添加至内部的 weakMap 中。这个 weakMap 作用只有一个,就是在注销对应的 Verticle 的时候可以将每个 VertxThread 中的 Context 实例清除(unset)。为了保证资源不被一直占用,这里使用了 WeakHashMap 来存储每一个 VertxThread。当里面的 VertxThread 的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待 GC。

至于 VertxThread,它其实就是在普通线程的基础上存储了额外的数据(如对应的 Vert.x Context,最大执行时长,当前执行时间,是否为 Worker 线程等),这里就不多讲了。

Vert.x Context

Vert.x 底层中一个重要的概念就是 Context,每个 Context 都会绑定着一个 Event Loop 线程(而一个 Event Loop 线程可以对应多个 Context)。我们可以把 Context 看作是控制一系列的 Handler 的执行作用域及顺序的上下文对象。

每当 Vert.x 底层将事件分发至 Handler 的时候,Vert.x 都会给此 Handler 绑定一个 Context 用于处理任务:

  • 如果当前线程是 Vert.x 线程(VertxThread),那么 Vert.x 就会复用此线程上绑定的 Context;如果没有对应的 Context 就创建新的
  • 如果当前线程是普通线程,就创建新的 Context

Vert.x 中存在三种 Context,与之前的线程种类相对应:

  • EventLoopContext
  • WorkerContext
  • MultiThreadedWorkerContext

Event Loop Context

每个 Event Loop Context 都会对应着唯一的一个 EventLoop,即一个 Event Loop Context 只会在同一个 Event Loop 线程上执行任务。在创建 Context 的时候,Vert.x 会自动根据轮询策略选择对应的 EventLoop:

1
2
3
4
5
6
7
8
9
10
11
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
// ...
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
this.eventLoop = group.next();
} else {
this.eventLoop = null;
}
// ...
}

在 Netty 中,EventLoopGroup 代表一组 EventLoop,而从中获取 EventLoop 的方法则是 next 方法。EventLoopGroupEventLoop 的数量由 CPU 核数所确定。Vert.x 这里使用了 Netty NIO 对应的 NioEventLoop

1
2
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

对应的轮询算法:

1
2
3
4
5
6
7
8
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

可以看到,正常情况下 Netty 会用轮询策略选择 EventLoop。特别地,如果 EventLoop 的个数是 2 的倍数的话,选择的会快一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

我们可以在 Embedded 模式下测试一下 Event Loop 线程的分配:

1
2
3
4
5
6
7
System.out.println(Thread.currentThread());
Vertx vertx = Vertx.vertx();
for (int i = 0; i < 20; i++) {
int index = i;
vertx.setTimer(1, t -> {
System.out.println(index + ":" + Thread.currentThread());
});

运行结果(不同机器运行顺序、Event Loop线程数可能不同):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Thread[main,5,main]
0:Thread[vert.x-eventloop-thread-0,5,main]
1:Thread[vert.x-eventloop-thread-1,5,main]
2:Thread[vert.x-eventloop-thread-2,5,main]
3:Thread[vert.x-eventloop-thread-3,5,main]
5:Thread[vert.x-eventloop-thread-5,5,main]
6:Thread[vert.x-eventloop-thread-6,5,main]
8:Thread[vert.x-eventloop-thread-8,5,main]
7:Thread[vert.x-eventloop-thread-7,5,main]
10:Thread[vert.x-eventloop-thread-10,5,main]
9:Thread[vert.x-eventloop-thread-9,5,main]
4:Thread[vert.x-eventloop-thread-4,5,main]
11:Thread[vert.x-eventloop-thread-11,5,main]
12:Thread[vert.x-eventloop-thread-12,5,main]
13:Thread[vert.x-eventloop-thread-13,5,main]
14:Thread[vert.x-eventloop-thread-14,5,main]
16:Thread[vert.x-eventloop-thread-0,5,main]
17:Thread[vert.x-eventloop-thread-1,5,main]
15:Thread[vert.x-eventloop-thread-15,5,main]
18:Thread[vert.x-eventloop-thread-2,5,main]
19:Thread[vert.x-eventloop-thread-3,5,main]

可以看到尽管每个 Context 对应唯一的 Event Loop 线程,而每个 Event Loop 线程却可能对应多个 Context

Event Loop Context 会在对应的 EventLoop 中执行 Handler 进行事件的处理(IO 事件,非阻塞)。Vert.x 会保证同一个 Handler 会一直在同一个 Event Loop 线程中执行,这样可以简化线程模型,让开发者在写 Handler 的时候不需要考虑并发的问题,非常方便。

我们来看一下 Handler 是如何在 EventLoop 上执行的。EventLoopContext 中实现了 executeAsync 方法用于包装 Handler 中事件处理的逻辑并将其提交至对应的 EventLoop 中进行执行:

1
2
3
4
public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}

这里 Vert.x 使用了 wrapTask 方法将 Handler 封装成了一个 Runnable 用于向 EventLoop 中提交。代码比较直观,大致就是检查当前线程是否为 Vert.x 线程,然后记录事件处理开始的时间,给当前的 Vert.x 线程设置 Context,并且调用 Handler 里面的事件处理方法。具体请参考源码,这里就不贴出来了。

那么把封装好的 task 提交到 EventLoop 以后,EventLoop 是怎么处理的呢?这就需要更多的 Netty 相关的知识了。根据Netty 的模型,Event Loop 线程需要处理 IO 事件,普通事件(即我们的 Handler)以及定时事件(比如 Vert.x 的 setTimer)。Vert.x 会提供一个 NETTY_IO_RATIO 给Netty代表 EventLoop 处理 IO 事件时间占用的百分比(默认为 50,即 IO事件时间占用:非IO事件时间占用 = 1:1)。当 EventLoop 启动的时候,它会不断轮询 IO 事件及其它事件并进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
// process the error
// ...
}
}
}

这里面 Netty 会调用 processSelectedKeys 方法进行 IO 事件的处理,并且会计算出处理 IO 时间所用的事件然后计算出给非 IO 事件处理分配的时间,然后调用 runAllTasks 方法执行所有的非 IO 任务(这里面就有我们的各个 Handler)。

runAllTasks 会按顺序从内部的任务队列中取出任务(Runnable)然后进行安全执行。而我们刚才调用的 NioEventLoopexecute 方法其实就是将包装好的 Handler 置入 NioEventLoop 内部的任务队列中等待执行。

Worker Context

顾名思义,Worker Context 用于跑阻塞任务。与 Event Loop Context 相似,每一个 Handler 都只会跑在固定的 Worker 线程下。

Vert.x 还提供一种 Multi-threaded worker context 可以在多个 Worker 线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。因此一般情况下我们用不到 Multi-threaded worker context。

Verticle

我们再来讨论一下 Verticle 中的 Context。在部署 Verticle 的时候,Vert.x 会根据配置来创建 Context 并绑定到 Verticle 上,此后此 Verticle 上所有绑定的 Handler 都会在此 Context 上执行。相关实现位于 doDeploy 方法,这里摘取核心部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
for (Verticle verticle: verticles) {
WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
// 根据配置创建Context
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
if (workerExec != null) {
context.addCloseHook(workerExec);
}
context.setDeployment(deployment);
deployment.addVerticle(new VerticleHolder(verticle, context));
// 此Verticle上的Handler都会在创建的context作用域内执行
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
// 大家熟悉的start方法的执行点
verticle.start(startFuture);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
parent.addChild(deployment);
deployment.child = true;
}
vertx.metricsSPI().verticleDeployed(verticle);
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
} else if (!failureReported.get()) {
reportFailure(ar.cause(), callingContext, completionHandler);
}
});
} catch (Throwable t) {
reportFailure(t, callingContext, completionHandler);
}
});
}

通过这样一种方式,Vert.x保证了Verticle的线程安全 —— 即某个Verticle上的所有Handler都会在同一个Vert.x线程上执行,这样也保证了Verticle内部成员的安全(没有race condition问题)。比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TcpClientVerticle extends AbstractVerticle {
int i = 0;
@Override
public void start() throws Exception {
vertx.createNetClient().connect(6666, "localhost", ar -> {
if (ar.succeeded()) {
NetSocket socket = ar.result();
System.out.println(Thread.currentThread().getName());
socket.handler(buffer -> {
i++;
System.out.println(Thread.currentThread().getName());
System.out.println("Net client receiving: " + buffer.toString("UTF-8"));
});
socket.write("+1s\n");
} else {
ar.cause().printStackTrace();
}
});
}
}

线程池

Event Loop 线程池

之前我们已经提到过,Event Loop 线程池的类型为 Netty 中的NioEventLoopGroup,里面的线程通过 Vert.x 自己的线程工厂VertxThreadFactory进行创建:

1
2
3
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime());
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

其中 Event Loop 线程的数目可以在配置中指定。

Worker 线程池

在之前讲 executeBlocking 底层实现的文章中我们已经提到过 Worker 线程池,它其实就是一种 Fixed Thread Pool:

1
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

Worker线程同样由VertxThreadFactory构造,类型为VertxThread,用于执行阻塞任务。我们同样可以在配置中指定其数目。

内部阻塞线程池

1
2
3
4
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);

Internal Blocking Pool可能设计用于内部使用,在executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler)这个版本的方法中就使用了它。

Acceptor Event Loop 线程池

大家可能会发现VertxImpl类中还有一个acceptorEventLoopGroup。顾名思义,它是Netty中的Acceptor线程池,负责处理客户端的连接请求:

1
2
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);
acceptorEventLoopGroup.setIoRatio(100);

由于系统只有一个服务端端口需要监听,因此这里只需要一个线程。

Vert.x中的HttpServer就利用了acceptorEventLoopGroup处理客户端的连接请求,具体的实现后边会另起一篇介绍。

文章目录
  1. 1. 线程模型概述
  2. 2. Event Loop 线程
  3. 3. Vert.x Thread
  4. 4. Vert.x Context
    1. 4.1. Event Loop Context
    2. 4.2. Worker Context
  5. 5. Verticle
  6. 6. 线程池
    1. 6.1. Event Loop 线程池
    2. 6.2. Worker 线程池
    3. 6.3. 内部阻塞线程池
    4. 6.4. Acceptor Event Loop 线程池