【回顾Future接口
Future接口时java5引入的,设计初衷是对将来某个时刻会发生的结果建模。它建模了一种异步计算,返回了一个执行预算结果的引用。比如,你去干洗店洗衣服,店员会告诉你什么时候可以来取衣服,而不是让你一直在干洗店等待。要使用Future只需要将耗时操作封装在一个Callable对象中,再将其提交给ExecutorService就可以了。
ExecutorService executor = Executors.newFixedThreadPool(10); Future<Double> future = executor.submit(new Callable<Double>() { @Override public Double call() throws Exception { return doSomeLongComputation(); } }); doSomethingElse(); try { //最多等待1秒 Double result = future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { //当前线程等待过程中被打断 e.printStackTrace(); } catch (ExecutionException e) { //计算时出现异常 e.printStackTrace(); } catch (TimeoutException e) { //完成计算前就超时 e.printStackTrace(); }
但是Future依然有一些局限性:
- 无法将两个异步计算的结果合并为一个。
- 等待Future集合中所有任务完成。
- 等待Future集合中最快任务完成(选择最优的执行方案)。
- 通过编程的方式完成一个Future任务的执行(手工设定异步结果处理)。
- 应对Future的完成事件,当Future的完成事件发生时会收到通知,并可以使用Future的结果进行下一步操作,不只是简单的阻塞等待。
而CompletableFuture类实现了Future接口,可以将上述的问题全部解决。CompletableFuture与Stream的设计都遵循了类似的设计模式:使用Lambda表达式以及流水线的思想,从这个角度可以说CompletableFuture与Future的关系类似于Stream与Collection的关系。
【构建一个异步应用
最佳价格查询器:查询多个线上商店对同一商品的价格。
首先构建商店对象:
package BestPriceFinder;import java.util.Random;import java.util.concurrent.CompletableFuture;import java.util.concurrent.Future;public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } /** * 异步api:使用创建CompletableFuture类提供的工厂方法与getPriceAsync()效果完全一致 * 可以更轻易的完成这个流程,并且不用担心实现细节 * @param product * @return */ public Future<Double> getPriceAsyncByFactory(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } /** * 异步api: * @param product * @return */ public Future<Double> getPriceAsync(String product){ //创建CompletableFuture对象,它将包含计算结果 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); //在新线程中异步计算结果 new Thread(() -> { try { double price = calculatePrice(product); //需要长时间计算的任务结束时,设置future的返回值 futurePrice.complete(price); }catch (Exception e){ //如这里没有使用completeExceptionally,线程不会结束,调用方会永远的执行下去 futurePrice.completeExceptionally(e); } }).start(); //无需等待计算结果,直接返回future对象 return futurePrice; } /** * 同步api: * 每个商店都需要提供的查询api:根据名称返回价格; * 模拟查询数据库等一些耗时操作:使用delay()模拟这些耗时操作。 * @param product * @return */ public double getPrice(String product){ return calculatePrice(product); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模拟耗时操作:延迟一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } }}
下面我们针对Shop.java提供的同步方法与异步方法来进行测试:
package BestPriceFinder;import java.util.Arrays;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.stream.Collectors;/** * 最佳价格查询器 */public class BestFinder { List<Shop> shops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); /** * 顺序查询 */ public List<String> findPrices(String product){ return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 并行流查询 */ public List<String> findPricesParallel(String product){ return shops.parallelStream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 异步查询 * 相比并行流的话CompletableFuture更有优势:可以对执行器配置,设置线程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守护线程保证不会阻止程序的关停 t.setDaemon(true); return t; } }); @SuppressWarnings("all") public List<String> findPricesAsync(String product){ List<CompletableFuture<String>> priceFuctures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor)) .collect(Collectors.toList()); /** 这里需要使用新的stream来等待所有的子线程执行完, * 因为:如果在一个stream中使用两个map: * List<CompletableFuture<String>> priceFuctures = shops.parallelStream() * .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))) * .map(c -> c.join()).collect(Collectors.toList()) * .collect(Collectors.toList()); * 考虑到流操作之间的延迟特性。如果你在单一的流水线中处理流,发向不同商家的请求只能以同步顺序的方式执行才会成功。因此每个创建CompletableFuture * 对象只能在前一个操作结束之后执行查询商家动作。 */ return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList()); }}
@Test public void findPrices(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesParallel(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesAsync(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPricesAsync("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); }
同步api测试结果:毫无疑问是10秒之上
![](http://pubimage.360doc.com/wz/default.gif)
并行流获取同步api测试结果:也是10秒之上,但是并行流不是很高效吗?怎么会如此凄惨呢?因为这与并行流可以调用的系统核数相关,我的计算机是8核,最多8个线程同时运行。而商店有10个,也就是说,我们的两个线程会一直等待前面的某一个线程释放出空闲才能继续运行。
![](http://pubimage.360doc.com/wz/default.gif)
异步获取api测试结果:一秒左右
![](http://pubimage.360doc.com/wz/default.gif)
为何差距如此大呢?
明智的选择是创建了一个配有线程池的执行器,线程池中线程的数目取决于你的应用需要处理的负担,但是你该如何选择合适的线程数目呢?
【选择正确的线程池大小
《Java并发编程实战》中给出如下公式:
Number = NCpu * Ucpu * ( 1 + W/C)Number : 线程数量NCpu : 处理器核数UCpu : 期望cpu利用率W/C : 等待时间与计算时间比
我们这里:99%d的时间是等待商店响应 W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推断出 number = 800。但是为了避免过多的线程搞死计算机,我们选择商店数与计算值中较小的一个。
【并行流与CompletableFuture
目前,我们对集合进行计算有两种方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待IO而发生阻塞。
书上给出的建议如下:
- 如果是计算密集型的操作并且没有IO推荐stream接口,因为实现简单效率也高,如果所有的线程都是计算密集型的也就没有必要创建比核数更多的线程。
- 反之,如果任务涉及到IO,网络等操作:CompletableFuture灵活性更好,因为大部分线程处于等待状态,需要让他们更加忙碌,并且再逻辑中加入异常处理可以更有效的监控是什么原因触发了等待。
现在我们知道了如何用CompletableFuture提供异步的api,后面的文章会学习如何利用CompletableFuture高效的操作同步api。