CompletableFuture原理与实践详解
前言
在现代软件开发中,随着系统复杂度和并发需求的增加,异步编程变得尤为重要。Java 8引入的CompletableFuture类,为开发者提供了强大的工具,以简化异步任务的管理和组合。本文将探讨CompletableFuture的使用方法。
一、CompletableFuture的背景与定义
1.1 Future的局限性
在Java 8之前,异步编程主要依赖于Future接口。然而,Future存在以下局限性:
- 阻塞获取结果:调用
get()方法时,线程会被阻塞,直到结果可用。这会导致线程资源的浪费,影响系统的并发性能。 - 缺乏回调机制:无法在任务完成后自动执行后续操作,需要手动检查任务状态,代码复杂且容易出错。
- 难以组合多个任务:对于需要等待多个异步任务完成或组合多个任务结果的场景,
Future提供的支持不够完善。
1.2 CompletableFuture的优势
为了解决这些问题,Java 8引入了CompletableFuture,它实现了CompletionStage和Future接口,提供了更丰富的API,支持异步回调和任务组合。CompletableFuture具有以下优势:
- 非阻塞获取结果:通过回调函数,在任务完成后自动触发后续操作,无需阻塞等待。
- 丰富的任务组合:提供了多种方法来组合多个异步任务,如
thenApply、thenCombine、thenCompose,allOf、anyOf等。 - 完善的异常处理:内置了异常处理机制,如
exceptionally、handle等方法,方便地处理异步任务中的异常情况。 - 灵活的执行控制:支持自定义线程池,可以更精确地控制任务的执行环境。
二、CompletableFuture的核心方法
2.1 创建异步任务
CompletableFuture提供了两种方法来创建异步任务:
2.1.1 supplyAsync – 创建有返回值的异步任务
// 使用默认线程池(ForkJoinPool.commonPool())
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// 执行异步操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return \"异步任务结果\";
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
return \"使用自定义线程池的结果\";
}, executor);
2.1.2 runAsync – 创建无返回值的异步任务
// 使用默认线程池
CompletableFuture future = CompletableFuture.runAsync(() -> {
// 执行异步操作,不返回结果
System.out.println(\"执行异步任务\");
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture future2 = CompletableFuture.runAsync(() -> {
System.out.println(\"使用自定义线程池执行\");
}, executor);
注意:在实际生产环境中,建议使用自定义线程池,因为默认的ForkJoinPool.commonPool()在高并发场景下可能无法满足需求。
2.2 异步回调处理
CompletableFuture提供了丰富的回调方法,以便在任务完成后执行特定操作:
2.2.1 thenApply – 对结果进行转换
thenApply方法接收一个函数,将前一个阶段的结果转换为新的结果。它有两个版本:
thenApply:在完成当前任务的同一个线程中执行thenApplyAsync:在新的线程中执行(使用默认线程池或自定义线程池)
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return \"Hello\";
});
// 同步版本
CompletableFuture result1 = future.thenApply(s -> {
return s + \" World\";
});
// 异步版本(使用默认线程池)
CompletableFuture result2 = future.thenApplyAsync(s -> {
return s + \" World\";
});
// 异步版本(使用自定义线程池)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture result3 = future.thenApplyAsync(s -> {
return s + \" World\";
}, executor);
2.2.2 thenAccept – 消费结果,无返回值
thenAccept方法接收一个消费者函数,用于消费前一个阶段的结果,但不返回新值。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return \"Hello World\";
});
// 同步版本
future.thenAccept(result -> {
System.out.println(\"消费结果: \" + result);
});
// 异步版本
future.thenAcceptAsync(result -> {
System.out.println(\"消费结果: \" + result);
});
// 异步版本(使用自定义线程池)
ExecutorService executor = Executors.newFixedThreadPool(10);
future.thenAcceptAsync(result ->{
System.out.println(\"消费结果:\" + result);
},executor);
2.2.3 thenRun – 不关心结果,执行后续操作
thenRun方法接收一个Runnable,在前一个阶段完成后执行,但不关心前一个阶段的结果。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return \"Hello World\";
});
// 同步版本
future.thenRun(() -> {
System.out.println(\"任务完成后执行清理操作\");
});
// 异步版本
ExecutorService executor = Executors.newFixedThreadPool(10);
future.thenRunAsync(() -> {
System.out.println(\"任务完成后执行清理操作\");
},executor);
2.2.4 thenCompose – 将两个异步操作串联
thenCompose方法用于将两个异步操作串联起来,前一个操作的结果作为后一个操作的输入。这类似于流式处理。(与下面的thenCombine区别在于:这个方法主要用于异步任务的串行编排。简单来说,thenCompose() 用于将一个异步任务的结果作为输入,继续发起一个新的异步任务,形成一条异步流水线。thenCompose() 把两个 Future 合并成一个连续的 Future,而不是thenCombine()形成的嵌套 CompletableFuture)
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
return \"Hello\";
});
CompletableFuture future2 = future1.thenCompose(result -> {
return CompletableFuture.supplyAsync(() -> {
return result + \" World\";
});
});
// 结果:future2 的值为 \"Hello World\"
2.2.5 thenCombine – 合并两个并行任务的结果,返回参数
thenCombine方法用于合并两个并行的异步任务的结果。
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
return \"Hello\";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
return \"World\";
});
CompletableFuture combined = future1.thenCombine(future2, (result1, result2) -> {
return result1 + \" \" + result2;
});
// 结果:combined 的值为 \"Hello World\"
2.2.6thenAcceptBoth – 方法用于合并两个并行任务的结果,不返回参数
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + \" cf1 do something....\");
return 1;
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + \" cf2 do something....\");
return 2;
});
CompletableFuture cf3 = cf1.thenAcceptBoth(cf2, (a, b) -> {
System.out.println(Thread.currentThread() + \" cf3 do something....\");
System.out.println(a + b);
});
2.2.7runAfterBoth
runAfterBoth没有入参,也没有返回值。两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + \" cf1 do something....\");
return 1;
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + \" cf2 do something....\");
return 2;
});
CompletableFuture cf3 = cf1.runAfterBoth(cf2, () -> {
System.out.println(Thread.currentThread() + \" cf3 do something....\");
});
2.3 组合多个异步任务
2.3.1 allOf – 等待所有任务完成
allOf方法创建一个新的CompletableFuture,当所有给定的CompletableFuture都完成时,它才会完成。
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
return \"结果1\";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
return \"结果2\";
});
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
return \"结果3\";
});
CompletableFuture allFutures = CompletableFuture.allOf(future1, future2, future3);
// 等待所有任务完成
allFutures.thenRun(() -> {
System.out.println(\"所有任务已完成\");
// 获取各个任务的结果
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();
System.out.println(\"结果1: \" + result1);
System.out.println(\"结果2: \" + result2);
System.out.println(\"结果3: \" + result3);
});
2.3.2 anyOf – 任意一个任务完成即可继续
anyOf方法创建一个新的CompletableFuture,当任意一个给定的CompletableFuture完成时,它就会完成。
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return \"结果1\";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return \"结果2\";
});
CompletableFuture
2.4 异常处理
CompletableFuture提供了完善的异常处理机制:
2.4.1 exceptionally – 处理异常并返回默认值
exceptionally方法用于处理异常情况,当任务抛出异常时,会执行提供的异常处理函数,并返回一个默认值。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// 模拟可能抛出异常的操作
if (Math.random() > 0.5) {
throw new RuntimeException(\"发生异常\");
}
return \"成功\";
});
CompletableFuture result = future.exceptionally(ex -> {
System.out.println(\"捕获异常: \" + ex.getMessage());
return \"默认值\";
});
// 无论是否发生异常,都能获取到结果
String value = result.join();
2.4.2 handle – 同时处理正常结果和异常(有返回值)
handle方法可以同时处理正常结果和异常情况,无论任务成功还是失败,都会执行提供的处理函数。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException(\"发生异常\");
}
return \"成功\";
});
CompletableFuture result = future.handle((value, ex) -> {
if (ex != null) {
System.out.println(\"处理异常: \" + ex.getMessage());
return \"异常时的默认值\";
}
return \"处理后的值: \" + value;
});
2.4.3 whenComplete – 观察结果和异常,但不修改结果(无返回值)
whenComplete方法类似于handle,但它只用于观察结果和异常,不能修改返回的值。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException(\"发生异常\");
}
return \"成功\";
});
CompletableFuture result = future.whenComplete((value, ex) -> {
if (ex != null) {
System.out.println(\"观察到异常: \" + ex.getMessage());
} else {
System.out.println(\"观察结果: \" + value);
}
});
2.5 其他实用方法
2.5.1 get – 获取结果或抛出检查异常
-
抛出经检查的异常(Checked Exceptions) :
包括 InterruptedException 和 ExecutionException。这意味着你需要在方法签名中声明这些异常或在调用处捕获它们。
-
支持超时参数:允许你指定等待结果的最大时间,这对于避免无限期等待某些操作完成特别有用。
try {
String result = future.get(1, TimeUnit.SECONDS); // 等待最多1秒
} catch (InterruptedException e) {
// 当前线程被中断时执行
} catch (ExecutionException e) {
// 当Future完成时发生异常
} catch (TimeoutException e) {
// 超过指定时间未完成任务
}
2.5.2 getNow – 立即获取结果或返回默认值
getNow方法用于立即获取结果,如果任务尚未完成,则返回提供的默认值。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return \"结果\";
});
// 立即尝试获取结果,如果未完成则返回默认值
String result = future.getNow(\"默认值\");
2.5.3 join – 等待结果完成(不抛出检查异常)
join方法类似于get(),但不会抛出检查异常(ExecutionException、InterruptedException),只会抛出运行时异常。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return \"结果\";
});
// 等待结果,不抛出检查异常
String result = future.join();
2.5.4 complete – 手动完成Future
complete方法用于手动完成CompletableFuture,如果任务尚未完成,则将其设置为已完成状态,并返回结果。
CompletableFuture future = new CompletableFuture();
// 手动完成Future
future.complete(\"手动完成的结果\");
String result = future.join(); // \"手动完成的结果\"
2.5.5 completeExceptionally – 手动完成Future并设置异常
completeExceptionally方法用于手动完成CompletableFuture,但设置为异常状态。
CompletableFuture future = new CompletableFuture();
// 手动设置为异常状态
future.completeExceptionally(new RuntimeException(\"手动设置异常\"));
try {
future.join();
} catch (Exception e) {
System.out.println(\"捕获异常: \" + e.getMessage());
}
三、CompletableFuture的实现原理
3.1 核心数据结构
CompletableFuture的内部实现依赖于以下几个核心组件:
3.1.1 result字段
CompletableFuture内部维护一个volatile Object result字段,用于存储任务的结果。这个字段可以是:
- 正常的结果值
- 异常对象(包装在
AltResult中) CompletableFuture本身(用于依赖关系)
3.1.2 Completion链表
CompletableFuture维护一个volatile Completion stack字段,这是一个栈结构的链表,用于存储所有依赖于当前Future完成的后续操作(Completion)。
// 简化的内部结构
static abstract class Completion extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // 链表中的下一个节点
abstract CompletableFuture tryFire(int mode);
abstract boolean isLive();
}
3.2 任务链的构建
当调用thenApply、thenAccept等方法时,CompletableFuture会创建一个新的Completion节点,并将其添加到当前Future的stack链表中:
// 简化的thenApply实现逻辑
public CompletableFuture thenApply(Function<? super T,? extends U> fn) {
CompletableFuture dep = new CompletableFuture();
Completion c = new UniApply(this, dep, fn);
// 将c压入stack栈
push(c);
// 尝试触发执行
c.tryFire(SYNC);
return dep;
}
3.3 非阻塞的结果获取
CompletableFuture通过内部的Completion机制实现非阻塞的结果获取:
- 任务完成时触发:当异步任务完成时,会调用
complete或completeExceptionally方法。 - 遍历Completion栈:在完成时,会遍历
stack中的所有Completion节点。 - 尝试执行后续操作:对每个
Completion节点调用tryFire方法,尝试执行后续的回调操作。 - 线程池执行:如果是异步执行,会将任务提交到线程池中执行。
3.4 线程池的使用
- 默认线程池:如果不指定线程池,
CompletableFuture使用ForkJoinPool.commonPool()作为默认的线程池。 - 自定义线程池:可以通过方法的第二个参数传入自定义的
Executor,以更好地控制线程资源。
3.5 异步执行的实现
CompletableFuture的异步执行主要通过以下方式实现:
- 提交到线程池:当调用
supplyAsync、runAsync等方法时,会将任务提交到线程池执行。 - 回调执行:当使用
thenApplyAsync等方法时,回调函数会在新的线程中执行。 - 工作窃取:
ForkJoinPool使用工作窃取算法,可以更高效地利用多核CPU。
四、实际应用案例
4.1 案例:美团外卖商家端API的异步化改造
美团技术团队在外卖商家端API服务的异步化改造中,成功应用了CompletableFuture,显著提升了系统性能。
4.1.1 改造前的痛点
改造前,系统采用同步调用方式,从多个下游服务获取数据:
// 改造前的同步调用方式
public ApiResponse getShopInfo(Long shopId) {
// 同步调用多个下游服务
ShopBasicInfo basicInfo = shopService.getBasicInfo(shopId); // 耗时 50ms
ShopStatistics statistics = shopService.getStatistics(shopId); // 耗时 100ms
ShopActivity activity = activityService.getActivity(shopId); // 耗时 80ms
// 组装返回结果
return ApiResponse.success()
.setBasicInfo(basicInfo)
.setStatistics(statistics)
.setActivity(activity);
}
存在的问题:
- 总耗时 = 50ms + 100ms + 80ms = 230ms
- 线程阻塞时间长,资源利用率低
- 随着下游服务增加,响应时间线性增长
4.1.2 改造后的异步化方案
改造后,使用CompletableFuture实现并行调用:
// 改造后的异步调用方式
public ApiResponse getShopInfo(Long shopId) {
// 并行调用多个下游服务
CompletableFuture basicInfoFuture =
CompletableFuture.supplyAsync(() ->
shopService.getBasicInfo(shopId), executor);
CompletableFuture statisticsFuture =
CompletableFuture.supplyAsync(() ->
shopService.getStatistics(shopId), executor);
CompletableFuture activityFuture =
CompletableFuture.supplyAsync(() ->
activityService.getActivity(shopId), executor);
// 等待所有任务完成并组装结果
CompletableFuture responseFuture =
CompletableFuture.allOf(basicInfoFuture, statisticsFuture, activityFuture)
.thenApply(v -> {
try {
ShopBasicInfo basicInfo = basicInfoFuture.get();
ShopStatistics statistics = statisticsFuture.get();
ShopActivity activity = activityFuture.get();
return ApiResponse.success()
.setBasicInfo(basicInfo)
.setStatistics(statistics)
.setActivity(activity);
} catch (Exception e) {
throw new RuntimeException(\"获取数据失败\", e);
}
})
.exceptionally(ex -> {
// 异常处理
log.error(\"获取店铺信息失败\", ex);
return ApiResponse.error(\"获取店铺信息失败\");
});
return responseFuture.join();
}
改造效果:
- 总耗时 ≈ max(50ms, 100ms, 80ms) = 100ms(约提升2.3倍)
- 线程资源利用率提升,CPU利用率提高
- 系统吞吐量显著提升,满足业务增长需求
4.1.3 进一步优化:部分失败容错
在实际场景中,还可以实现部分失败的容错机制:
public ApiResponse getShopInfo(Long shopId) {
CompletableFuture basicInfoFuture =
CompletableFuture.supplyAsync(() ->
shopService.getBasicInfo(shopId), executor)
.exceptionally(ex -> {
log.warn(\"获取基础信息失败,使用默认值\", ex);
return ShopBasicInfo.defaultInfo();
});
CompletableFuture statisticsFuture =
CompletableFuture.supplyAsync(() ->
shopService.getStatistics(shopId), executor)
.exceptionally(ex -> {
log.warn(\"获取统计信息失败,使用默认值\", ex);
return ShopStatistics.empty();
});
CompletableFuture activityFuture =
CompletableFuture.supplyAsync(() ->
activityService.getActivity(shopId), executor)
.exceptionally(ex -> {
log.warn(\"获取活动信息失败,使用默认值\", ex);
return ShopActivity.empty();
});
return CompletableFuture.allOf(basicInfoFuture, statisticsFuture, activityFuture)
.thenApply(v -> {
try {
return ApiResponse.success()
.setBasicInfo(basicInfoFuture.get())
.setStatistics(statisticsFuture.get())
.setActivity(activityFuture.get());
} catch (Exception e) {
throw new RuntimeException(\"组装数据失败\", e);
}
})
.join();
}
4.2 案例:RPC框架中的异步调用
在RPC框架中,CompletableFuture可以用于优化客户端接收服务端返回结果的处理方式。
4.2.1 使用AttributeMap的原始方式
// 客户端发送请求
public Object sendRequest(RpcRequest request) {
// 将请求ID和响应结果绑定到Channel的AttributeMap
channel.attr(AttributeKey.valueOf(request.getRequestId()))
.set(new AtomicReference());
channel.writeAndFlush(request);
// 阻塞等待响应
channel.closeFuture().sync();
// 从AttributeMap获取结果
AtomicReference responseRef = channel.attr(
AttributeKey.valueOf(request.getRequestId())).get();
return responseRef.get().getData();
}
问题:需要手动阻塞等待,代码不清晰。
4.2.2 使用CompletableFuture优化
// 客户端发送请求
public CompletableFuture sendRequest(RpcRequest request) {
CompletableFuture future = new CompletableFuture();
// 将future存储到未处理请求的Map中
unprocessedRequests.put(request.getRequestId(), future);
// 发送请求
channel.writeAndFlush(request);
return future;
}
// 客户端处理器接收响应
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof RpcResponse) {
RpcResponse response = (RpcResponse) msg;
CompletableFuture future =
unprocessedRequests.remove(response.getRequestId());
if (future != null) {
// 完成Future,触发回调
future.complete(response);
}
}
}
// 使用方式
CompletableFuture future = rpcClient.sendRequest(request);
RpcResponse response = future.get(); // 或者使用future.thenAccept()进行异步处理
优势:
- 代码更清晰,语义更明确
- 支持异步回调处理
- 不需要手动阻塞等待
五、最佳实践与注意事项
5.1 线程池管理
5.1.1 避免使用默认线程池
在高并发场景下,默认的ForkJoinPool.commonPool()可能无法满足需求,建议使用自定义线程池:
// 创建自定义线程池
ExecutorService executor = new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue(1000), // 任务队列
new ThreadFactoryBuilder() // 线程工厂
.setNameFormat(\"async-task-%d\")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 使用自定义线程池
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return \"结果\";
}, executor);
5.1.2 线程池大小设置
线程池大小的设置需要考虑:
- CPU密集型任务:线程数 = CPU核数 + 1
- IO密集型任务:线程数 = CPU核数 × (1 + IO等待时间/CPU计算时间)
// 计算合适的线程数
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(1000)
);
5.2 异常处理
5.2.1 不要忽略异常
异步任务中的异常可能被吞噬,必须进行适当的处理:
// 错误示例:可能忽略异常
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// 可能抛出异常
return riskyOperation();
});
future.thenAccept(result -> {
// 如果上面抛出异常,这里不会执行
System.out.println(result);
});
// 正确示例:处理异常
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return riskyOperation();
})
.exceptionally(ex -> {
log.error(\"操作失败\", ex);
return \"默认值\";
})
.thenAccept(result -> {
System.out.println(result);
});
5.2.2 使用handle统一处理
对于复杂的异常处理场景,可以使用handle方法:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return riskyOperation();
})
.handle((result, ex) -> {
if (ex != null) {
// 处理异常
log.error(\"操作失败\", ex);
// 根据异常类型返回不同的默认值
if (ex instanceof TimeoutException) {
return \"超时默认值\";
} else {
return \"通用默认值\";
}
}
// 处理正常结果
return processResult(result);
});
5.3 避免阻塞调用
5.3.1 避免在异步任务中阻塞
在CompletableFuture的异步任务中,应该避免长时间阻塞的操作:
// 错误示例:在异步任务中阻塞
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// 阻塞调用,占用线程池线程
return blockingDatabaseCall();
}, executor);
// 正确示例:将阻塞操作放在专门的线程池
ExecutorService blockingExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat(\"blocking-task-%d\")
.build()
);
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return blockingDatabaseCall();
}, blockingExecutor);
5.3.2 使用get()时的超时设置
如果需要使用get()方法,应该设置超时时间:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return longRunningOperation();
});
try {
// 设置超时时间,避免无限等待
String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 处理超时
future.cancel(true);
log.error(\"操作超时\", e);
} catch (Exception e) {
log.error(\"操作失败\", e);
}
5.4 任务链的设计
5.4.1 避免过长的任务链
过长的任务链会增加调试和维护的复杂度,应该合理拆分:
// 错误示例:任务链过长
CompletableFuture result = future1
.thenApply(v1 -> process1(v1))
.thenApply(v2 -> process2(v2))
.thenApply(v3 -> process3(v3))
.thenApply(v4 -> process4(v4))
.thenApply(v5 -> process5(v5))
.thenApply(v6 -> process6(v6));
// 正确示例:合理拆分
CompletableFuture stage1 = future1
.thenApply(v1 -> process1(v1))
.thenApply(v2 -> process2(v2))
.thenApply(v3 -> process3(v3));
CompletableFuture stage2 = stage1
.thenApply(v4 -> process4(v4))
.thenApply(v5 -> process5(v5))
.thenApply(v6 -> process6(v6));
5.4.2 合理使用同步和异步方法
根据业务需求,选择合适的同步或异步方法:
// 如果后续操作是轻量级的,可以使用同步方法
CompletableFuture result = future
.thenApply(v -> v.toUpperCase()) // 轻量级操作,使用同步方法
// 如果后续操作是重量级的,应该使用异步方法
CompletableFuture result2 = future
.thenApplyAsync(v -> heavyOperation(v), executor); // 重量级操作,使用异步方法
5.5 资源清理
5.5.1 及时取消不需要的任务
如果任务不再需要,应该及时取消,释放资源:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return longRunningOperation();
});
// 如果任务不再需要
if (someCondition) {
future.cancel(true); // 取消任务
}
5.5.2 关闭线程池
在应用关闭时,应该关闭自定义的线程池:
@PreDestroy
public void destroy() {
if (executor != null && !executor.isShutdown()) {
executor.shutdown();
try {
// 等待任务完成,最多等待30秒
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
// 强制关闭
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
六、总结
CompletableFuture作为Java 8引入的异步编程工具,为开发者提供了强大的功能,以简化异步任务的管理和组合。通过合理地使用CompletableFuture,可以:
- 提升系统性能:通过并行执行多个异步任务,显著减少总耗时
- 提高资源利用率:避免线程阻塞,提高CPU利用率
- 改善代码可读性:使用链式调用,使异步代码更加清晰易懂
- 增强系统稳定性:通过完善的异常处理机制,提高系统的容错能力
在实际应用中,需要注意:
- 线程池管理:根据业务场景选择合适的线程池配置
- 异常处理:确保所有异常都得到适当处理
- 资源清理:及时取消不需要的任务,关闭线程池
- 性能监控:监控线程池状态和任务执行情况,及时发现问题
通过掌握CompletableFuture的原理和使用方法,结合实际业务场景进行合理应用,可以构建高效、稳定的异步系统。
参考资料
- CompletableFuture使用详解(全网看这一篇就行)
- CompletableFuture原理与实践-外卖商家端API的异步化
- CompletableFuture详解
- Java 8 CompletableFuture官方文档



还没有评论呢,快来抢沙发~