引入
大家都知道,Vert.x中的executeBlocking
方法用于执行阻塞任务,并且有两种模式:有序执行和无序执行。下面我们来看两段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| vertx.setPeriodic(1000, t -> { vertx.executeBlocking(future -> { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); future.complete(); }, r -> {}); }); vertx.setPeriodic(1000, t -> { vertx.executeBlocking(future -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); future.complete(); }, r -> {}); });
|
我们思考一下,每段代码每次执行的时候使用的线程相同么?正常情况下大家都知道executeBlocking
底层使用了Worker线程池,因此貌似两种情况没什么区别,都是轮询Worker线程池,每次可能用不同的Worker线程。但是我们测一下:
第一段代码:
1 2 3 4 5
| vert.x-worker-thread-0 vert.x-worker-thread-1 vert.x-worker-thread-2 vert.x-worker-thread-3 vert.x-worker-thread-4
|
第二段代码:
1 2 3 4 5
| vert.x-worker-thread-0 vert.x-worker-thread-0 vert.x-worker-thread-0 vert.x-worker-thread-0 vert.x-worker-thread-0
|
额。。。两段代码每次执行的线程居然有差异?第二次为什么每次都用相同的Worker线程?其实,大家可能忽略了一点:executeBlocking
方法默认顺序执行提交的阻塞任务。今天我们就来探究一下executeBlocking
内部的实现。
Worker线程池
我们来回顾一下Vert.x底层的Worker线程池,它在创建VertxImpl
实例的时候进行初始化:
1 2 3 4 5
| ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime())); PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; workerPool = new WorkerPool(workerExec, workerPoolMetrics);
|
可以看到底层的Worker线程池本质上是一种FixedThreadPool
,里面的线程由VertxThreadFactory
控制生成,对应的线程类型为VertxThread
。Vert.x内部用WorkerPool
类对线程池以及线程池相关的Metrics类进行了封装。
阻塞任务在哪里执行?
有了Worker线程池的基础,我们来看一下Vertx
实例中的executeBlocking
方法,它的过程很简单:获取当前的Vert.x Context(没有就创建),然后委托调用Context
里的executeBlocking
方法:
1 2 3 4 5 6 7 8 9 10 11 12
| @Override public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler) { ContextImpl context = getOrCreateContext(); context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); } @Override public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> asyncResultHandler) { executeBlocking(blockingCodeHandler, true, asyncResultHandler); }
|
在此方法中可以看到,ordered
标志位默认为true
,即默认按提交的次序执行阻塞任务。
我们再来看一下ContextImpl
类中的executeBlocking
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Override public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) { executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.executor(), workerPool.metrics()); } <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler, Executor exec, PoolMetrics metrics) { Object queueMetric = metrics != null ? metrics.submitted() : null; try { exec.execute(() -> { Object execMetric = null; if (metrics != null) { execMetric = metrics.begin(queueMetric); } Future<T> res = Future.future(); try { if (blockingCodeHandler != null) { ContextImpl.setContext(this); blockingCodeHandler.handle(res); } else { T result = action.perform(); res.complete(result); } } catch (Throwable e) { res.fail(e); } if (metrics != null) { metrics.end(execMetric, res.succeeded()); } if (resultHandler != null) { runOnContext(v -> res.setHandler(resultHandler)); } }); } catch (RejectedExecutionException ignore) { if (metrics != null) { metrics.rejected(queueMetric); } } }
|
它调用了另一个具体版本的executeBlocking
方法,其中第四个参数即为要执行阻塞任务的线程池。如果要有序执行(ordered
为true),底层就使用context
实例里的workerExec
线程池;如果无序执行,就调用workerPool
的executor
方法获取另一种线程池。
看到这里,我们大致已经想到了,有序执行和无序执行两种模式使用不同的线程池,因此底层实现肯定有差异。我们来看一下前面提到的两个线程池,它们都是ContextImpl
类的成员变量:
1 2
| protected final WorkerPool workerPool; protected final Executor workerExec;
|
在通过Vertx
实例创建Context
的时候,这几个变量会被初始化,其来源就是之前我们看过的VertxImpl
实例中的Worker线程池。看一下ContextImpl
类的构造函数就一目了然了:
1 2
| this.workerPool = workerPool; this.workerExec = workerPool.createOrderedExecutor();
|
嗯。。。有序执行对应的线程池通过workerPool
的createOrderedExecutor
方法获得,而无序执行对应的线程池通过workerPool
的executor
方法获得。因此,WorkerPool
类是一个关键点,我们稍后就看一下其实现。
注意Vert.x规定,blockingCodeHandler
中的逻辑(即阻塞任务)在Worker线程内执行,而resultHandler
内的逻辑(结果处理)需要在Vert.x Conext中执行,因此前面需要预先设置当前使用的Worker线程的Context
为this
以便后面调用runOnContext
方法执行结果处理逻辑。
下面就来看一下有序执行和无序执行这两种线程池的具体区别。
无序执行
我们看一下WorkerPool
类的源码中获取无序执行线程池的逻辑:
1 2 3
| ExecutorService executor() { return pool; }
|
可以看到executor
方法直接返回了内部的pool
线程池,而pool
线程池其实就是VertxImpl
中的workerExec
线程池:
1
| workerPool = new WorkerPool(workerExec, workerPoolMetrics);
|
OK!如果大家熟悉并发的话,大家应该对无序执行对应的线程池 —— Worker线程池的行为非常清楚了。它属于一种FixedThreadPool
,底层通过阻塞队列LinkedBlockingQueue
实现。底层通过轮询算法获取Worker线程执行任务。
有序执行
下面是时候看有序执行对应的逻辑了:
1 2 3 4 5 6 7 8 9 10 11 12 13
| private final OrderedExecutorFactory orderedFact; private final ExecutorService pool; private final PoolMetrics metrics; public WorkerPool(ExecutorService pool, PoolMetrics metrics) { this.orderedFact = new OrderedExecutorFactory(pool); this.pool = pool; this.metrics = metrics; } Executor createOrderedExecutor() { return orderedFact.getExecutor(); }
|
可以看到有序执行对应的线程池是通过OrderedExecutorFactory
创建的。其实,OrderedExecutorFactory
类会生成真正的有序执行线程池OrderedExecutor
,它其实是对Worker线程池pool
的一个简单包装,仅仅添加了有序执行相关的逻辑,最后还是委托Worker线程池进行任务处理。
那么OrderedExecutor
是如何实现顺序执行的呢?OrderedExecutor
内部维护着一个任务队列。每当调用executeBlocking
方法执行阻塞过程的时候,Vert.x会将阻塞过程包装成Runnable
然后置入OrderedExecutor
中的任务队列中;同时如果OrderedExecutor
没有开始执行任务,就委托内部的Worker线程池执行任务:
1 2 3 4 5 6 7 8 9
| public void execute(Runnable command) { synchronized (tasks) { tasks.add(command); if (!running) { running = true; parent.execute(runner); } } }
|
从代码中可以看出,最后委托Worker线程池执行的线程其实是又包装了一层的runner
线程。runner
的逻辑不难想:不断地从任务队列中取出队首的Runnable
然后调用其run
方法执行(相当于执行了此任务,只不过在runner对应的线程中);如果没有任务了就结束本线程。
这里就出现了一种情况:大批量提交阻塞任务的时候,线程池的状态running
一直为true
,此时所有的任务都积压到任务队列中,而执行所有任务的线程只有一个 —— runner
对应的线程。这种情况其实很好想,因为要保证有序执行,就只能让它一个接一个地在同个线程中执行。如果在不同线程中依次执行则不好调度,如果直接并行执行则不能保证有序性。
所以,根据OrderedExecutor
线程池的内部实现,只要提交任务的间隔时间小于任务执行的时间,底层其实就仅执行了一次runner
,也就是说所有提交的阻塞任务都只在一个线程下跑(running标志位控制)。
这样就可以很好地解释我们一开始提出的问题了。当sleep(200), setPeriodic(1000)
的时候,提交任务的间隔时间大于任务执行的时间,这样每次的runner
就可以在下一个任务提交之前执行完,因此每次所用的线程会不同(轮询策略);而sleep(2000), setPeriodic(1000)
的时候,提交任务的间隔时间小于任务执行的时间,底层最后都归结到一个runner
中执行了,因此所有过程都是在同一个Worker线程执行的(很好想,保证有序就要串行执行)。
当然,如果不想有序执行,可以用void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler)
这个版本的executeBlocking
方法,并将ordered
标志位设为false
。根据上面的源码,底层会直接使用Worker线程池而不是OrderedExecutor
线程池,这样就不会有上面OrderedExecutor
的情况了。