Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

10.9. CompletableFuture

10.9.1. runAsync 创建没有返回值的异步任务

runAsync 创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是自定义线程池的重载方法

			
package cn.netkiller;

import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    @SneakyThrows
    public static void main(String[] args) {

        CompletableFuture.runAsync(() -> {
            System.out.println("do something...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture.runAsync(() -> {
            System.out.println("do something...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executorService);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("do something....");
//            Thread.currentThread().setName("测试有返回值的异步执行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        ThreadManager tm = new ThreadManager();
        System.out.println(tm.show());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Result ->" + completableFuture.isDone());


    }
}
			
			

			
do something...
do something...
do something....

=======================================================================================
|   ID |                             Name | Group | Daemon |         State | Priority |
---------------------------------------------------------------------------------------
|    1 |                             main |  main |  false |      RUNNABLE |        5 |
|   21 | ForkJoinPool.commonPool-worker-1 |  main |   true | TIMED_WAITING |        5 |
|   22 |                  pool-1-thread-1 |  main |  false | TIMED_WAITING |        5 |
|   23 | ForkJoinPool.commonPool-worker-2 |  main |   true | TIMED_WAITING |        5 |
=======================================================================================

Result ->true
			
			
			
package cn.netkiller.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread());

        AtomicInteger variable = new AtomicInteger(0);
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));
        runAsync.join();
        System.out.println(variable.get());

    }

    public static void process(AtomicInteger variable) {
        System.out.println(Thread.currentThread().getName() + " Process...");
        variable.set(1024);
    }
}
			
			

10.9.2. supplyAsync 创建带有返回值的异步任务。

supplyAsync 创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

			
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
 
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)			
			
			
			
package cn.netkiller;

import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;

import java.util.concurrent.CompletableFuture;

public class Main {
    @SneakyThrows
    public static void main(String[] args) {

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            return "done";
        });

        System.out.println("Result ->" + completableFuture.get());

        ThreadManager tm = new ThreadManager();
        System.out.println(tm.show());
    }
}			
			
			

运行结果

			
do something....
Result ->done
=======================================================================================
|   ID |                             Name | Group | Daemon |         State | Priority |
---------------------------------------------------------------------------------------
|    1 |                             main |  main |  false |      RUNNABLE |        5 |
|   21 | ForkJoinPool.commonPool-worker-1 |  main |   true | TIMED_WAITING |        5 |
=======================================================================================		
			
			
			
package cn.netkiller;

import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    @SneakyThrows
    public static void main(String[] args) {


        ExecutorService executorService = Executors.newSingleThreadExecutor();

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "done";
        }, executorService);

        System.out.println("Result ->" + completableFuture.get());

        ThreadManager tm = new ThreadManager();
        System.out.println(tm.show());
    }
}
			
			

运行结果

			
do something....
Result ->done
======================================================================
|   ID |            Name | Group | Daemon |         State | Priority |
----------------------------------------------------------------------
|    1 |            main |  main |  false |      RUNNABLE |        5 |
|   21 | pool-1-thread-1 |  main |  false |       WAITING |        5 |
======================================================================			
			
			

设置线程名称

			
package cn.netkiller;

import cn.netkiller.thread.ThreadManager;
import lombok.SneakyThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    @SneakyThrows
    public static void main(String[] args) {

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            Thread.currentThread().setName("测试有返回值的异步执行");
            return "done";
        });

        System.out.println("Result ->" + completableFuture.get());

        ThreadManager tm = new ThreadManager();
        System.out.println(tm.show());
    }
}			
			
			

运行结果

			
do something....
Result ->done
==================================================================
|   ID |        Name | Group | Daemon |         State | Priority |
------------------------------------------------------------------
|    1 |        main |  main |  false |      RUNNABLE |        5 |
|   21 | 测试有返回值的异步执行 |  main |   true | TIMED_WAITING | 5 |
==================================================================	
			
			

通过 Supplier 对象创建异步执行

			
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {

                return "https://www.netkiller.cn";
            }
        }).exceptionally((throwable) -> {
            return throwable.toString();
        });			
			
			

10.9.3. 创建 CompletableFuture 实例,并且其他线程中使用

			
		public Future<Double> getPriceAsync(String product) {
        //创建 CompletableFuture 对象,对象中包含异步计算结果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        //新建线程计算商品价格
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                //将异步计算得到的结果设置到 CompletableFuture 中,
                futurePrice.complete(price);
            } catch (Exception e) {
                futurePrice.completeExceptionally(e);
            }
        }).start();
        //无需等待计算结果,直接返回 CompletableFuture 对象
        return futurePrice;
    }			
			
			

10.9.4. 获取结果

			
        CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello netkiller");
        if (completableFuture.isDone()) {
            System.out.println(completableFuture.get());
        }			
			
			

Spring Service 用法

			
@Service
public class MyService {

    @Async
    public CompletableFuture<String> asyncMethod() {
        // 异步方法逻辑...
        return CompletableFuture.completedFuture("Result");
    }
}

// 调用异步方法并获取结果
CompletableFuture<String> future = myService.asyncMethod();
String result = future.get(); // 阻塞等待结果			
			
			

10.9.5. thenRun / thenRunAsync

thenRun/thenRunAsync 功能是什么?完成前置任务之后,自己在执行。

thenRun/thenRunAsync 区别是什么?thenRun 使用同一个线程执行任务,thenRunAsync 会再开一个新线程执行任务。

			
    @GetMapping("/completableFutureRun")
    public String completableFutureRun() {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " - CompletableFuture 前置任务");
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        CompletableFuture thenRun = completableFuture.thenRun(() -> {
            System.out.println(Thread.currentThread().getName() + " - 接着执行第二个 thenRun 任务");
        });
        CompletableFuture thenRunAsync = completableFuture.thenRunAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " - 接着执行第二个 thenRunAsync 任务");
        });
        return "Done";
    }			
			
			

运行结果

			
ForkJoinPool.commonPool-worker-1 - CompletableFuture 前置任务
ForkJoinPool.commonPool-worker-1 - 接着执行第二个 thenRun 任务
ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenRunAsync 任务
			
			

这里可以看到 thenRunAsync 的线程变化,开启新线程 ForkJoinPool.commonPool-worker-2 处理任务

10.9.6. thenAccept / thenAcceptAsync

thenAccept/thenAcceptAsync 的功能是,前置任务执行完毕之后,将返回值给到 thenAccept/thenAcceptAsync,再执行接下来的任务。

			
    @GetMapping("/completableFutureAccept")
    public String completableFutureAccept() {
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            log.info(Thread.currentThread().getName() + " - CompletableFuture 前置任务");
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "前置任务执行完成";
        });
        CompletableFuture<Void> thenAccept = supplyAsync.thenAccept((rev) -> {
            log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenAccept 任务");
            log.info("前置任务返回值:" + rev);
        });
        CompletableFuture<Void> thenAcceptAsync = supplyAsync.thenAcceptAsync((rev) -> {
            log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenAcceptAsync 任务");
            log.info("前置任务返回值:" + rev);
        });
        return "Done";
    }			
			
			

输出结果

			
2023-05-10T10:38:48.008+08:00  INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController          : ForkJoinPool.commonPool-worker-1 - CompletableFuture 前置任务
2023-05-10T10:38:53.015+08:00  INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController          : ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenAcceptAsync 任务
2023-05-10T10:38:53.015+08:00  INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController          : ForkJoinPool.commonPool-worker-1 - 接着执行第二个 thenAccept 任务
2023-05-10T10:38:53.016+08:00  INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController          : 前置任务返回值:前置任务执行完成
2023-05-10T10:38:53.016+08:00  INFO 96282 --- [onPool-worker-1] c.n.c.test.TestThreadController          : 前置任务返回值:前置任务执行完成			
			
			

10.9.7. thenApply / thenApplyAsync

thenApply/thenApplyAsync 前置任务执行完毕之后,结果作为入参,thenApply/thenApplyAsync 执行完毕之后再返回执行结果

			
    @GetMapping("/completableFutureApply")
    public String completableFutureApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            log.info(Thread.currentThread().getName() + " - CompletableFuture 前置任务");
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "第一步";
        });

        CompletableFuture<String> thenApply = supplyAsync.thenApply((rev) -> {
            log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenApply 任务");
            log.info("前置任务返回值:" + rev);
            return "第二步";
        });

        CompletableFuture<String> thenApplyAsync = supplyAsync.thenApplyAsync((rev) -> {
            log.info(Thread.currentThread().getName() + " - 接着执行第二个 thenApplyAsync 任务");
            log.info("前置任务返回值:" + rev);
            return "第二步";
        });
        log.info("supplyAsync:{}", supplyAsync.get());
        log.info("thenApply:{}", thenApply.get());
        log.info("thenApplyAsync:{}", thenApplyAsync.get());
        return "Done";
    }			
			
			

			
2023-05-10T10:39:57.913+08:00  INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController          : ForkJoinPool.commonPool-worker-2 - CompletableFuture 前置任务
2023-05-10T10:40:02.917+08:00  INFO 96282 --- [  XNIO-1 task-2] c.n.c.test.TestThreadController          : XNIO-1 task-2 - 接着执行第二个 thenApply 任务
2023-05-10T10:40:02.917+08:00  INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController          : ForkJoinPool.commonPool-worker-2 - 接着执行第二个 thenApplyAsync 任务
2023-05-10T10:40:02.918+08:00  INFO 96282 --- [  XNIO-1 task-2] c.n.c.test.TestThreadController          : 前置任务返回值:第一步
2023-05-10T10:40:02.918+08:00  INFO 96282 --- [onPool-worker-2] c.n.c.test.TestThreadController          : 前置任务返回值:第一步
2023-05-10T10:40:02.918+08:00  INFO 96282 --- [  XNIO-1 task-2] c.n.c.test.TestThreadController          : supplyAsync:第一步
2023-05-10T10:40:02.918+08:00  INFO 96282 --- [  XNIO-1 task-2] c.n.c.test.TestThreadController          : thenApply:第二步
2023-05-10T10:40:02.919+08:00  INFO 96282 --- [  XNIO-1 task-2] c.n.c.test.TestThreadController          : thenApplyAsync:第二步
			
			

10.9.8. runAsync / thenAccept / thenApply 区别

runAsync 配合 thenRun/thenRunAsync 使用

			
runAsync --> thenRun/thenRunAsync 无返回值			
			
			

supplyAsync 配合 thenAccept/thenAcceptAsync 使用

			
supplyAsync -- 返回值 --> thenAccept/thenAcceptAsync --> 无返回值
			
			

supplyAsync 配合 thenApply/thenApplyAsync 使用

			
supplyAsync -- 返回值 --> thenApply/thenApplyAsync -- 返回值 -->			
			
			

10.9.9. whenComplete 任务完成时执行,并且返回结果和异常

whenComplete 与 runAsync / thenAccept / thenApply 区别是能处理 Throwable

			
    @GetMapping("/completableFutureWhenComplete")
    public String completableFutureWhenComplete() throws ExecutionException, InterruptedException {

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程名称:" + Thread.currentThread().getName());
            return "前置任务完成";
        }).whenComplete((result, throwable) -> {
            System.out.println("前置任务返回值:" + result);
        });
        System.out.println(completableFuture.get());
        return "Done";
    }			
			
			

运行结果

			
当前线程名称:ForkJoinPool.commonPool-worker-1
前置任务返回值:前置任务完成
前置任务完成			
			
			
			
        future.whenCompleteAsync(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + s);
                System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + throwable.toString());
            }
        });			
			
			

10.9.10. 超时处理

			
		CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });

        future.completeOnTimeout("default timeout result", 3 * 1000, TimeUnit.MILLISECONDS);			
			
			

10.9.11. 按顺序执行

			
@Service
public class MyService {

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<String> asyncMethod1() {
        // 异步方法1逻辑...
        return CompletableFuture.completedFuture("Result1");
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<String> asyncMethod2() {
        // 异步方法2逻辑...
        return CompletableFuture.completedFuture("Result2");
    }
}

// 调用异步方法并处理结果顺序 
CompletableFuture<String> future1 = myService.asyncMethod1(); 
CompletableFuture<String> future2 = future1.thenCompose(result1 -> myService.asyncMethod2());

String finalResult = future2.get(); // 阻塞等待最终结果	
			
			

10.9.12. thenCombine、thenAcceptBoth 和runAfterBoth

thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值。

thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值。

runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

			
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " completableFuture1 do something....");
            return 1;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " completableFuture2 do something....");
            return 2;
        });

        CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2, (a, b) -> {
            System.out.println(Thread.currentThread() + " completableFuture3 do something....");
            return a + b;
        });

        System.out.println("completableFuture3结果->" + completableFuture3.get());
}

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " completableFuture1 do something....");
            return 1;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " completableFuture2 do something....");
            return 2;
        });
        
        CompletableFuture<Void> completableFuture3 = completableFuture1.thenAcceptBoth(completableFuture2, (a, b) -> {
            System.out.println(Thread.currentThread() + " completableFuture3 do something....");
            System.out.println(a + b);
        });

        System.out.println("completableFuture3结果->" + completableFuture3.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " completableFuture1 do something....");
            return 1;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " completableFuture2 do something....");
            return 2;
        });

        CompletableFuture<Void> completableFuture3 = completableFuture1.runAfterBoth(completableFuture2, () -> {
            System.out.println(Thread.currentThread() + " completableFuture3 do something....");
        });

        System.out.println("completableFuture3结果->" + completableFuture3.get());
}
			
			

10.9.13. applyToEither、acceptEither和runAfterEither

这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。

applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;

acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;

runAfterEither没有入参,也没有返回值。

				
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " completableFuture1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "completableFuture1 任务完成";
        });

        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " completableFuture2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "completableFuture2 任务完成";
        });

        CompletableFuture<String> completableFuture3 = completableFuture1.applyToEither(completableFuture2, (result) -> {
            System.out.println("接收到" + result);
            System.out.println(Thread.currentThread() + " completableFuture3 do something....");
            return "completableFuture3 任务完成";
        });

        System.out.println("completableFuture3结果->" + completableFuture3.get());
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " completableFuture1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "completableFuture1 任务完成";
        });

        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " completableFuture2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "completableFuture2 任务完成";
        });

        CompletableFuture<Void> completableFuture3 = completableFuture1.acceptEither(completableFuture2, (result) -> {
            System.out.println("接收到" + result);
            System.out.println(Thread.currentThread() + " completableFuture3 do something....");
        });

        System.out.println("completableFuture3结果->" + completableFuture3.get());
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " completableFuture1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("completableFuture1 任务完成");
            return "completableFuture1 任务完成";
        });

        CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " completableFuture2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("completableFuture2 任务完成");
            return "completableFuture2 任务完成";
        });

        CompletableFuture<Void> completableFuture3 = completableFuture1.runAfterEither(completableFuture2, () -> {
            System.out.println(Thread.currentThread() + " completableFuture3 do something....");
            System.out.println("completableFuture3 任务完成");
        });

        System.out.println("completableFuture3结果->" + completableFuture3.get());
}
				
			

10.9.14. allOf / anyOf

allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

			
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture.allOf(new CompletableFuture[]{future1, future2, future3}).join();			
			
			
			
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture.allOf(future1, future2, future3).join();			
			
			
				
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        List<CompletableFuture<String>> completableFutures = Stream.of(future1, future2, future3).toList();
        var completableFutureArray = completableFutures.toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(completableFutureArray).join();
				
			

anyOf:CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。

			
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "AAA").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "BBB").whenComplete((value, throwable) -> {
            System.out.println(value);
        });
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "CCC").whenComplete((value, throwable) -> {
            System.out.println(value);
        });

        CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println(completableFuture.get());
			
			

10.9.15. 并行执行 CompletableFuture

			
package cn.netkiller.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread());


        Parallel parallel = new Parallel();

        parallel.addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task1";
                })
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task2";
                })
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task3";
                })
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "finally";
                }).join();

        List<CompletableFuture<String>> futures = parallel.get();

        futures.stream().forEach(item -> {
            System.out.println(item.getNow("no result"));
        });

    }

    public static class Parallel<T> {

        private final List<CompletableFuture<T>> futures;

        Parallel() {
            this(10);
        }

        Parallel(int size) {
            futures = new ArrayList<>(size);
        }

        public Parallel addAsyncTask(Supplier<T> supplier) {
            futures.add(CompletableFuture.supplyAsync(supplier));
            return this;
        }

        public List<CompletableFuture<T>> get() {
            return futures;
        }

        public void join() {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
        }

        public void clear() {
            futures.clear();
        }
    }
}

			
			
			

10.9.16. 通知完成任务

			
package cn.netkiller.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class Test {


    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println("supplyAsync: " + System.currentTimeMillis());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }

                return "https://www.netkiller.cn";
            }
        });


        future.whenCompleteAsync(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + s);
                System.out.println("whenCompleteAsync: " + System.currentTimeMillis() + " : " + throwable.toString());
            }
        });


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (Exception e) {
                    //异常退出。
                    future.completeExceptionally(e);
                }

                // CompletableFuture被通知线程任务完成。
                System.out.println("完成任务: " + System.currentTimeMillis());
                future.complete("任务完成");
            }
        }).start();

        System.out.println("任务返回:" + future.get());


    }
}			
			
			

completeExceptionally 抛出异常,终止执行

			
        CompletableFuture completableFuture = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
        completableFuture.completeExceptionally(new Exception("异常终止"));

        try {
            completableFuture.join();
            System.out.println(completableFuture.get());
        } catch (CompletionException ex) { // just for testing
            System.err.println("completed exceptionally: " + ex.getCause().getMessage());
        }			
			
			

10.9.17. 异常处理

			
package cn.netkiller.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Test {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Neo!");
//            return "Neo";
        }).thenApply(i -> "Success: " + i);

        CompletableFuture<String> result = future.exceptionally(e -> {
            return e.toString();
        });

        System.out.println(result.get());

    }
}
			
			
			
	public static CompletableFuture whenComplete(int a, int b){
        return CompletableFuture.supplyAsync(() -> a/b)
                .whenComplete((result, ex) -> {
                    if (null != ex) {
                        System.out.println("whenComplete error:\t"+ex.getMessage());
                    }
                });
    }

        try {
            System.out.println("success:\t"+whenComplete(6,3).get());
            System.out.println("exception:\t"+whenComplete(6,0).get());
        } catch (Exception exception){
            System.out.println("catch===="+exception.getMessage());
        }

输出结果:

success:    2
whenComplete error:    java.lang.ArithmeticException: / by zero
catch====java.lang.ArithmeticException: / by zero			
			
			
			
public static CompletableFuture divide(int a, int b){
        return CompletableFuture.supplyAsync(() -> a/b)
                .handle((result, ex) -> {
                    if (null != ex) {
                        System.out.println(ex.getMessage());
                        return 0;
                    } else {
                        return result;
                    }
                });
    }

        try {
            System.out.println("success:\t"+divide(6,3).get());
            System.out.println("exception:\t"+divide(6,0).get());
        } catch (Exception exception){
            System.out.println("catch="+exception.getMessage());
        }

输出结果:
success:    2
java.lang.ArithmeticException: / by zero
exception:    0			
			
			

10.9.18. CompletableFuture 实现 Pipeline 流水线

			
package cn.netkiller.test;

import lombok.SneakyThrows;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class CompletableFuturePipeline {

    private final Parallel parallel = new Parallel();

    public CompletableFuturePipeline() {

    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName());

        CompletableFuturePipeline test = new CompletableFuturePipeline();
        test.begin().batch().run().end();
        test.parallel().supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ": parallel1");
            return "parallel1";
        }).supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ": parallel2");
            return "parallel2";
        }).join();


    }

    @SneakyThrows
    public CompletableFuturePipeline begin() {
        String method = Thread.currentThread().getStackTrace()[1].getMethodName();

        parallel.runAsync(() -> {
            String thread = Thread.currentThread().getName();
            System.out.printf("%s - %s\n", thread, method);
        });
        return this;
    }

    public CompletableFuturePipeline run() throws ExecutionException, InterruptedException {
        String method = Thread.currentThread().getStackTrace()[1].getMethodName();
        parallel.supplyAsync(() -> {
            System.out.printf("%s - %s\n", Thread.currentThread().getName(), method);
            return "OK";
        });
        System.out.println(this.asyncMethod1().get());
        return this;
    }

    public CompletableFuturePipeline batch() {
        String method = Thread.currentThread().getStackTrace()[1].getMethodName();
        Parallel batchs = this.parallel();

        batchs.runAsync(() -> {
                    String thread = Thread.currentThread().getName();
                    System.out.printf("%s - %s - task1\n", thread, method);
                })
                .runAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    String thread = Thread.currentThread().getName();
                    System.out.printf("%s - %s - task2\n", thread, method);

                })
                .runAsync(() -> {
                    String thread = Thread.currentThread().getName();
                    System.out.printf("%s - %s - task3\n", thread, method);
                })
                .supplyAsync(() -> {
                    String thread = Thread.currentThread().getName();
                    System.out.printf("%s - %s - finally\n", thread, method);
                    return "finally";
                }).join();
        System.out.println(batchs.futures.size());

        return this;
    }

    @SneakyThrows
    public CompletableFuturePipeline end() {
        String method = Thread.currentThread().getStackTrace()[1].getMethodName();
        parallel.supplyAsync(() -> {
            System.out.printf("%s - %s\n", Thread.currentThread().getName(), method);
            return "End";
        });

        parallel.join();

        List<CompletableFuture<String>> futures = parallel.get();

        futures.stream().forEach(item -> {
            System.out.println(item.getNow("no result"));
        });

        System.out.println(parallel.futures.size());
        return this;
    }

    public Parallel parallel() {
        return new Parallel();
    }

    public Parallel parallel(int size) {
        return new Parallel(size);
    }

    public CompletableFuture<String> asyncMethod1() {
//        Thread.currentThread().setName(Thread.currentThread().getName() + "-" + this.getClass().getSimpleName());
        System.out.println(Thread.currentThread().getName() + ": asyncMethod1");
        return CompletableFuture.completedFuture("Result1");
    }

    public static class Parallel<T> {

        private final List<CompletableFuture> futures;
//        private final List<CompletableFuture<Void>> voids = new ArrayList<>();

        Parallel() {
            this(10);
        }

        Parallel(int size) {
            futures = new ArrayList<>(size);
        }

        public Parallel runAsync(Runnable runnable) {
            futures.add(CompletableFuture.runAsync(runnable));
            return this;
        }

        public Parallel runAsync(Runnable runnable, Executor executor) {
            futures.add(CompletableFuture.runAsync(runnable, executor));
            return this;
        }

        public Parallel supplyAsync(Supplier<T> supplier) {
            futures.add(CompletableFuture.supplyAsync(supplier));
            return this;
        }

        public Parallel supplyAsync(Supplier<T> supplier, Executor executor) {
            futures.add(CompletableFuture.supplyAsync(supplier, executor));
            return this;
        }

        public List<CompletableFuture> get() {
            return futures;
        }

        public void join() {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
        }

        public void clear() {
            futures.clear();
        }
    }
}