Future 和Callable接口
Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
Callable接口中定义了需要有返回的任务需要实现的方法。
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,
主线程就去做其他事情了,过了一会才去获取子任务的执行结果。
FutureTask的缺陷
get()方法会造成阻塞
public class CompletableFutureDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println("-----come in FutureTask"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return ""+ThreadLocalRandom.current().nextInt(100); }); Thread t1 = new Thread(futureTask,"t1"); t1.start(); //3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞) //System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get()); //3秒钟后才出来结果,我只想等待1秒钟,过时不候 System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get(1L,TimeUnit.SECONDS)); System.out.println(Thread.currentThread().getName()+"\t"+" run... here"); } }
一旦调用
get()
方法,不管是否计算完成都会导致阻塞轮询获取结果
futureTask.isDone()
返回true的时候再调用get()
方法 优点 : 解决了传统get()的阻塞问题
缺陷:
- 轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println("-----come in FutureTask"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return ""+ThreadLocalRandom.current().nextInt(100); }); new Thread(futureTask,"t1").start(); System.out.println(Thread.currentThread().getName()+"\t"+"线程完成任务"); /** * 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果 */ while(true) { if (futureTask.isDone()) { System.out.println(futureTask.get()); break; } } } }
CompletableFuture 对Future的改进
CompletionStage
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶段。
- 一个阶段的计算执行结果可以是一个Function,Consumer或者Runnable , 比如
stage.thenApply(x → square(x) ).thenAccept( x → System.out.print(x)).thenRun( ( )→ System.out.println( ) )
- 一个阶段的执行可能是被单个阶段的完成触发,也可能由多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数
CompletableFuture
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
它可能代表一个明确完成的Future,也有可能代表一个完成阶段 CompletionStage,它支持在计算完成以后触发一些函数或执行某些动作
核心静态方法
runAsync 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()+"\t"+"-----come in"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----task is over"); }); System.out.println(future.get()); } }
supplyAsync 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor
没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "-----come in"); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextInt(100); }); System.out.println(completableFuture.get()); } }
对Future的增强
- 异步任务结束时,会自动回调某个对象的方法
- 异步任务出错时,会自动回调某个对象的方法
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
public class cfuture4 { public static void main(String[] args) throws Exception { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "-----come in"); int result = ThreadLocalRandom.current().nextInt(10); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----计算结束耗时1秒钟,result: "+result); if(result > 6) { int age = 10/0; } return result; }).whenComplete((v,e) ->{ if(e == null) { System.out.println("-----result: "+v); } }).exceptionally(e -> { System.out.println("-----exception: "+e.getCause()+"\t"+e.getMessage()); return -44; }); //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } }
并行计算案例
经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。
我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。同理,
对于分布式微服务的调用,按照实际业务,如果是无关联step by step的业务,可以尝试是否可以多箭齐发,同时调用。
我们去比同一个商品在各个平台上的价格,要求获得一个清单列表,
1 step by step,查完京东查淘宝,查完淘宝查天猫......
2 all 一口气同时查询。。。。。
public class T1 { static List<NetMall> list = Arrays.asList( new NetMall("jd"), new NetMall("tmall"), new NetMall("pdd"), new NetMall("mi") ); public static List<String> findPriceSync(List<NetMall> list,String productName) { return list.stream().map(mall -> String.format(productName+" %s price is %.2f",mall.getNetMallName(),mall.getPriceByName(productName))).collect(Collectors.toList()); } public static List<String> findPriceASync(List<NetMall> list,String productName) { return list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()); } public static void main(String[] args) { long startTime = System.currentTimeMillis(); List<String> list1 = findPriceSync(list, "thinking in java"); for (String element : list1) { System.out.println(element); } long endTime = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"); long startTime2 = System.currentTimeMillis(); List<String> list2 = findPriceASync(list, "thinking in java"); for (String element : list2) { System.out.println(element); } long endTime2 = System.currentTimeMillis(); System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒"); } } class NetMall { @Getter private String netMallName; public NetMall(String netMallName) { this.netMallName = netMallName; } public double getPriceByName(String productName) { return calcPrice(productName); } private double calcPrice(String productName) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble() + productName.charAt(0); } }
CompletableFuture常用方法
获得结果
public T get()
普通Future的阻塞效果
public T get(long timeout, TimeUnit unit)
普通Future的有效时间的阻塞效果
public T getNow(T valueIfAbsent)
立即获取结果不阻塞 计算完,返回计算完成后的结果 没算完,返回设定的valueIfAbsent值
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 533; }); //去掉注释上面计算没有完成,返回444 //开启注释上满计算完成,返回计算结果 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(completableFuture.getNow(444)); } }
public T join()
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(CompletableFuture.supplyAsync(() -> "abc").thenApply(r -> r + "123").join()); } }
主动触发计算
public boolean complete(T value)
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 533; }); //注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get // try { // TimeUnit.SECONDS.sleep(2); // } catch (InterruptedException e) { // e.printStackTrace(); // } //当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值. System.out.println(completableFuture.complete(444) + "\t" + completableFuture.get()); // true 444 } }
对计算结果进行处理
thenApply
计算结果存在依赖关系,这两个线程串行化
由于存在依赖关系(当前步报错,不走下一步),当前步骤有异常的话就叫停。
public static void main(String[] args) throws ExecutionException, InterruptedException { //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化, CompletableFuture.supplyAsync(() -> { //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).thenApply(f -> { System.out.println("222"); return f + 1; }).thenApply(f -> { int age = 10/0; // 异常情况:那步出错就停在那步。 System.out.println("333"); return f + 1; }).whenCompleteAsync((v, e) -> { System.out.println("*****v: " + v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }
handle
有异常也可以往下一步走,根据带的异常参数可以进一步处理
public static void main(String[] args) throws ExecutionException, InterruptedException { //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化, // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理 CompletableFuture.supplyAsync(() -> { //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("111"); return 1024; }).handle((f, e) -> { int age = 10 / 0; System.out.println("222"); return f + 1; }).handle((f, e) -> { System.out.println("333"); return f + 1; }).whenCompleteAsync((v, e) -> { System.out.println("*****v: " + v); }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println("-----主线程结束,END"); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } }
whenComplete
执行当前任务的线程继续执行
whenComplete
的任务whenCompleteAsync
把任务继续提交给线程池来进行执行
exceptionally
处理异常
对计算结果进行消费
thenAccept
接收任务的处理结果,并消费处理,无返回结果
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(f -> { return f + 2; }).thenApply(f -> { return f + 3; }).thenApply(f -> { return f + 4; }).thenAccept(r -> System.out.println(r)); }
thenRun
thenRun(Runnable runnable)
任务 A 执行完执行 B,并且 B 不需要 A 的结果
thenApply
任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join()); System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join()); }
对计算速度进行选用
applyToEither
谁快用谁
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 10; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 20; }); CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2, f -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return f + 1; }); System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get()); }
对计算结果进行合并
thenCombine
两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理
先完成的先等着,等待其它分支任务
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return 10; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return 20; }); CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in "); return x + y; }); System.out.println(thenCombineResult.get()); }
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1"); return 10; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2"); return 20; }), (x,y) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3"); return x + y; }).thenCombine(CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4"); return 30; }),(a,b) -> { System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5"); return a + b; }); System.out.println("-----主线程结束,END"); System.out.println(thenCombineResult.get()); // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }