JDK8 CompletableFuture异步编程实战
目录
- 一、CompletableFuture概述
- 二、创建CompletableFuture
- 三、结果处理与转换
- 四、多任务组合
- 五、异常处理
- 六、线程池配置
- 七、实战案例:电商系统
- 八、实战案例:批量任务处理
- 九、性能优化与最佳实践
- 十、常见问题与陷阱
- 总结
一、CompletableFuture概述
1.1 为什么需要CompletableFuture
在JDK 8之前,Java异步编程主要依赖Future和Callable,但存在诸多不足:
Future的局限性:
+---------------------------+
| 无法手动完成 |
| 无法链式调用 |
| 无法组合多个Future |
| 无法处理异常 |
| 阻塞式获取结果 |
+---------------------------+
CompletableFuture的优势:
+---------------------------+
| 支持手动完成 |
| 支持链式调用 |
| 多个Future组合 |
| 完善的异常处理 |
| 非阻塞式编程 |
+---------------------------+
package com.example.completablefuture;
import java.util.concurrent.*;
/**
* Future vs CompletableFuture对比
*/
public class FutureVsCompletableFuture {
public static void main(String[] args) throws Exception {
// === 1. 传统Future的问题 ===
System.out.println(\"=== 传统Future ===\");
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任务
Future future = executor.submit(() -> {
Thread.sleep(1000);
return \"Hello Future\";
});
// 只能阻塞等待结果
System.out.println(\"阻塞等待结果...\");
String result = future.get(); // 阻塞
System.out.println(\"结果: \" + result);
// === 2. CompletableFuture的优势 ===
System.out.println(\"n=== CompletableFuture ===\");
// 异步执行
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return \"Hello CompletableFuture\";
});
// 非阻塞式处理结果
cf.thenAccept(res -> System.out.println(\"结果: \" + res));
// 链式调用
CompletableFuture chain = CompletableFuture
.supplyAsync(() -> \"Step 1\")
.thenApply(s -> s + \" -> Step 2\")
.thenApply(s -> s + \" -> Step 3\");
System.out.println(\"链式调用结果: \" + chain.get());
executor.shutdown();
// 等待异步任务完成
Thread.sleep(2000);
}
}
1.2 CompletableFuture核心方法分类
方法分类:
+------------------+------------------------+--------------------+
| 类型 | 方法 | 说明 |
+------------------+------------------------+--------------------+
| 创建 | supplyAsync | 有返回值 |
| | runAsync | 无返回值 |
+------------------+------------------------+--------------------+
| 转换 | thenApply | 同步转换 |
| | thenApplyAsync | 异步转换 |
+------------------+------------------------+--------------------+
| 消费 | thenAccept | 消费结果 |
| | thenRun | 不接收结果 |
+------------------+------------------------+--------------------+
| 组合 | thenCompose | 串行组合 |
| | thenCombine | 并行组合 |
| | allOf | 等待所有完成 |
| | anyOf | 等待任一完成 |
+------------------+------------------------+--------------------+
| 异常处理 | exceptionally | 异常处理 |
| | handle | 正常+异常处理 |
| | whenComplete | 完成时回调 |
+------------------+------------------------+--------------------+
二、创建CompletableFuture
2.1 基本创建方式
package com.example.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* CompletableFuture创建方式
*/
public class CreateCompletableFuture {
public static void main(String[] args) throws Exception {
// === 1. supplyAsync - 有返回值的异步任务 ===
System.out.println(\"=== supplyAsync ===\");
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(\"执行异步任务: \" + Thread.currentThread().getName());
return \"Hello\";
});
System.out.println(\"结果: \" + future1.get());
// === 2. runAsync - 无返回值的异步任务 ===
System.out.println(\"n=== runAsync ===\");
CompletableFuture future2 = CompletableFuture.runAsync(() -> {
System.out.println(\"执行无返回值任务: \" + Thread.currentThread().getName());
});
future2.get(); // 等待完成
// === 3. 使用自定义线程池 ===
System.out.println(\"n=== 自定义线程池 ===\");
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
System.out.println(\"自定义线程池: \" + Thread.currentThread().getName());
return \"Custom Thread Pool\";
}, executor);
System.out.println(\"结果: \" + future3.get());
// === 4. completedFuture - 已完成的Future ===
System.out.println(\"n=== completedFuture ===\");
CompletableFuture completed = CompletableFuture.completedFuture(\"立即完成\");
System.out.println(\"结果: \" + completed.get());
// === 5. 手动完成 ===
System.out.println(\"n=== 手动完成 ===\");
CompletableFuture manual = new CompletableFuture();
// 在另一个线程中完成
new Thread(() -> {
try {
Thread.sleep(1000);
manual.complete(\"手动完成的结果\");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println(\"等待手动完成...\");
System.out.println(\"结果: \" + manual.get());
executor.shutdown();
}
}
三、结果处理与转换
3.1 thenApply、thenAccept、thenRun
package com.example.completablefuture;
import java.util.concurrent.CompletableFuture;
/**
* 结果处理与转换
*/
public class ResultProcessing {
public static void main(String[] args) throws Exception {
// === 1. thenApply - 转换结果(有返回值) ===
System.out.println(\"=== thenApply ===\");
CompletableFuture future1 = CompletableFuture
.supplyAsync(() -> {
System.out.println(\"步骤1: 返回10\");
return 10;
})
.thenApply(num -> {
System.out.println(\"步骤2: \" + num + \" * 2\");
return num * 2;
})
.thenApply(num -> {
System.out.println(\"步骤3: \" + num + \" + 5\");
return num + 5;
});
System.out.println(\"最终结果: \" + future1.get()); // 25
// === 2. thenAccept - 消费结果(无返回值) ===
System.out.println(\"n=== thenAccept ===\");
CompletableFuture future2 = CompletableFuture
.supplyAsync(() -> \"Hello World\")
.thenAccept(str -> {
System.out.println(\"消费结果: \" + str);
});
future2.get();
// === 3. thenRun - 不关心结果(无返回值) ===
System.out.println(\"n=== thenRun ===\");
CompletableFuture future3 = CompletableFuture
.supplyAsync(() -> \"Some Result\")
.thenRun(() -> {
System.out.println(\"任务完成,执行后续操作\");
});
future3.get();
// === 4. 同步 vs 异步 ===
System.out.println(\"n=== 同步 vs 异步 ===\");
// thenApply: 同步执行(使用相同线程)
CompletableFuture.supplyAsync(() -> {
System.out.println(\"supplyAsync: \" + Thread.currentThread().getName());
return \"Result\";
}).thenApply(s -> {
System.out.println(\"thenApply: \" + Thread.currentThread().getName());
return s.toUpperCase();
}).get();
// thenApplyAsync: 异步执行(使用不同线程)
CompletableFuture.supplyAsync(() -> {
System.out.println(\"supplyAsync: \" + Thread.currentThread().getName());
return \"Result\";
}).thenApplyAsync(s -> {
System.out.println(\"thenApplyAsync: \" + Thread.currentThread().getName());
return s.toUpperCase();
}).get();
// === 5. 链式调用示例 ===
demonstrateChaining();
}
/**
* 链式调用示例
*/
private static void demonstrateChaining() throws Exception {
System.out.println(\"n=== 链式调用示例 ===\");
String result = CompletableFuture
.supplyAsync(() -> \"hello\")
.thenApply(String::toUpperCase)
.thenApply(s -> s + \" WORLD\")
.thenApply(s -> s + \"!\")
.get();
System.out.println(\"链式调用结果: \" + result);
}
}
四、多任务组合
4.1 thenCompose与thenCombine
package com.example.completablefuture;
import java.util.concurrent.CompletableFuture;
/**
* 多任务组合
*/
public class TaskCombination {
public static void main(String[] args) throws Exception {
// === 1. thenCompose - 串行组合(依赖) ===
System.out.println(\"=== thenCompose (串行) ===\");
CompletableFuture compose = CompletableFuture
.supplyAsync(() -> {
System.out.println(\"任务1: 查询用户ID\");
return \"USER_001\";
})
.thenCompose(userId -> CompletableFuture.supplyAsync(() -> {
System.out.println(\"任务2: 根据\" + userId + \"查询用户详情\");
return \"User{name=\'张三\', id=\'\" + userId + \"\'}\";
}));
System.out.println(\"结果: \" + compose.get());
// === 2. thenCombine - 并行组合(独立) ===
System.out.println(\"n=== thenCombine (并行) ===\");
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println(\"任务1: 查询用户信息\");
return \"User: 张三\";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println(\"任务2: 查询订单信息\");
return \"Order: ORD001\";
});
CompletableFuture combined = future1.thenCombine(future2, (user, order) -> {
System.out.println(\"合并结果\");
return user + \" | \" + order;
});
System.out.println(\"组合结果: \" + combined.get());
// === 3. allOf - 等待所有任务完成 ===
System.out.println(\"n=== allOf ===\");
CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return \"任务1完成\";
});
CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return \"任务2完成\";
});
CompletableFuture task3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return \"任务3完成\";
});
long start = System.currentTimeMillis();
CompletableFuture allOfFuture = CompletableFuture.allOf(task1, task2, task3);
allOfFuture.get(); // 等待所有完成
long end = System.currentTimeMillis();
System.out.println(\"所有任务完成,耗时: \" + (end - start) + \"ms\");
System.out.println(\"结果1: \" + task1.get());
System.out.println(\"结果2: \" + task2.get());
System.out.println(\"结果3: \" + task3.get());
// === 4. anyOf - 等待任一任务完成 ===
System.out.println(\"n=== anyOf ===\");
CompletableFuture slow = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return \"慢任务\";
});
CompletableFuture fast = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return \"快任务\";
});
CompletableFuture
4.2 复杂组合示例
package com.example.completablefuture;
import java.util.concurrent.CompletableFuture;
/**
* 复杂任务组合示例
*/
public class ComplexCombination {
public static void main(String[] args) throws Exception {
// 场景: 电商商品详情页数据聚合
// 需要并行获取: 商品基本信息、库存信息、评价信息、推荐商品
System.out.println(\"=== 商品详情页数据聚合 ===n\");
long start = System.currentTimeMillis();
// 1. 查询商品基本信息
CompletableFuture productInfo = CompletableFuture.supplyAsync(() -> {
sleep(500);
System.out.println(\" 获取商品基本信息\");
return \"iPhone 15 Pro\";
});
// 2. 查询库存信息
CompletableFuture stockInfo = CompletableFuture.supplyAsync(() -> {
sleep(300);
System.out.println(\" 获取库存信息\");
return 100;
});
// 3. 查询评价信息
CompletableFuture reviewInfo = CompletableFuture.supplyAsync(() -> {
sleep(600);
System.out.println(\" 获取评价信息\");
return \"4.8分(1000评价)\";
});
// 4. 查询推荐商品
CompletableFuture recommendInfo = CompletableFuture.supplyAsync(() -> {
sleep(400);
System.out.println(\" 获取推荐商品\");
return \"推荐: AirPods Pro\";
});
// 组合所有结果
CompletableFuture result = productInfo
.thenCombine(stockInfo, (product, stock) -> {
ProductDetail detail = new ProductDetail();
detail.setProductName(product);
detail.setStock(stock);
return detail;
})
.thenCombine(reviewInfo, (detail, review) -> {
detail.setReview(review);
return detail;
})
.thenCombine(recommendInfo, (detail, recommend) -> {
detail.setRecommend(recommend);
return detail;
});
ProductDetail detail = result.get();
long end = System.currentTimeMillis();
System.out.println(\"n=== 商品详情 ===\");
System.out.println(detail);
System.out.println(\"n总耗时: \" + (end - start) + \"ms (并行执行)\");
System.out.println(\"如果串行执行需要: ~1800ms\");
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 商品详情VO
*/
class ProductDetail {
private String productName;
private Integer stock;
private String review;
private String recommend;
// Getters and Setters
public void setProductName(String productName) { this.productName = productName; }
public void setStock(Integer stock) { this.stock = stock; }
public void setReview(String review) { this.review = review; }
public void setRecommend(String recommend) { this.recommend = recommend; }
@Override
public String toString() {
return \"商品名称: \" + productName + \"n\" +
\"库存数量: \" + stock + \"n\" +
\"用户评价: \" + review + \"n\" +
\"推荐商品: \" + recommend;
}
}
五、异常处理
5.1 异常处理方法
package com.example.completablefuture;
import java.util.concurrent.CompletableFuture;
/**
* 异常处理
*/
public class ExceptionHandling {
public static void main(String[] args) throws Exception {
// === 1. exceptionally - 异常处理 ===
System.out.println(\"=== exceptionally ===\");
CompletableFuture future1 = CompletableFuture
.supplyAsync(() -> {
if (true) {
throw new RuntimeException(\"发生异常\");
}
return \"正常结果\";
})
.exceptionally(ex -> {
System.out.println(\"捕获异常: \" + ex.getMessage());
return \"异常时的默认值\";
});
System.out.println(\"结果: \" + future1.get());
// === 2. handle - 同时处理正常和异常 ===
System.out.println(\"n=== handle ===\");
CompletableFuture future2 = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException(\"随机异常\");
}
return \"成功\";
})
.handle((result, ex) -> {
if (ex != null) {
System.out.println(\"处理异常: \" + ex.getMessage());
return \"异常处理结果\";
}
System.out.println(\"处理正常结果: \" + result);
return result + \" (已处理)\";
});
System.out.println(\"结果: \" + future2.get());
// === 3. whenComplete - 完成时回调(不改变结果) ===
System.out.println(\"n=== whenComplete ===\");
CompletableFuture future3 = CompletableFuture
.supplyAsync(() -> \"原始结果\")
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println(\"发生异常: \" + ex.getMessage());
} else {
System.out.println(\"正常完成: \" + result);
}
});
System.out.println(\"结果: \" + future3.get());
// === 4. 异常传播 ===
System.out.println(\"n=== 异常传播 ===\");
CompletableFuture future4 = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException(\"第一步异常\");
})
.thenApply(s -> {
System.out.println(\"这行不会执行\");
return s.toUpperCase();
})
.thenApply(s -> {
System.out.println(\"这行也不会执行\");
return s + \"!\";
})
.exceptionally(ex -> {
System.out.println(\"最终捕获: \" + ex.getMessage());
return \"恢复值\";
});
System.out.println(\"结果: \" + future4.get());
// === 5. 实战案例: API调用重试 ===
demonstrateRetry();
}
/**
* API调用重试示例
*/
private static void demonstrateRetry() throws Exception {
System.out.println(\"n=== API调用重试 ===\");
int maxRetries = 3;
CompletableFuture result = retryAsync(() -> {
// 模拟API调用
if (Math.random() < 0.7) { // 70%失败率
throw new RuntimeException(\"API调用失败\");
}
return \"API响应数据\";
}, maxRetries);
System.out.println(\"最终结果: \" + result.get());
}
/**
* 异步重试方法
*/
private static CompletableFuture retryAsync(
java.util.concurrent.Callable task, int maxRetries) {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
for (int i = 0; i < maxRetries; i++) {
final int attempt = i + 1;
future = future.exceptionally(ex -> {
System.out.println(\"第\" + attempt + \"次重试...\");
try {
Thread.sleep(1000 * attempt); // 递增延迟
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
return future.exceptionally(ex -> {
System.out.println(\"重试\" + maxRetries + \"次后仍失败\");
throw new RuntimeException(\"最终失败: \" + ex.getMessage());
});
}
}
六、线程池配置
6.1 自定义线程池
package com.example.completablefuture;
import java.util.concurrent.*;
/**
* 线程池配置
*/
public class ThreadPoolConfiguration {
public static void main(String[] args) throws Exception {
// === 1. 默认线程池(ForkJoinPool) ===
System.out.println(\"=== 默认线程池 ===\");
System.out.println(\"处理器核心数: \" + Runtime.getRuntime().availableProcessors());
System.out.println(\"默认线程池大小: \" + ForkJoinPool.commonPool().getParallelism());
CompletableFuture.supplyAsync(() -> {
System.out.println(\"默认线程池: \" + Thread.currentThread().getName());
return \"Default\";
}).get();
// === 2. 自定义线程池 ===
System.out.println(\"n=== 自定义线程池 ===\");
// 创建自定义线程池
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue(100), // 任务队列
new ThreadFactory() { // 线程工厂
private int count = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, \"CustomThread-\" + count++);
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println(\"自定义线程池: \" + Thread.currentThread().getName());
return \"Custom\";
}, customExecutor);
System.out.println(\"结果: \" + future.get());
// === 3. 线程池最佳实践 ===
demonstrateBestPractices();
customExecutor.shutdown();
}
/**
* 线程池最佳实践
*/
private static void demonstrateBestPractices() {
System.out.println(\"n=== 线程池最佳实践 ===\");
// CPU密集型任务: 核心数 + 1
int cpuIntensive = Runtime.getRuntime().availableProcessors() + 1;
System.out.println(\"CPU密集型推荐线程数: \" + cpuIntensive);
// IO密集型任务: 核心数 * 2
int ioIntensive = Runtime.getRuntime().availableProcessors() * 2;
System.out.println(\"IO密集型推荐线程数: \" + ioIntensive);
// 混合型任务: 根据实际情况调整
System.out.println(\"n线程池配置建议:\");
System.out.println(\"1. CPU密集型: 核心数+1\");
System.out.println(\"2. IO密集型: 核心数*2\");
System.out.println(\"3. 队列大小: 根据内存限制\");
System.out.println(\"4. 拒绝策略: CallerRunsPolicy(生产推荐)\");
System.out.println(\"5. 线程名称: 自定义ThreadFactory便于排查问题\");
}
}
七、实战案例:电商系统
7.1 商品详情页聚合服务
package com.example.completablefuture.practice;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
/**
* 实战案例: 电商商品详情页
*/
public class ProductDetailService {
public static void main(String[] args) throws Exception {
ProductDetailService service = new ProductDetailService();
Long productId = 12345L;
System.out.println(\"=== 加载商品详情页 ===n\");
long start = System.currentTimeMillis();
ProductDetailVO detail = service.getProductDetail(productId);
long end = System.currentTimeMillis();
System.out.println(\"n\" + detail);
System.out.println(\"n总耗时: \" + (end - start) + \"ms\");
}
/**
* 获取商品详情(并行聚合多个数据源)
*/
public ProductDetailVO getProductDetail(Long productId) throws Exception {
// 1. 查询商品基本信息(必须)
CompletableFuture productFuture = CompletableFuture
.supplyAsync(() -> queryProductInfo(productId));
// 2. 查询价格信息(必须)
CompletableFuture priceFuture = CompletableFuture
.supplyAsync(() -> queryPriceInfo(productId));
// 3. 查询库存信息(必须)
CompletableFuture stockFuture = CompletableFuture
.supplyAsync(() -> queryStockInfo(productId));
// 4. 查询商家信息(必须,依赖商品信息)
CompletableFuture merchantFuture = productFuture
.thenCompose(product -> CompletableFuture.supplyAsync(
() -> queryMerchantInfo(product.getMerchantId())));
// 5. 查询评价信息(非必须)
CompletableFuture reviewFuture = CompletableFuture
.supplyAsync(() -> queryReviewInfo(productId))
.exceptionally(ex -> {
System.out.println(\"评价服务异常,使用默认值\");
return new ReviewInfo(0.0, 0);
});
// 6. 查询推荐商品(非必须)
CompletableFuture recommendFuture = CompletableFuture
.supplyAsync(() -> queryRecommendInfo(productId))
.exceptionally(ex -> {
System.out.println(\"推荐服务异常,使用默认值\");
return new RecommendInfo();
});
// 等待所有必须的数据
CompletableFuture allRequired = CompletableFuture.allOf(
productFuture, priceFuture, stockFuture, merchantFuture
);
// 等待必须数据完成
allRequired.get();
// 组装返回结果
ProductDetailVO vo = new ProductDetailVO();
vo.setProduct(productFuture.get());
vo.setPrice(priceFuture.get());
vo.setStock(stockFuture.get());
vo.setMerchant(merchantFuture.get());
// 非必须数据,尽量获取,超时则使用默认值
try {
vo.setReview(reviewFuture.getNow(new ReviewInfo(0.0, 0)));
vo.setRecommend(recommendFuture.getNow(new RecommendInfo()));
} catch (Exception e) {
// 使用默认值
}
return vo;
}
// === 模拟各种查询服务 ===
private ProductInfo queryProductInfo(Long productId) {
sleep(300);
System.out.println(\" 查询商品信息\");
return new ProductInfo(productId, \"iPhone 15 Pro\", 1001L);
}
private PriceInfo queryPriceInfo(Long productId) {
sleep(200);
System.out.println(\" 查询价格信息\");
return new PriceInfo(7999.0, 6999.0);
}
private StockInfo queryStockInfo(Long productId) {
sleep(250);
System.out.println(\" 查询库存信息\");
return new StockInfo(150);
}
private MerchantInfo queryMerchantInfo(Long merchantId) {
sleep(200);
System.out.println(\" 查询商家信息\");
return new MerchantInfo(merchantId, \"Apple官方旗舰店\");
}
private ReviewInfo queryReviewInfo(Long productId) {
sleep(400);
// 模拟偶尔异常
if (ThreadLocalRandom.current().nextInt(10) < 2) {
throw new RuntimeException(\"评价服务异常\");
}
System.out.println(\" 查询评价信息\");
return new ReviewInfo(4.8, 15000);
}
private RecommendInfo queryRecommendInfo(Long productId) {
sleep(350);
System.out.println(\" 查询推荐信息\");
return new RecommendInfo();
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// === 实体类 ===
class ProductDetailVO {
private ProductInfo product;
private PriceInfo price;
private StockInfo stock;
private MerchantInfo merchant;
private ReviewInfo review;
private RecommendInfo recommend;
// Setters
public void setProduct(ProductInfo product) { this.product = product; }
public void setPrice(PriceInfo price) { this.price = price; }
public void setStock(StockInfo stock) { this.stock = stock; }
public void setMerchant(MerchantInfo merchant) { this.merchant = merchant; }
public void setReview(ReviewInfo review) { this.review = review; }
public void setRecommend(RecommendInfo recommend) { this.recommend = recommend; }
@Override
public String toString() {
return \"=== 商品详情 ===n\" +
\"商品: \" + product.getName() + \"n\" +
\"价格: ¥\" + price.getSalePrice() + \" (原价¥\" + price.getOriginalPrice() + \")n\" +
\"库存: \" + stock.getQuantity() + \"n\" +
\"商家: \" + merchant.getName() + \"n\" +
\"评分: \" + review.getScore() + \"分 (\" + review.getCount() + \"条评价)\";
}
}
class ProductInfo {
private Long id;
private String name;
private Long merchantId;
public ProductInfo(Long id, String name, Long merchantId) {
this.id = id;
this.name = name;
this.merchantId = merchantId;
}
public String getName() { return name; }
public Long getMerchantId() { return merchantId; }
}
class PriceInfo {
private Double originalPrice;
private Double salePrice;
public PriceInfo(Double originalPrice, Double salePrice) {
this.originalPrice = originalPrice;
this.salePrice = salePrice;
}
public Double getOriginalPrice() { return originalPrice; }
public Double getSalePrice() { return salePrice; }
}
class StockInfo {
private Integer quantity;
public StockInfo(Integer quantity) {
this.quantity = quantity;
}
public Integer getQuantity() { return quantity; }
}
class MerchantInfo {
private Long id;
private String name;
public MerchantInfo(Long id, String name) {
this.id = id;
this.name = name;
}
public String getName() { return name; }
}
class ReviewInfo {
private Double score;
private Integer count;
public ReviewInfo(Double score, Integer count) {
this.score = score;
this.count = count;
}
public Double getScore() { return score; }
public Integer getCount() { return count; }
}
class RecommendInfo {
// 推荐商品列表
}
八、实战案例:批量任务处理
8.1 批量数据导入
package com.example.completablefuture.practice;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* 实战案例: 批量数据导入
*/
public class BatchImportService {
private static final int BATCH_SIZE = 100; // 每批处理100条
public static void main(String[] args) throws Exception {
BatchImportService service = new BatchImportService();
// 模拟1000条数据
List records = new ArrayList();
for (int i = 1; i <= 1000; i++) {
records.add(new DataRecord(i, \"数据\" + i));
}
System.out.println(\"=== 批量数据导入 ===\");
System.out.println(\"总数据量: \" + records.size() + \"条n\");
long start = System.currentTimeMillis();
BatchImportResult result = service.batchImport(records);
long end = System.currentTimeMillis();
System.out.println(\"n=== 导入结果 ===\");
System.out.println(\"成功: \" + result.getSuccessCount() + \"条\");
System.out.println(\"失败: \" + result.getFailCount() + \"条\");
System.out.println(\"总耗时: \" + (end - start) + \"ms\");
}
/**
* 批量导入数据
*/
public BatchImportResult batchImport(List records) throws Exception {
// 分批处理
List<List> batches = splitBatch(records, BATCH_SIZE);
System.out.println(\"分为 \" + batches.size() + \" 批处理n\");
// 创建批量任务
List<CompletableFuture> futures = batches.stream()
.map(batch -> CompletableFuture.supplyAsync(() -> processBatch(batch)))
.collect(Collectors.toList());
// 等待所有批次完成
CompletableFuture allOf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOf.get();
// 汇总结果
int successCount = 0;
int failCount = 0;
for (CompletableFuture future : futures) {
BatchResult batchResult = future.get();
successCount += batchResult.getSuccessCount();
failCount += batchResult.getFailCount();
}
return new BatchImportResult(successCount, failCount);
}
/**
* 处理单批数据
*/
private BatchResult processBatch(List batch) {
System.out.println(\"[\" + Thread.currentThread().getName() +
\"] 处理批次,数量: \" + batch.size());
int success = 0;
int fail = 0;
for (DataRecord record : batch) {
try {
// 模拟数据处理
Thread.sleep(10);
// 模拟偶尔失败
if (Math.random() < 0.05) { // 5%失败率
throw new RuntimeException(\"处理失败\");
}
success++;
} catch (Exception e) {
fail++;
}
}
System.out.println(\"[\" + Thread.currentThread().getName() +
\"] 批次完成,成功: \" + success + \", 失败: \" + fail);
return new BatchResult(success, fail);
}
/**
* 分批
*/
private List<List> splitBatch(List records, int batchSize) {
List<List> batches = new ArrayList();
for (int i = 0; i < records.size(); i += batchSize) {
int end = Math.min(i + batchSize, records.size());
batches.add(records.subList(i, end));
}
return batches;
}
}
/**
* 数据记录
*/
class DataRecord {
private Integer id;
private String data;
public DataRecord(Integer id, String data) {
this.id = id;
this.data = data;
}
public Integer getId() { return id; }
public String getData() { return data; }
}
/**
* 批次结果
*/
class BatchResult {
private int successCount;
private int failCount;
public BatchResult(int successCount, int failCount) {
this.successCount = successCount;
this.failCount = failCount;
}
public int getSuccessCount() { return successCount; }
public int getFailCount() { return failCount; }
}
/**
* 导入结果
*/
class BatchImportResult {
private int successCount;
private int failCount;
public BatchImportResult(int successCount, int failCount) {
this.successCount = successCount;
this.failCount = failCount;
}
public int getSuccessCount() { return successCount; }
public int getFailCount() { return failCount; }
}
九、性能优化与最佳实践
9.1 性能优化技巧
package com.example.completablefuture.best;
import java.util.concurrent.*;
/**
* 性能优化与最佳实践
*/
public class PerformanceOptimization {
public static void main(String[] args) throws Exception {
System.out.println(\"=== CompletableFuture最佳实践 ===n\");
// === 1. 选择合适的方法 ===
System.out.println(\"1. 选择合适的方法\");
System.out.println(\" thenApply: 需要转换结果\");
System.out.println(\" thenAccept: 只消费结果\");
System.out.println(\" thenRun: 不关心结果\");
System.out.println(\" 选择最轻量的方法可以提升性能n\");
// === 2. 使用自定义线程池 ===
System.out.println(\"2. 使用自定义线程池\");
System.out.println(\" 避免使用ForkJoinPool.commonPool()\");
System.out.println(\" 根据任务类型配置线程池\");
System.out.println(\" 设置有界队列防止OOMn\");
// === 3. 避免阻塞操作 ===
System.out.println(\"3. 避免阻塞操作\");
System.out.println(\" 不要在回调中执行耗时IO操作\");
System.out.println(\" 使用thenComposeAsync处理异步依赖n\");
// === 4. 合理设置超时 ===
demonstrateTimeout();
// === 5. 批量任务优化 ===
demonstrateBatchOptimization();
// === 6. 异常处理 ===
System.out.println(\"n6. 异常处理最佳实践\");
System.out.println(\" 使用exceptionally或handle处理异常\");
System.out.println(\" 避免异常被吞没\");
System.out.println(\" 记录完整的异常堆栈\");
}
/**
* 超时控制
*/
private static void demonstrateTimeout() throws Exception {
System.out.println(\"4. 超时控制\");
CompletableFuture slowTask = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return \"慢任务结果\";
});
try {
// JDK 9+支持超时
// String result = slowTask.get(2, TimeUnit.SECONDS);
// JDK 8可以使用这种方式
String result = slowTask
.applyToEither(
timeout(2, TimeUnit.SECONDS),
s -> s
)
.get();
System.out.println(\" 结果: \" + result);
} catch (TimeoutException e) {
System.out.println(\" 任务超时,已取消\");
slowTask.cancel(true);
} catch (Exception e) {
System.out.println(\" 任务超时\");
}
}
/**
* 创建超时Future
*/
private static CompletableFuture timeout(long timeout, TimeUnit unit) {
CompletableFuture future = new CompletableFuture();
Executors.newScheduledThreadPool(1).schedule(
() -> future.completeExceptionally(new TimeoutException()),
timeout,
unit
);
return future;
}
/**
* 批量任务优化
*/
private static void demonstrateBatchOptimization() {
System.out.println(\"n5. 批量任务优化\");
System.out.println(\" 分批处理,避免创建过多线程\");
System.out.println(\" 使用allOf等待批次完成\");
System.out.println(\" 控制并发度,避免资源耗尽\");
System.out.println(\" 示例:\");
System.out.println(\" - 1000个任务\");
System.out.println(\" - 分10批,每批100个\");
System.out.println(\" - 逐批执行,降低并发压力\");
}
}
十、常见问题与陷阱
10.1 常见陷阱
package com.example.completablefuture.best;
import java.util.concurrent.CompletableFuture;
/**
* 常见问题与陷阱
*/
public class CommonPitfalls {
public static void main(String[] args) throws Exception {
System.out.println(\"=== CompletableFuture常见陷阱 ===n\");
// === 陷阱1: 忘记处理异常 ===
System.out.println(\"陷阱1: 异常被吞没\");
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException(\"异常\");
}).thenApply(s -> s.toUpperCase()); // 不会执行
try {
future1.get(); // 这里才会抛出异常
} catch (Exception e) {
System.out.println(\" 异常: \" + e.getCause().getMessage());
}
// 正确做法
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException(\"异常\");
}).exceptionally(ex -> {
System.out.println(\" 捕获并处理异常: \" + ex.getMessage());
return \"默认值\";
});
System.out.println(\" 结果: \" + future2.get());
// === 陷阱2: 在回调中阻塞 ===
System.out.println(\"n陷阱2: 在回调中阻塞\");
// 错误: 阻塞回调线程
CompletableFuture.supplyAsync(() -> \"Step1\")
.thenApply(s -> {
try {
Thread.sleep(5000); // 阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + \" Step2\";
});
System.out.println(\" 不要在回调中执行耗时操作\");
// 正确: 使用Async方法
CompletableFuture.supplyAsync(() -> \"Step1\")
.thenApplyAsync(s -> {
// 这会在另一个线程执行
return s + \" Step2\";
});
System.out.println(\" 使用thenApplyAsync\");
// === 陷阱3: 忘记指定线程池 ===
System.out.println(\"n陷阱3: 使用公共线程池\");
System.out.println(\" 默认使用ForkJoinPool.commonPool()\");
System.out.println(\" 可能被其他任务影响\");
System.out.println(\" 使用自定义线程池隔离资源\");
// === 陷阱4: 链式调用中断 ===
System.out.println(\"n陷阱4: 链式调用中断\");
CompletableFuture broken = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException(\"第一步失败\");
})
.thenApply(s -> {
System.out.println(\"这行不会执行\");
return s.toUpperCase();
})
.thenApply(s -> {
System.out.println(\"这行也不会执行\");
return s + \"!\";
});
System.out.println(\" 链式调用会因异常中断\");
System.out.println(\" 使用handle或exceptionally处理\");
// === 陷阱5: 忘记get()或join() ===
System.out.println(\"n陷阱5: 任务未执行完就退出\");
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
System.out.println(\" 这行可能不会执行\");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 如果主线程退出,异步任务可能未完成
// 正确做法: future.get() 或 future.join()
Thread.sleep(200); // 等待演示
// === 最佳实践总结 ===
printBestPractices();
}
/**
* 最佳实践总结
*/
private static void printBestPractices() {
System.out.println(\"n=== 最佳实践总结 ===\");
System.out.println(\"n 推荐做法:\");
System.out.println(\"1. 使用自定义线程池\");
System.out.println(\"2. 处理所有异常(exceptionally/handle)\");
System.out.println(\"3. 设置超时时间\");
System.out.println(\"4. 合理使用Async后缀方法\");
System.out.println(\"5. 批量任务分批处理\");
System.out.println(\"6. 记录详细日志\");
System.out.println(\"n 避免做法:\");
System.out.println(\"1. 在回调中执行阻塞操作\");
System.out.println(\"2. 忽略异常处理\");
System.out.println(\"3. 过度使用CompletableFuture\");
System.out.println(\"4. 创建过多线程\");
System.out.println(\"5. 循环依赖\");
}
}
总结
核心知识点回顾
本文深入探讨了JDK 8 CompletableFuture,主要内容包括:
-
基础概念
- Future的局限性
- CompletableFuture的优势
- 核心方法分类
-
基本操作
- 创建CompletableFuture
- 结果处理与转换
- 链式调用
-
高级特性
- 多任务组合(thenCompose/thenCombine/allOf/anyOf)
- 异常处理(exceptionally/handle/whenComplete)
- 线程池配置
-
实战应用
- 电商商品详情页数据聚合
- 批量数据导入处理
- API重试机制
-
性能优化
- 线程池配置建议
- 超时控制
- 批量任务优化
Future对比
| 维度 | Future | CompletableFuture |
|---|---|---|
| 手动完成 | ||
| 链式调用 | ||
| 多任务组合 | ||
| 异常处理 | ||
| 非阻塞 | ||
| 回调支持 |
使用场景
适用场景:
+---------------------------+
| 并行调用多个服务 |
| 数据聚合(商品详情页) |
| 批量任务处理 |
| 异步日志、通知 |
| 超时控制 |
| 降级与容错 |
+---------------------------+
不适用场景:
+---------------------------+
| 简单的顺序调用 |
| 需要事务一致性 |
| 复杂的状态管理 |
| 过度的嵌套依赖 |
+---------------------------+
线程池配置建议
// CPU密集型
int cpuThreads = Runtime.getRuntime().availableProcessors() + 1;
// IO密集型
int ioThreads = Runtime.getRuntime().availableProcessors() * 2;
// 自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
60L, // 空闲存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue(1000), // 有界队列
new NamedThreadFactory(\"async-\"), // 自定义名称
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);



还没有评论呢,快来抢沙发~