在异步编程中,Future/Promise模式是一种广泛使用的异步开发模式,其中 Future
对象代表一个尚未完成异步操作的结果。从JDK 1.5以来,JUC包一直提供着最基本的Future
,不过它太鸡肋了,除了get
、cancel
、isDone
和isCancelled
方法之外就没有其他的操作了,这样很不方便。好在JDK 1.8中引入了具有FRP风格的 CompletableFuture
,它类似于Scala中的 Future
。CompletableFuture
属于Monad, 因此支持一系列的函数式的组合、运算操作,非常方便,可以写出很FRP风格的代码而摆脱callback hell。
下面我们来结合FRP的思想,总结一下这些操作(有的时候为了方便表示,我会用Haskell或Scala的语法来表示类型,毕竟Java的类型系统太渣):
构造CompletableFuture对象
CompletableFuture类通过工厂模式创建CompletableFuture
对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
|
如果我们的异步操作不需要返回值,那么可以通过runAsync
方法提供一个Runnable创建一个CompletableFuture<Void>
对象。如果我们的异步操作需要返回值,那么可以通过supplyAsync
方法提供一个Supplier<U>
对象来创建:
1
| final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> longTask(param));
|
如果不提供Executor
的话,默认使用ForkJoinPool.commonPool()
作为线程池。
后缀为Async
的方法代表异步执行。
变换(fmap)
假如我们要通过CompletableFuture来异步获取一组数据,并对数据进行一些处理(变换),我们可以使用thenApply
和thenApplyAsync
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn, Executor executor) { return uniApplyStage(screenExecutor(executor), fn); }
|
它其实就是fmap
函数,用Haskell表示原型为:
1 2 3 4
| fmap :: Functor f => (a -> b) -> f a -> f b thenApply :: (a -> b) -> CompletableFuture a -> CompletableFuture b
|
它们不仅可以变换数据的值,也可以变换数据的类型,如:
1 2 3
| CompletableFuture<Double> f = CompletableFuture.supplyAsync(() -> "4") .thenApply(Integer::parseInt) .thenApply(r -> r * r * Math.PI);
|
fmap以后,数据流的类型进行了以下变换:String -> Integer -> Double
。
组合(bind)
有的时候,我们需要在异步操作完成的时候对异步操作的结果进行一些操作,并且操作仍然返回CompletableFuture
类型。我们可以利用thenCompose
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(asyncPool, fn); } public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { return uniComposeStage(screenExecutor(executor), fn); }
|
可以看出它其实对应了Monad里的bind
操作(Java和Scala中为flatMap
),用Haskell表示原型为:
1 2 3
| (>>=) :: Monad m => m a -> (a -> m b) -> m b thenCompose :: CompletableFuture a -> (a -> CompletableFuture b) -> CompletableFuture b
|
thenCompose
是一个非常重要的操作,它对于构建异步的pipeline非常有用。举个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class TaskWorkI { public static Optional<List<Integer>> longTask(Integer i) { if (i > 0) { List<Integer> list = new ArrayList<>(); for(int pc = 0; pc < i; pc++) list.add(pc); return Optional.of(list); } else return Optional.empty(); } public static CompletableFuture<Long> getResultFuture(Optional<List<Integer>> op) { return CompletableFuture.supplyAsync(() -> { if (op.isPresent()) return op.get().stream() .map(Integer::toUnsignedLong) .reduce(0L, (x, y) -> x + y); else return -1L; }); } public static void main(String[] args) throws Exception { CompletableFuture<Long> f = CompletableFuture.supplyAsync(() -> longTask(1000000)) .thenComposeAsync(TaskWorkI::getResultFuture); Long result = f.get(); System.out.println(result); } }
|
超级变换(liftM2)
CompletableFuture
类里面还有个thenCombine
操作,它的原型看起来非常晕:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); }
|
Java类型系统过于坑爹,我们用Haskell表示其原型就一目了然了:
1
| thenCombine :: CompletableFuture a -> CompletableFuture b -> (a -> b -> c) -> CompletableFuture c
|
把参数调调位置,可以发现thenCombine
其实类似于Haskell中的liftM2
操作:
1 2 3
| liftM2 :: Monad m => (a1 -> a2 -> r) -> m a1 -> m a2 -> m r thenCombine :: CompletableFuture m => (a -> b -> c) -> m a -> m b -> m c
|
简单示例
下面我们用一个简单的例子来说明CompletableFuture
的使用。假设我们需要获取一篇文章(Article)的信息、对应分类(Category)信息以及对应的评论数,而且从数据库中query的操作是异步的(每个DB操作都返回一个CompletableFuture
),我们可以这样写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class Article {} class Category {} class ArticleWithCategory { private Article article; private Category category; public ArticleWithCategory(Article article, Category category) { this.article = article; this.category = category; } } class AwcWithCount { private ArticleWithCategory awc; private int count; public AwcWithCount(ArticleWithCategory awc, int count) { this.awc = awc; this.count = count; } } public CompletableFuture<ArticleWithCategory> fetchAWC(int aid) { } public CompletableFuture<Integer> getCount(int aid) { } public CompletableFuture<AwcWithCount> fetchWithAWCC(int aid) { return fetchAWC(aid).thenCompose(x -> getCount(aid).thenApply(y -> new AwcWithCount(x, y) )); }
|
这其实和Scala中的Slick的各种组合特别相似:
1 2 3 4 5 6 7 8 9 10 11 12 13
| def fetchWithAWCC(aid: Int): Future[Option[(Article, Category, Int)]] = { db.run((for { a <- articles if _.aid === aid c <- categories if _.cid === a.cid } yield(a, c)).result.headOption) flatMap { case Some(a, c) => db.run(comments.filter(_.aid === aid).length) map { case res => Some(a, c, res) } case None => Future(None) } }
|
Rx中同样也提供了类似的组合操作,而且更为丰富。