CompletableFuture学习笔记

前言

JDK8的新特性真的是非常多,今天看到一个特别好用的并发工具类CompletableFuture,学会了这个可以说多线程代码是非常好写了,于是学习了一下,这边跟随B站上一个宝藏博主的视频学习,点击宝藏博主链接可以直达他的主页。

下面主要以CompletableFuture的几个API结合各种场景展开。


工具类

我们在看下面代码前先看一下工具类的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CommonUtil {
private CommonUtil() {
}

// 耗时
public static void timeCost(long millis) {
ThreadUtil.sleep(millis);
}

// 创建耗时行为
public static void createBehavior(String behavior, long millis) {
System.out.println(String.join(StrPool.DASHED, String.valueOf(System.currentTimeMillis()), String.valueOf(Thread.currentThread().getId()),
String.valueOf(Thread.currentThread().getName()), behavior));
timeCost(millis);
}

// 创建行为
public static void createBehavior(String behavior) {
System.out.println(String.join(StrPool.DASHED, String.valueOf(System.currentTimeMillis()), String.valueOf(Thread.currentThread().getId()),
String.valueOf(Thread.currentThread().getName()), behavior));
}
}

SupplyAsync

场景:
小明进门点菜,点完菜之后厨师开始做菜,同时小明打王者进行等待,等厨师做完菜之后小明开始吃饭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class OpenAsync {
public static void main(String[] args) {
CommonUtil.createBehavior("小明进门");
CommonUtil.createBehavior("小明点了 番茄炒蛋 + 饭");

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("厨师做菜", 1000);
return "番茄炒鸡蛋";
});

CommonUtil.createBehavior("小明玩王者");
CommonUtil.createBehavior(String.format("小明吃%s", future.join()));
}
}
1
2
3
4
5
1624810471194-1-main-小明进门
1624810471194-1-main-小明点了 番茄炒蛋 + 饭
1624810471239-1-main-小明玩王者
1624810471239-11-ForkJoinPool.commonPool-worker-1-厨师做菜
1624810472253-1-main-小明吃番茄炒鸡蛋

Compose

场景:
小明进门点菜,这时厨师开始做菜,等做完菜之后需要服务员打饭,在此期间小明可以玩王者等待,等到服务员打饭成功后,小明开始吃饭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Compose {
public static void main(String[] args) {
CommonUtil.createBehavior("小明进门");
CommonUtil.createBehavior("小明点了 番茄炒蛋 + 饭");

CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("厨师做菜", 1000);
return "番茄炒蛋";
}).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员打饭", 1000);
return dish + " + 饭";
}));

CommonUtil.createBehavior("小明打王者");
CommonUtil.createBehavior(String.format("小明吃%s", order.join()));
}
}
1
2
3
4
5
6
1624810596064-1-main-小明进门
1624810596065-1-main-小明点了 番茄炒蛋 + 饭
1624810596112-11-ForkJoinPool.commonPool-worker-1-厨师做菜
1624810596113-1-main-小明打王者
1624810597117-13-ForkJoinPool.commonPool-worker-2-服务员打饭
1624810598135-1-main-小明吃番茄炒蛋 + 饭

thenCompose方法的结果是上一个任务的跑完之后把结果交给下一个异步任务也就是说上一个异步任务跑完之后才会开启下一个异步任务。


Combine

场景:
小明进门点菜,这时饭店饭还没蒸好,所以厨师开始做菜的同时,服务员需要开始蒸饭,小明打王者等待等到厨师和服务员,蒸完饭做完菜,并且服务员打完饭,小明开始吃饭。

我们使用之前的学过的知识也可以做到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void other() {
CommonUtil.createBehavior("小明进门");
CommonUtil.createBehavior("小明点了番茄炒蛋");


CompletableFuture<String> dish = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("厨师做菜", 1000);
return "番茄炒蛋";
});

CompletableFuture<String> rice = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员蒸饭", 2000);
return "饭";
});

CommonUtil.createBehavior("小明打王者");
String order = String.format("%s + %s", dish.join(), rice.join());
CommonUtil.createBehavior("服务员打饭", 1000);
CommonUtil.createBehavior(String.format("小明吃%s", order));
}
1
2
3
4
5
6
7
1624810758571-1-main-小明进门
1624810758571-1-main-小明点了番茄炒蛋
1624810758618-11-ForkJoinPool.commonPool-worker-1-厨师做菜
1624810758618-1-main-小明打王者
1624810758619-12-ForkJoinPool.commonPool-worker-2-服务员蒸饭
1624810760642-1-main-服务员打饭
1624810761646-1-main-小明吃番茄炒蛋 + 饭

但是相对来说代码不够优雅,这边有更好的API让代码变得更加优雅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
CommonUtil.createBehavior("小明进门");
CommonUtil.createBehavior("小明点了番茄炒蛋");

CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("厨师做菜", 1000);
return "番茄炒蛋";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员蒸饭", 2000);
return "饭";
}), (dish, rice) -> {
CommonUtil.createBehavior("服务员打饭", 10);
return dish + " + " + rice;
});

CommonUtil.createBehavior("小明打王者");
CommonUtil.createBehavior(String.format("小明吃%s", order.join()));
}
1
2
3
4
5
6
7
1624810821940-1-main-小明进门
1624810821941-1-main-小明点了番茄炒蛋
1624810821986-11-ForkJoinPool.commonPool-worker-1-厨师做菜
1624810821986-12-ForkJoinPool.commonPool-worker-2-服务员蒸饭
1624810821987-1-main-小明打王者
1624810823990-12-ForkJoinPool.commonPool-worker-2-服务员打饭
1624810824017-1-main-小明吃番茄炒蛋 + 饭

thenCombine的作用就是等两个任务都执行完之后得到两个结果,再把两个结果加工成一个结果。


Apply

场景:
小明买单,把钱付给服务员并要求开发票,服务员收到款之后就去开发票,同时小明也没有闲着,接了个电话准备回去开黑,刚接完电话服务员就过来,于是小明就走出餐厅,准备回家。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class Apply {

/**
* 小明和服务员分别在不同的两个线程
*/
public static void method1() {
CommonUtil.createBehavior("小明吃好了");
CommonUtil.createBehavior("小明 结账、要求发票");

CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员收款 500元", 100);
CommonUtil.createBehavior("服务员开发票 面额 500元", 1000);
return "500元发票";
});
CommonUtil.createBehavior("小明接到朋友电话,想一起打游戏");
CommonUtil.createBehavior(String.format("小明拿到%s,准备回家", invoice.join()));
}

/**
* 如果要求收款和开发票的两个服务员不在同一个线程,我们如何实现
*/
public static void method2() {
CommonUtil.createBehavior("小明吃好了");
CommonUtil.createBehavior("小明 结账、要求发票");

CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员收款 500元", 100);


CompletableFuture<String> waiter2 = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员开发票 面额 500元", 1000);
return "500元发票";
});

return waiter2.join();
});

CommonUtil.createBehavior("小明接到朋友电话,想一起打游戏");
CommonUtil.createBehavior(String.format("小明拿到%s,准备回家", invoice.join()));
}

/**
* 上面两种方法两个操作都放在同一代码块中,两个服务员也都放在同一代码块中,如果我们
* 要修改其中一段逻辑就会影响另一段逻辑这是十分不好的。
*/
public static void method3() {
CommonUtil.createBehavior("小明吃好了");
CommonUtil.createBehavior("小明 结账、要求发票");

CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员收款 500元", 100);
return "500";
}).thenApply(money -> {
CommonUtil.createBehavior(String.format("服务员开发票 面额 %s", money), 1000);
return String.format("%s元发票", money);
});

CommonUtil.createBehavior("小明接到朋友电话,想一起打游戏");
CommonUtil.createBehavior(String.format("小明拿到%s,准备回家", invoice.join()));
}

/**
* 我们发现上面这种写法开发票和收款是同一个服务员,也就是在同一个线程,如果我们要两个服务员操作,我们怎么办
*
* 这里两个线程我这边展示出来是一个线程,是因为线程间调度判断上个线程已经空闲了就直接复用了.
*/
public static void method4() {
CommonUtil.createBehavior("小明吃好了");
CommonUtil.createBehavior("小明 结账、要求发票");

CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员收款 500元", 2000);
return "500";
}).thenApplyAsync(money -> {
CommonUtil.createBehavior(String.format("服务员开发票 面额 %s", money), 1000);
return String.format("%s元发票", money);
});

CommonUtil.createBehavior("小明接到朋友电话,想一起打游戏");
CommonUtil.createBehavior(String.format("小明拿到%s,准备回家", invoice.join()));
}

public static void main(String[] args) {
method1();
System.out.println("-----------------------------");
method2();
System.out.println("-----------------------------");
method3();
System.out.println("-----------------------------");
method4();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1624810955359-1-main-小明吃好了
1624810955359-1-main-小明 结账、要求发票
1624810955405-1-main-小明接到朋友电话,想一起打游戏
1624810955405-11-ForkJoinPool.commonPool-worker-1-服务员收款 500元
1624810955512-11-ForkJoinPool.commonPool-worker-1-服务员开发票 面额 500元
1624810956534-1-main-小明拿到500元发票,准备回家
-----------------------------
1624810956534-1-main-小明吃好了
1624810956534-1-main-小明 结账、要求发票
1624810956534-1-main-小明接到朋友电话,想一起打游戏
1624810956534-11-ForkJoinPool.commonPool-worker-1-服务员收款 500元
1624810956636-13-ForkJoinPool.commonPool-worker-2-服务员开发票 面额 500元
1624810957639-1-main-小明拿到500元发票,准备回家
-----------------------------
1624810957639-1-main-小明吃好了
1624810957639-1-main-小明 结账、要求发票
1624810957640-11-ForkJoinPool.commonPool-worker-1-服务员收款 500元
1624810957641-1-main-小明接到朋友电话,想一起打游戏
1624810957745-11-ForkJoinPool.commonPool-worker-1-服务员开发票 面额 500
1624810958751-1-main-小明拿到500元发票,准备回家
-----------------------------
1624810958751-1-main-小明吃好了
1624810958751-1-main-小明 结账、要求发票
1624810958752-11-ForkJoinPool.commonPool-worker-1-服务员收款 500元
1624810958752-1-main-小明接到朋友电话,想一起打游戏
1624810960755-11-ForkJoinPool.commonPool-worker-1-服务员开发票 面额 500
1624810961758-1-main-小明拿到500元发票,准备回家

ApplyToEither

场景:
小明走出餐厅,到公交站,现在两个公交车都能回家一个700路一个800路。现在小明是哪路先来就做哪路,用代码如何描述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ApplyToEither {
public static void main(String[] args) {
CommonUtil.createBehavior("小明走出餐厅, 来到公交站");
CommonUtil.createBehavior("等待 700路 或者 800路 公交到来");

CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("700路公交车正在赶来。。", 2000);
return "700路公交到了";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("800路公交车正在赶来。。", 1000);
return "800路公交到了";
}), first -> first);

CommonUtil.createBehavior(String.format("%s,小明坐车回家", bus.join()));
}
}
1
2
3
4
5
1624811044152-1-main-小明走出餐厅, 来到公交站
1624811044153-1-main-等待 700路 或者 800路 公交到来
1624811044197-11-ForkJoinPool.commonPool-worker-1-700路公交车正在赶来。。
1624811044197-12-ForkJoinPool.commonPool-worker-2-800路公交车正在赶来。。
1624811045212-1-main-800路公交到了,小明坐车回家

applyToEither的作用就是上个任务和这个任务一起运行,那个先运行结束,就把任务结果交给Function。


Exceptionally

场景:
小明做800路公交回家,又拿起了电话和朋友聊起了游戏,这时候公交撞大树了,这时小明打滴滴回家。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Exceptionally {
public static void main(String[] args) {
CommonUtil.createBehavior("小明走出餐厅, 来到公交站");
CommonUtil.createBehavior("等待 700路 或者 800路 公交到来");

CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("700路公交车正在赶来。。", 2000);
return "700路公交到了";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("800路公交车正在赶来。。", 1000);
return "800路公交到了";
}), first -> {
CommonUtil.createBehavior(first);
if (first.startsWith("800")) {
throw new RuntimeException("撞树了。。");
}
return first;
}).exceptionally(e -> {
CommonUtil.createBehavior(e.getMessage());
CommonUtil.createBehavior("小明叫滴滴");
return "滴滴叫到了";
});

CommonUtil.createBehavior(String.format("%s,小明坐车回家", bus.join()));
}
}
1
2
3
4
5
6
7
8
1624811127907-1-main-小明走出餐厅, 来到公交站
1624811127908-1-main-等待 700路 或者 800路 公交到来
1624811127961-11-ForkJoinPool.commonPool-worker-1-700路公交车正在赶来。。
1624811127961-12-ForkJoinPool.commonPool-worker-2-800路公交车正在赶来。。
1624811128969-12-ForkJoinPool.commonPool-worker-2-800路公交到了
1624811128970-12-ForkJoinPool.commonPool-worker-2-java.lang.RuntimeException: 撞树了。。
1624811128970-12-ForkJoinPool.commonPool-worker-2-小明叫滴滴
1624811128978-1-main-滴滴叫到了,小明坐车回家

exceptionally就是遇到异常之后,小明处理异常的步骤,exceptionally当然不只可以加在尾部还能在上面的链式中任何一步加。相比于try-catch块来说真的简洁不少。


扩展

在上面我们学了6个API。

1
2
3
4
5
6
# supplyAsync   用来开启异步任务
# thenCompose 用来连接两个异步任务
# thenCombine 用来合并两个异步任务
# thenApply 用来做任务的后置处理
# applyToEither 用来获取最先完成的异步任务
# exceptionally 用来处理异常

以上6个API大部分都满足下面三个规则:

  • xxx(arg)
  • xxxAsync(arg)
  • xxxAsync(arg,Executor)
1
2
3
4
5
6
7
CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {
CommonUtil.createBehavior("服务员收款 500元", 100);
return "500";
}).thenApply(money -> {
CommonUtil.createBehavior(String.format("服务员开发票 面额 %s", money), 1000);
return String.format("%s元发票", money);
});

Apply方法就是上一个任务执行完之后去执行下一个任务,supplyAsyncthenApply中的代码都在同一个线程里运行,thenApply就是把后面的代码块放在上面同一个代码块里的去运行,CompletableFuture会把这两个代码封装成一个任务去运行,随意这两个代码在一个线程里运行。当调用thenApplyAsync方法CompletableFuture就会把这两块代码看成是独立的两个任务。我们明白了thenApplythenApplyAsync的区别,那另外的也是一样的。

由于CompletableFuture存在线程的复用,我们不能只观察线程的ID来判断。

类似方法比较

1
2
# supplyAsync 开启异步任务且提供返回值
# runAsync 开启异步任务没有返回值
1
2
3
# thenApply  接收前面任务参数且有返回值
# thenRun 不接收前面任务参数,且没有返回值
# thenAccept 接收前面任务参数但是没有返回值
1
2
3
# thenCombine      接收前面两个任务任务参数且有返回值
# thenAcceptBoth 内部消化前两个任务的结果,没有返回值
# thenRunAfterBoth 不关心前两个任务的结果且没有返回值
1
2
3
# applyToEither  会得到最快执行完任务的结果,有返回值
# AcceptEither 会得到最快执行完任务的结果,没有返回值
# RunAfterEither 既不关心最快执行完任务的结果,也没有返回值
1
2
3
# exceptionally 处理前面任务的异常并且修正成一个正常值
# handle 如果前面的程序正常执行就会收到正常执行的结果,如果前面程序异常就会接收到异常,无论前面正常或者异常,handle方法都会返回一个结果让后面程序继续执行
# whenComplete 和handle的区别是没有返回值

性能问题

任务巨多,如何保证性能?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TerribleCode {
public static void main(String[] args) {
CommonUtil.createBehavior("小明和小伙伴们 进餐厅点菜");
long startTime = System.currentTimeMillis();

ArrayList<Dish> dishes = new ArrayList<>();
for (int i = 0; i < 7; i++) {
dishes.add(new Dish("菜" + i, 1));
}

dishes.forEach(dish -> CompletableFuture.runAsync(dish::make).join());
CommonUtil.createBehavior("菜都做好了,上桌" + (System.currentTimeMillis() - startTime));
}
}
1
2
3
4
5
6
7
8
9
1624982786532-1-main-小明和小伙伴们 进餐厅点菜
1624982786581-11-ForkJoinPool.commonPool-worker-1-菜0制作完毕,来吃我把
1624982787588-11-ForkJoinPool.commonPool-worker-1-菜1制作完毕,来吃我把
1624982788588-11-ForkJoinPool.commonPool-worker-1-菜2制作完毕,来吃我把
1624982789589-11-ForkJoinPool.commonPool-worker-1-菜3制作完毕,来吃我把
1624982790593-11-ForkJoinPool.commonPool-worker-1-菜4制作完毕,来吃我把
1624982791596-11-ForkJoinPool.commonPool-worker-1-菜5制作完毕,来吃我把
1624982792599-11-ForkJoinPool.commonPool-worker-1-菜6制作完毕,来吃我把
1624982793600-1-main-菜都做好了,上桌7068

由于制作一盆菜需要1S所以七盘菜需要7S,硬生生把多线程写成了单线程还不如直接放在main线程中直接执行七次。直接把异步变成了串行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TerribleCodeImprove {
public static void main(String[] args) {
CommonUtil.createBehavior("小明和小伙伴们 进餐厅点菜");
long startTime = System.currentTimeMillis();

ArrayList<Dish> dishes = new ArrayList<>();
for (int i = 0; i < 7; i++) {
dishes.add(new Dish("菜" + i, 1));
}

// 做菜
ArrayList<CompletableFuture> cfList = new ArrayList<>();
dishes.forEach(dish -> cfList.add(CompletableFuture.runAsync(dish::make)));

// 等待所有任务执行完毕
CompletableFuture.allOf(cfList.toArray(new CompletableFuture[cfList.size()])).join();
CommonUtil.createBehavior("菜都做好了,上桌" + (System.currentTimeMillis() - startTime));
}
}
1
2
3
4
5
6
7
8
9
1624982458305-1-main-小明和小伙伴们 进餐厅点菜
1624982458355-11-ForkJoinPool.commonPool-worker-1-菜0制作完毕,来吃我把
1624982458355-12-ForkJoinPool.commonPool-worker-2-菜1制作完毕,来吃我把
1624982458355-13-ForkJoinPool.commonPool-worker-3-菜2制作完毕,来吃我把
1624982458355-14-ForkJoinPool.commonPool-worker-4-菜3制作完毕,来吃我把
1624982458355-16-ForkJoinPool.commonPool-worker-6-菜5制作完毕,来吃我把
1624982458355-15-ForkJoinPool.commonPool-worker-5-菜4制作完毕,来吃我把
1624982458355-17-ForkJoinPool.commonPool-worker-7-菜6制作完毕,来吃我把
1624982459359-1-main-菜都做好了,上桌1054

这时我们发现七盘菜1S就做好了,那同时执行的线程能有多少个呢,我们只需要每次都多加一盘菜,什么时候发现执行的时间要之前的两倍就知道同时能执行多少个线程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    public static void main(String[] args) {
CommonUtil.createBehavior("小明和小伙伴们 进餐厅点菜");
long startTime = System.currentTimeMillis();

CompletableFuture[] dishes = IntStream.rangeClosed(0, 7)
.mapToObj(i -> new Dish("菜" + i, 1))
.map(dish -> CompletableFuture.runAsync(dish::make))
.toArray(CompletableFuture[]::new);

// 等待所有任务执行完毕
CompletableFuture.allOf(dishes).join();
CommonUtil.createBehavior("菜都做好了,上桌" + (System.currentTimeMillis() - startTime));
}
}
1
2
3
4
5
6
7
8
9
10
1624982828176-1-main-小明和小伙伴们 进餐厅点菜
1624982828250-11-ForkJoinPool.commonPool-worker-1-菜0制作完毕,来吃我把
1624982828250-12-ForkJoinPool.commonPool-worker-2-菜1制作完毕,来吃我把
1624982828250-13-ForkJoinPool.commonPool-worker-3-菜2制作完毕,来吃我把
1624982828250-15-ForkJoinPool.commonPool-worker-5-菜4制作完毕,来吃我把
1624982828251-16-ForkJoinPool.commonPool-worker-6-菜5制作完毕,来吃我把
1624982828251-14-ForkJoinPool.commonPool-worker-4-菜3制作完毕,来吃我把
1624982828251-17-ForkJoinPool.commonPool-worker-7-菜6制作完毕,来吃我把
1624982829256-15-ForkJoinPool.commonPool-worker-5-菜7制作完毕,来吃我把
1624982830258-1-main-菜都做好了,上桌2082

我们发现在第八盘菜的时候就需要2S了那这个8这个数从哪里来的呢,然后JDK开发人员就写死了8把,显然是不可能的。

1
2
3
4
5
6
7
8
9
10
public class CommonPoolSize {
public static void main(String[] args) {
// CPU核心数 - 8
System.out.println(Runtime.getRuntime().availableProcessors());
// 当前线程数 - 0
System.out.println(ForkJoinPool.commonPool().getPoolSize());
// 最大线程数 - 7
System.out.println(ForkJoinPool.commonPool().getParallelism());
}
}

这边我这台电脑是4核8线程的CPU,也就是说我这台电脑能同时运行八个任务,上面的7就是通过8-1得到的。如果你的处理器支持超线程的话就要用线程数去减了得到的结果就是CompletableFuture底层用到的线程池的最大线程数。

我们想在知道只要任务大于7就是大量任务了,假如我们项目中很多时候任务数量都是大于7的,那我们只要修改线程池的最大线程数不久好了吗,我们只需要在线程池初始化之前改掉最大线程数就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class GoodCode {
public static void main(String[] args) {
// 或者启动参数-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

CommonUtil.createBehavior("小明和小伙伴们 进餐厅点菜");
long startTime = System.currentTimeMillis();

CompletableFuture[] dishes = IntStream.rangeClosed(0, 7)
.mapToObj(i -> new Dish("菜" + i, 1))
.map(dish -> CompletableFuture.runAsync(dish::make))
.toArray(CompletableFuture[]::new);

// 等待所有任务执行完毕
CompletableFuture.allOf(dishes).join();
CommonUtil.createBehavior("菜都做好了,上桌" + (System.currentTimeMillis() - startTime));
}
}
1
2
3
4
5
6
7
8
9
10
1624983453369-1-main-小明和小伙伴们 进餐厅点菜
1624983453422-11-ForkJoinPool.commonPool-worker-1-菜0制作完毕,来吃我把
1624983453423-12-ForkJoinPool.commonPool-worker-2-菜1制作完毕,来吃我把
1624983453423-14-ForkJoinPool.commonPool-worker-4-菜3制作完毕,来吃我把
1624983453423-13-ForkJoinPool.commonPool-worker-3-菜2制作完毕,来吃我把
1624983453423-16-ForkJoinPool.commonPool-worker-6-菜5制作完毕,来吃我把
1624983453423-17-ForkJoinPool.commonPool-worker-7-菜6制作完毕,来吃我把
1624983453423-15-ForkJoinPool.commonPool-worker-5-菜4制作完毕,来吃我把
1624983453423-18-ForkJoinPool.commonPool-worker-0-菜7制作完毕,来吃我把
1624983454429-1-main-菜都做好了,上桌1060

我们发现现在8盘菜只需要1S就做好了,把线程数设置一个合适的数值就能调整性能了。这个CommonPool不单单给CF使用,所以我们在开发中需要创建一个自定义的线程池来给CF使用。方便控制且互不影响。

我们也可以根据任务数量动态创建自定义线程池,当任务来了就创建线程池,当执行完之后再把线程池给销毁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CustomThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();

CommonUtil.createBehavior("小明和小伙伴们 进餐厅点菜");
long startTime = System.currentTimeMillis();

CompletableFuture[] dishes = IntStream.rangeClosed(0, 99)
.mapToObj(i -> new Dish("菜" + i, 1))
.map(dish -> CompletableFuture.runAsync(dish::make, executorService))
.toArray(CompletableFuture[]::new);

// 等待所有任务执行完毕
CompletableFuture.allOf(dishes).join();
CommonUtil.createBehavior("菜都做好了,上桌" + (System.currentTimeMillis() - startTime));
}
}
1
2
3
4
5
6
7
8
9
10
11
1624983798696-101-pool-1-thread-91-菜90制作完毕,来吃我把
1624983798696-102-pool-1-thread-92-菜91制作完毕,来吃我把
1624983798696-103-pool-1-thread-93-菜92制作完毕,来吃我把
1624983798697-104-pool-1-thread-94-菜93制作完毕,来吃我把
1624983798697-105-pool-1-thread-95-菜94制作完毕,来吃我把
1624983798697-106-pool-1-thread-96-菜95制作完毕,来吃我把
1624983798697-107-pool-1-thread-97-菜96制作完毕,来吃我把
1624983798697-108-pool-1-thread-98-菜97制作完毕,来吃我把
1624983798697-109-pool-1-thread-99-菜98制作完毕,来吃我把
1624983798697-110-pool-1-thread-100-菜99制作完毕,来吃我把
1624983799702-1-main-菜都做好了,上桌1070

我们发现创建100到菜也只需要1S,因为newCachedThreadPool可是个大胃王,他能不停的创建线程

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

如何观察任务调度的情况?

1
2
3
4
5
6
7
8
9
10
/**
* 这段代码里有两个异步任务 按道理A和B应该在不同的线程中执行
*/
public class ThenRunAsyncThreadReuse {
public static void main(String[] args) {
CompletableFuture.runAsync(() -> CommonUtil.createBehavior("A"))
.thenRunAsync(() -> CommonUtil.createBehavior("B"))
.join();
}
}
1
2
1624984232215-11-ForkJoinPool.commonPool-worker-1-A
1624984232215-11-ForkJoinPool.commonPool-worker-1-B

由于A很快就结束了,B一开始就发现有空闲线程,就直接复用A的线程了。也就是A和B在同一个线程中执行。

1
2
3
4
5
6
7
8
9
10
public class DealThenRunAsyncThreadReuse {
public static void main(String[] args) {
// 我们把线程的存活时间设置为0,代表这个线程完成任务后就立马销毁掉自己
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, new SynchronousQueue<>());

CompletableFuture.runAsync(() -> CommonUtil.createBehavior("A"), threadPoolExecutor)
.thenRunAsync(() -> CommonUtil.createBehavior("B"), threadPoolExecutor)
.join();
}
}
1
2
1624984512899-11-pool-1-thread-1-A
1624984512899-12-pool-1-thread-2-B

这样我们发现A和B永远不会在一个线程中运行,注意这种方式只能用于测试和研究不要写到生产环境去。

点此获取Demo链接