线程模型概述
Vert.x 的线程模型设计的非常巧妙。总的来说,Vert.x 中主要有两种线程:Event Loop 线程 和 Worker 线程。其中,Event Loop 线程结合了 Netty 的 EventLoop
,用于处理事件。每一个 EventLoop
都与唯一的线程相绑定,这个线程就叫 Event Loop 线程。Event Loop 线程不能被阻塞,否则事件将无法被处理。
Worker 线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞 Event Loop 线程。
如果像 Node.js 一样只有单个 Event Loop 的话就不能充分利用多核 CPU 的性能了。为了充分利用多核 CPU 的性能,Vert.x 中提供了一组 Event Loop 线程。每个 Event Loop 线程都可以处理事件。为了保证线程安全,防止资源争用,Vert.x 保证了某一个 Handler
总是被同一个 Event Loop 线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。所以,只要开发者遵循 Vert.x 的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。
本篇文章将底层的角度来解析 Vert.x 的线程模型。对应的 Vert.x 版本为 3.3.3。
Event Loop 线程
首先回顾一下 Event Loop 线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:
Vert.x 线程模型中最重要的一点就是:永远不要阻塞 Event Loop 线程。因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。
Vert.x 中内置一种用于检测 Event Loop 是否阻塞的线程:vertx-blocked-thread-checker
。一旦 Event Loop 处理某个事件的时间超过一定阈值(默认为 2000 ms)就会警告,如果阻塞的时间过长就会抛出异常。Block Checker 的实现原理比较简单,底层借助了 JUC 的 TimerTask
,定时计算每个 Event Loop 线程的处理事件消耗的时间,如果超时就进行相应的警告。
Vert.x Thread
Vert.x 中的 Event Loop 线程及 Worker 线程都用 VertxThread
类表示,并通过 VertxThreadFactory
线程工厂来创建。VertxThreadFactory
创建 Vert.x 线程的过程非常简单:
1 2 3 4 5 6 7 8 9 10 11 12
| @Override public Thread newThread(Runnable runnable) { VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime); if (checker != null) { checker.registerThread(t); } addToMap(t); t.setDaemon(false); return t; }
|
除了创建 VertxThread
线程之外,VertxThreadFactory
还会将此线程注册至 Block Checker 线程中以监视线程的阻塞情况,并且将此线程添加至内部的 weakMap
中。这个 weakMap
作用只有一个,就是在注销对应的 Verticle 的时候可以将每个 VertxThread
中的 Context
实例清除(unset)。为了保证资源不被一直占用,这里使用了 WeakHashMap
来存储每一个 VertxThread
。当里面的 VertxThread
的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待 GC。
至于 VertxThread
,它其实就是在普通线程的基础上存储了额外的数据(如对应的 Vert.x Context,最大执行时长,当前执行时间,是否为 Worker 线程等),这里就不多讲了。
Vert.x Context
Vert.x 底层中一个重要的概念就是 Context
,每个 Context
都会绑定着一个 Event Loop 线程(而一个 Event Loop 线程可以对应多个 Context
)。我们可以把 Context
看作是控制一系列的 Handler
的执行作用域及顺序的上下文对象。
每当 Vert.x 底层将事件分发至 Handler
的时候,Vert.x 都会给此 Handler
绑定一个 Context
用于处理任务:
- 如果当前线程是 Vert.x 线程(
VertxThread
),那么 Vert.x 就会复用此线程上绑定的 Context
;如果没有对应的 Context
就创建新的
- 如果当前线程是普通线程,就创建新的
Context
Vert.x 中存在三种 Context
,与之前的线程种类相对应:
EventLoopContext
WorkerContext
MultiThreadedWorkerContext
Event Loop Context
每个 Event Loop Context 都会对应着唯一的一个 EventLoop
,即一个 Event Loop Context 只会在同一个 Event Loop 线程上执行任务。在创建 Context
的时候,Vert.x 会自动根据轮询策略选择对应的 EventLoop
:
1 2 3 4 5 6 7 8 9 10 11
| protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, ClassLoader tccl) { EventLoopGroup group = vertx.getEventLoopGroup(); if (group != null) { this.eventLoop = group.next(); } else { this.eventLoop = null; } }
|
在 Netty 中,EventLoopGroup
代表一组 EventLoop
,而从中获取 EventLoop
的方法则是 next
方法。EventLoopGroup
中 EventLoop
的数量由 CPU 核数所确定。Vert.x 这里使用了 Netty NIO 对应的 NioEventLoop
:
1 2
| eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory); eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
|
对应的轮询算法:
1 2 3 4 5 6 7 8
| @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
|
可以看到,正常情况下 Netty 会用轮询策略选择 EventLoop
。特别地,如果 EventLoop
的个数是 2 的倍数的话,选择的会快一些:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private static final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } }
|
我们可以在 Embedded 模式下测试一下 Event Loop 线程的分配:
1 2 3 4 5 6 7
| System.out.println(Thread.currentThread()); Vertx vertx = Vertx.vertx(); for (int i = 0; i < 20; i++) { int index = i; vertx.setTimer(1, t -> { System.out.println(index + ":" + Thread.currentThread()); });
|
运行结果(不同机器运行顺序、Event Loop线程数可能不同):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Thread[main,5,main] 0:Thread[vert.x-eventloop-thread-0,5,main] 1:Thread[vert.x-eventloop-thread-1,5,main] 2:Thread[vert.x-eventloop-thread-2,5,main] 3:Thread[vert.x-eventloop-thread-3,5,main] 5:Thread[vert.x-eventloop-thread-5,5,main] 6:Thread[vert.x-eventloop-thread-6,5,main] 8:Thread[vert.x-eventloop-thread-8,5,main] 7:Thread[vert.x-eventloop-thread-7,5,main] 10:Thread[vert.x-eventloop-thread-10,5,main] 9:Thread[vert.x-eventloop-thread-9,5,main] 4:Thread[vert.x-eventloop-thread-4,5,main] 11:Thread[vert.x-eventloop-thread-11,5,main] 12:Thread[vert.x-eventloop-thread-12,5,main] 13:Thread[vert.x-eventloop-thread-13,5,main] 14:Thread[vert.x-eventloop-thread-14,5,main] 16:Thread[vert.x-eventloop-thread-0,5,main] 17:Thread[vert.x-eventloop-thread-1,5,main] 15:Thread[vert.x-eventloop-thread-15,5,main] 18:Thread[vert.x-eventloop-thread-2,5,main] 19:Thread[vert.x-eventloop-thread-3,5,main]
|
可以看到尽管每个 Context
对应唯一的 Event Loop 线程,而每个 Event Loop 线程却可能对应多个 Context
。
Event Loop Context 会在对应的 EventLoop
中执行 Handler
进行事件的处理(IO 事件,非阻塞)。Vert.x 会保证同一个 Handler
会一直在同一个 Event Loop 线程中执行,这样可以简化线程模型,让开发者在写 Handler
的时候不需要考虑并发的问题,非常方便。
我们来看一下 Handler
是如何在 EventLoop
上执行的。EventLoopContext
中实现了 executeAsync
方法用于包装 Handler
中事件处理的逻辑并将其提交至对应的 EventLoop
中进行执行:
1 2 3 4
| public void executeAsync(Handler<Void> task) { nettyEventLoop().execute(wrapTask(null, task, true, null)); }
|
这里 Vert.x 使用了 wrapTask
方法将 Handler
封装成了一个 Runnable
用于向 EventLoop
中提交。代码比较直观,大致就是检查当前线程是否为 Vert.x 线程,然后记录事件处理开始的时间,给当前的 Vert.x 线程设置 Context
,并且调用 Handler
里面的事件处理方法。具体请参考源码,这里就不贴出来了。
那么把封装好的 task 提交到 EventLoop
以后,EventLoop
是怎么处理的呢?这就需要更多的 Netty 相关的知识了。根据Netty 的模型,Event Loop 线程需要处理 IO 事件,普通事件(即我们的 Handler
)以及定时事件(比如 Vert.x 的 setTimer
)。Vert.x 会提供一个 NETTY_IO_RATIO
给Netty代表 EventLoop
处理 IO 事件时间占用的百分比(默认为 50,即 IO事件时间占用:非IO事件时间占用 = 1:1)。当 EventLoop
启动的时候,它会不断轮询 IO 事件及其它事件并进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Override protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { } } }
|
这里面 Netty 会调用 processSelectedKeys
方法进行 IO 事件的处理,并且会计算出处理 IO 时间所用的事件然后计算出给非 IO 事件处理分配的时间,然后调用 runAllTasks
方法执行所有的非 IO 任务(这里面就有我们的各个 Handler
)。
runAllTasks
会按顺序从内部的任务队列中取出任务(Runnable
)然后进行安全执行。而我们刚才调用的 NioEventLoop
的 execute
方法其实就是将包装好的 Handler
置入 NioEventLoop
内部的任务队列中等待执行。
Worker Context
顾名思义,Worker Context 用于跑阻塞任务。与 Event Loop Context 相似,每一个 Handler
都只会跑在固定的 Worker 线程下。
Vert.x 还提供一种 Multi-threaded worker context 可以在多个 Worker 线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。因此一般情况下我们用不到 Multi-threaded worker context。
Verticle
我们再来讨论一下 Verticle
中的 Context
。在部署 Verticle
的时候,Vert.x 会根据配置来创建 Context
并绑定到 Verticle 上,此后此 Verticle 上所有绑定的 Handler
都会在此 Context
上执行。相关实现位于 doDeploy
方法,这里摘取核心部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| for (Verticle verticle: verticles) { WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null; WorkerPool pool = workerExec != null ? workerExec.getPool() : null; ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) : vertx.createEventLoopContext(deploymentID, pool, conf, tccl); if (workerExec != null) { context.addCloseHook(workerExec); } context.setDeployment(deployment); deployment.addVerticle(new VerticleHolder(verticle, context)); context.runOnContext(v -> { try { verticle.init(vertx, context); Future<Void> startFuture = Future.future(); verticle.start(startFuture); startFuture.setHandler(ar -> { if (ar.succeeded()) { if (parent != null) { parent.addChild(deployment); deployment.child = true; } vertx.metricsSPI().verticleDeployed(verticle); deployments.put(deploymentID, deployment); if (deployCount.incrementAndGet() == verticles.length) { reportSuccess(deploymentID, callingContext, completionHandler); } } else if (!failureReported.get()) { reportFailure(ar.cause(), callingContext, completionHandler); } }); } catch (Throwable t) { reportFailure(t, callingContext, completionHandler); } }); }
|
通过这样一种方式,Vert.x保证了Verticle
的线程安全 —— 即某个Verticle
上的所有Handler
都会在同一个Vert.x线程上执行,这样也保证了Verticle
内部成员的安全(没有race condition问题)。比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class TcpClientVerticle extends AbstractVerticle { int i = 0; @Override public void start() throws Exception { vertx.createNetClient().connect(6666, "localhost", ar -> { if (ar.succeeded()) { NetSocket socket = ar.result(); System.out.println(Thread.currentThread().getName()); socket.handler(buffer -> { i++; System.out.println(Thread.currentThread().getName()); System.out.println("Net client receiving: " + buffer.toString("UTF-8")); }); socket.write("+1s\n"); } else { ar.cause().printStackTrace(); } }); } }
|
线程池
Event Loop 线程池
之前我们已经提到过,Event Loop 线程池的类型为 Netty 中的NioEventLoopGroup
,里面的线程通过 Vert.x 自己的线程工厂VertxThreadFactory
进行创建:
1 2 3
| eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime()); eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory); eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
|
其中 Event Loop 线程的数目可以在配置中指定。
Worker 线程池
在之前讲 executeBlocking
底层实现的文章中我们已经提到过 Worker 线程池,它其实就是一种 Fixed Thread Pool:
1 2 3 4 5
| ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime())); PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; workerPool = new WorkerPool(workerExec, workerPoolMetrics);
|
Worker线程同样由VertxThreadFactory
构造,类型为VertxThread
,用于执行阻塞任务。我们同样可以在配置中指定其数目。
内部阻塞线程池
1 2 3 4
| ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime())); PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
|
Internal Blocking Pool可能设计用于内部使用,在executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler)
这个版本的方法中就使用了它。
Acceptor Event Loop 线程池
大家可能会发现VertxImpl
类中还有一个acceptorEventLoopGroup
。顾名思义,它是Netty中的Acceptor线程池,负责处理客户端的连接请求:
1 2
| acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory); acceptorEventLoopGroup.setIoRatio(100);
|
由于系统只有一个服务端端口需要监听,因此这里只需要一个线程。
Vert.x中的HttpServer
就利用了acceptorEventLoopGroup
处理客户端的连接请求,具体的实现后边会另起一篇介绍。