首页 开发教程 CompletableFuture异步编程实战

CompletableFuture异步编程实战

开发教程 2025年12月4日
877 浏览

JDK8 CompletableFuture异步编程实战

目录

  • 一、CompletableFuture概述
  • 二、创建CompletableFuture
  • 三、结果处理与转换
  • 四、多任务组合
  • 五、异常处理
  • 六、线程池配置
  • 七、实战案例:电商系统
  • 八、实战案例:批量任务处理
  • 九、性能优化与最佳实践
  • 十、常见问题与陷阱
  • 总结

一、CompletableFuture概述

1.1 为什么需要CompletableFuture

在JDK 8之前,Java异步编程主要依赖FutureCallable,但存在诸多不足:

Future的局限性:
+---------------------------+
|  无法手动完成             |
|  无法链式调用             |
|  无法组合多个Future       |
|  无法处理异常             |
|  阻塞式获取结果           |
+---------------------------+

CompletableFuture的优势:
+---------------------------+
|  支持手动完成             |
|  支持链式调用             |
|  多个Future组合          |
|  完善的异常处理           |
|  非阻塞式编程             |
+---------------------------+
package com.example.completablefuture;

import java.util.concurrent.*;

/**
 * Future vs CompletableFuture对比
 */
public class FutureVsCompletableFuture {

    public static void main(String[] args) throws Exception {

        // === 1. 传统Future的问题 ===
        System.out.println(\"=== 传统Future ===\");

        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交任务
        Future future = executor.submit(() -> {
            Thread.sleep(1000);
            return \"Hello Future\";
        });

        // 只能阻塞等待结果
        System.out.println(\"阻塞等待结果...\");
        String result = future.get(); // 阻塞
        System.out.println(\"结果: \" + result);

        // === 2. CompletableFuture的优势 ===
        System.out.println(\"n=== CompletableFuture ===\");

        // 异步执行
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return \"Hello CompletableFuture\";
        });

        // 非阻塞式处理结果
        cf.thenAccept(res -> System.out.println(\"结果: \" + res));

        // 链式调用
        CompletableFuture chain = CompletableFuture
            .supplyAsync(() -> \"Step 1\")
            .thenApply(s -> s + \" -> Step 2\")
            .thenApply(s -> s + \" -> Step 3\");

        System.out.println(\"链式调用结果: \" + chain.get());

        executor.shutdown();

        // 等待异步任务完成
        Thread.sleep(2000);
    }
}

1.2 CompletableFuture核心方法分类

方法分类:
+------------------+------------------------+--------------------+
|    类型          |       方法             |      说明          |
+------------------+------------------------+--------------------+
| 创建             | supplyAsync            | 有返回值           |
|                  | runAsync               | 无返回值           |
+------------------+------------------------+--------------------+
| 转换             | thenApply              | 同步转换           |
|                  | thenApplyAsync         | 异步转换           |
+------------------+------------------------+--------------------+
| 消费             | thenAccept             | 消费结果           |
|                  | thenRun                | 不接收结果         |
+------------------+------------------------+--------------------+
| 组合             | thenCompose            | 串行组合           |
|                  | thenCombine            | 并行组合           |
|                  | allOf                  | 等待所有完成       |
|                  | anyOf                  | 等待任一完成       |
+------------------+------------------------+--------------------+
| 异常处理         | exceptionally          | 异常处理           |
|                  | handle                 | 正常+异常处理      |
|                  | whenComplete           | 完成时回调         |
+------------------+------------------------+--------------------+

二、创建CompletableFuture

2.1 基本创建方式

package com.example.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CompletableFuture创建方式
 */
public class CreateCompletableFuture {

    public static void main(String[] args) throws Exception {

        // === 1. supplyAsync - 有返回值的异步任务 ===
        System.out.println(\"=== supplyAsync ===\");

        CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(\"执行异步任务: \" + Thread.currentThread().getName());
            return \"Hello\";
        });

        System.out.println(\"结果: \" + future1.get());

        // === 2. runAsync - 无返回值的异步任务 ===
        System.out.println(\"n=== runAsync ===\");

        CompletableFuture future2 = CompletableFuture.runAsync(() -> {
            System.out.println(\"执行无返回值任务: \" + Thread.currentThread().getName());
        });

        future2.get(); // 等待完成

        // === 3. 使用自定义线程池 ===
        System.out.println(\"n=== 自定义线程池 ===\");

        ExecutorService executor = Executors.newFixedThreadPool(5);

        CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println(\"自定义线程池: \" + Thread.currentThread().getName());
            return \"Custom Thread Pool\";
        }, executor);

        System.out.println(\"结果: \" + future3.get());

        // === 4. completedFuture - 已完成的Future ===
        System.out.println(\"n=== completedFuture ===\");

        CompletableFuture completed = CompletableFuture.completedFuture(\"立即完成\");
        System.out.println(\"结果: \" + completed.get());

        // === 5. 手动完成 ===
        System.out.println(\"n=== 手动完成 ===\");

        CompletableFuture manual = new CompletableFuture();

        // 在另一个线程中完成
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                manual.complete(\"手动完成的结果\");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        System.out.println(\"等待手动完成...\");
        System.out.println(\"结果: \" + manual.get());

        executor.shutdown();
    }
}

三、结果处理与转换

3.1 thenApply、thenAccept、thenRun

package com.example.completablefuture;

import java.util.concurrent.CompletableFuture;

/**
 * 结果处理与转换
 */
public class ResultProcessing {

    public static void main(String[] args) throws Exception {

        // === 1. thenApply - 转换结果(有返回值) ===
        System.out.println(\"=== thenApply ===\");

        CompletableFuture future1 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println(\"步骤1: 返回10\");
                return 10;
            })
            .thenApply(num -> {
                System.out.println(\"步骤2: \" + num + \" * 2\");
                return num * 2;
            })
            .thenApply(num -> {
                System.out.println(\"步骤3: \" + num + \" + 5\");
                return num + 5;
            });

        System.out.println(\"最终结果: \" + future1.get()); // 25

        // === 2. thenAccept - 消费结果(无返回值) ===
        System.out.println(\"n=== thenAccept ===\");

        CompletableFuture future2 = CompletableFuture
            .supplyAsync(() -> \"Hello World\")
            .thenAccept(str -> {
                System.out.println(\"消费结果: \" + str);
            });

        future2.get();

        // === 3. thenRun - 不关心结果(无返回值) ===
        System.out.println(\"n=== thenRun ===\");

        CompletableFuture future3 = CompletableFuture
            .supplyAsync(() -> \"Some Result\")
            .thenRun(() -> {
                System.out.println(\"任务完成,执行后续操作\");
            });

        future3.get();

        // === 4. 同步 vs 异步 ===
        System.out.println(\"n=== 同步 vs 异步 ===\");

        // thenApply: 同步执行(使用相同线程)
        CompletableFuture.supplyAsync(() -> {
            System.out.println(\"supplyAsync: \" + Thread.currentThread().getName());
            return \"Result\";
        }).thenApply(s -> {
            System.out.println(\"thenApply: \" + Thread.currentThread().getName());
            return s.toUpperCase();
        }).get();

        // thenApplyAsync: 异步执行(使用不同线程)
        CompletableFuture.supplyAsync(() -> {
            System.out.println(\"supplyAsync: \" + Thread.currentThread().getName());
            return \"Result\";
        }).thenApplyAsync(s -> {
            System.out.println(\"thenApplyAsync: \" + Thread.currentThread().getName());
            return s.toUpperCase();
        }).get();

        // === 5. 链式调用示例 ===
        demonstrateChaining();
    }

    /**
     * 链式调用示例
     */
    private static void demonstrateChaining() throws Exception {
        System.out.println(\"n=== 链式调用示例 ===\");

        String result = CompletableFuture
            .supplyAsync(() -> \"hello\")
            .thenApply(String::toUpperCase)
            .thenApply(s -> s + \" WORLD\")
            .thenApply(s -> s + \"!\")
            .get();

        System.out.println(\"链式调用结果: \" + result);
    }
}

四、多任务组合

4.1 thenCompose与thenCombine

package com.example.completablefuture;

import java.util.concurrent.CompletableFuture;

/**
 * 多任务组合
 */
public class TaskCombination {

    public static void main(String[] args) throws Exception {

        // === 1. thenCompose - 串行组合(依赖) ===
        System.out.println(\"=== thenCompose (串行) ===\");

        CompletableFuture compose = CompletableFuture
            .supplyAsync(() -> {
                System.out.println(\"任务1: 查询用户ID\");
                return \"USER_001\";
            })
            .thenCompose(userId -> CompletableFuture.supplyAsync(() -> {
                System.out.println(\"任务2: 根据\" + userId + \"查询用户详情\");
                return \"User{name=\'张三\', id=\'\" + userId + \"\'}\";
            }));

        System.out.println(\"结果: \" + compose.get());

        // === 2. thenCombine - 并行组合(独立) ===
        System.out.println(\"n=== thenCombine (并行) ===\");

        CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            System.out.println(\"任务1: 查询用户信息\");
            return \"User: 张三\";
        });

        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            System.out.println(\"任务2: 查询订单信息\");
            return \"Order: ORD001\";
        });

        CompletableFuture combined = future1.thenCombine(future2, (user, order) -> {
            System.out.println(\"合并结果\");
            return user + \" | \" + order;
        });

        System.out.println(\"组合结果: \" + combined.get());

        // === 3. allOf - 等待所有任务完成 ===
        System.out.println(\"n=== allOf ===\");

        CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return \"任务1完成\";
        });

        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
            sleep(2000);
            return \"任务2完成\";
        });

        CompletableFuture task3 = CompletableFuture.supplyAsync(() -> {
            sleep(1500);
            return \"任务3完成\";
        });

        long start = System.currentTimeMillis();

        CompletableFuture allOfFuture = CompletableFuture.allOf(task1, task2, task3);

        allOfFuture.get(); // 等待所有完成

        long end = System.currentTimeMillis();

        System.out.println(\"所有任务完成,耗时: \" + (end - start) + \"ms\");
        System.out.println(\"结果1: \" + task1.get());
        System.out.println(\"结果2: \" + task2.get());
        System.out.println(\"结果3: \" + task3.get());

        // === 4. anyOf - 等待任一任务完成 ===
        System.out.println(\"n=== anyOf ===\");

        CompletableFuture slow = CompletableFuture.supplyAsync(() -> {
            sleep(3000);
            return \"慢任务\";
        });

        CompletableFuture fast = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return \"快任务\";
        });

        CompletableFuture anyOfFuture = CompletableFuture.anyOf(slow, fast);

        System.out.println(\"最快完成的任务: \" + anyOfFuture.get());
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.2 复杂组合示例

package com.example.completablefuture;

import java.util.concurrent.CompletableFuture;

/**
 * 复杂任务组合示例
 */
public class ComplexCombination {

    public static void main(String[] args) throws Exception {

        // 场景: 电商商品详情页数据聚合
        // 需要并行获取: 商品基本信息、库存信息、评价信息、推荐商品

        System.out.println(\"=== 商品详情页数据聚合 ===n\");

        long start = System.currentTimeMillis();

        // 1. 查询商品基本信息
        CompletableFuture productInfo = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            System.out.println(\" 获取商品基本信息\");
            return \"iPhone 15 Pro\";
        });

        // 2. 查询库存信息
        CompletableFuture stockInfo = CompletableFuture.supplyAsync(() -> {
            sleep(300);
            System.out.println(\" 获取库存信息\");
            return 100;
        });

        // 3. 查询评价信息
        CompletableFuture reviewInfo = CompletableFuture.supplyAsync(() -> {
            sleep(600);
            System.out.println(\" 获取评价信息\");
            return \"4.8分(1000评价)\";
        });

        // 4. 查询推荐商品
        CompletableFuture recommendInfo = CompletableFuture.supplyAsync(() -> {
            sleep(400);
            System.out.println(\" 获取推荐商品\");
            return \"推荐: AirPods Pro\";
        });

        // 组合所有结果
        CompletableFuture result = productInfo
            .thenCombine(stockInfo, (product, stock) -> {
                ProductDetail detail = new ProductDetail();
                detail.setProductName(product);
                detail.setStock(stock);
                return detail;
            })
            .thenCombine(reviewInfo, (detail, review) -> {
                detail.setReview(review);
                return detail;
            })
            .thenCombine(recommendInfo, (detail, recommend) -> {
                detail.setRecommend(recommend);
                return detail;
            });

        ProductDetail detail = result.get();

        long end = System.currentTimeMillis();

        System.out.println(\"n=== 商品详情 ===\");
        System.out.println(detail);
        System.out.println(\"n总耗时: \" + (end - start) + \"ms (并行执行)\");
        System.out.println(\"如果串行执行需要: ~1800ms\");
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 商品详情VO
 */
class ProductDetail {
    private String productName;
    private Integer stock;
    private String review;
    private String recommend;

    // Getters and Setters
    public void setProductName(String productName) { this.productName = productName; }
    public void setStock(Integer stock) { this.stock = stock; }
    public void setReview(String review) { this.review = review; }
    public void setRecommend(String recommend) { this.recommend = recommend; }

    @Override
    public String toString() {
        return \"商品名称: \" + productName + \"n\" +
               \"库存数量: \" + stock + \"n\" +
               \"用户评价: \" + review + \"n\" +
               \"推荐商品: \" + recommend;
    }
}

五、异常处理

5.1 异常处理方法

package com.example.completablefuture;

import java.util.concurrent.CompletableFuture;

/**
 * 异常处理
 */
public class ExceptionHandling {

    public static void main(String[] args) throws Exception {

        // === 1. exceptionally - 异常处理 ===
        System.out.println(\"=== exceptionally ===\");

        CompletableFuture future1 = CompletableFuture
            .supplyAsync(() -> {
                if (true) {
                    throw new RuntimeException(\"发生异常\");
                }
                return \"正常结果\";
            })
            .exceptionally(ex -> {
                System.out.println(\"捕获异常: \" + ex.getMessage());
                return \"异常时的默认值\";
            });

        System.out.println(\"结果: \" + future1.get());

        // === 2. handle - 同时处理正常和异常 ===
        System.out.println(\"n=== handle ===\");

        CompletableFuture future2 = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException(\"随机异常\");
                }
                return \"成功\";
            })
            .handle((result, ex) -> {
                if (ex != null) {
                    System.out.println(\"处理异常: \" + ex.getMessage());
                    return \"异常处理结果\";
                }
                System.out.println(\"处理正常结果: \" + result);
                return result + \" (已处理)\";
            });

        System.out.println(\"结果: \" + future2.get());

        // === 3. whenComplete - 完成时回调(不改变结果) ===
        System.out.println(\"n=== whenComplete ===\");

        CompletableFuture future3 = CompletableFuture
            .supplyAsync(() -> \"原始结果\")
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    System.out.println(\"发生异常: \" + ex.getMessage());
                } else {
                    System.out.println(\"正常完成: \" + result);
                }
            });

        System.out.println(\"结果: \" + future3.get());

        // === 4. 异常传播 ===
        System.out.println(\"n=== 异常传播 ===\");

        CompletableFuture future4 = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException(\"第一步异常\");
            })
            .thenApply(s -> {
                System.out.println(\"这行不会执行\");
                return s.toUpperCase();
            })
            .thenApply(s -> {
                System.out.println(\"这行也不会执行\");
                return s + \"!\";
            })
            .exceptionally(ex -> {
                System.out.println(\"最终捕获: \" + ex.getMessage());
                return \"恢复值\";
            });

        System.out.println(\"结果: \" + future4.get());

        // === 5. 实战案例: API调用重试 ===
        demonstrateRetry();
    }

    /**
     * API调用重试示例
     */
    private static void demonstrateRetry() throws Exception {
        System.out.println(\"n=== API调用重试 ===\");

        int maxRetries = 3;

        CompletableFuture result = retryAsync(() -> {
            // 模拟API调用
            if (Math.random() < 0.7) { // 70%失败率
                throw new RuntimeException(\"API调用失败\");
            }
            return \"API响应数据\";
        }, maxRetries);

        System.out.println(\"最终结果: \" + result.get());
    }

    /**
     * 异步重试方法
     */
    private static  CompletableFuture retryAsync(
        java.util.concurrent.Callable task, int maxRetries) {

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        for (int i = 0; i < maxRetries; i++) {
            final int attempt = i + 1;
            future = future.exceptionally(ex -> {
                System.out.println(\"第\" + attempt + \"次重试...\");
                try {
                    Thread.sleep(1000 * attempt); // 递增延迟
                    return task.call();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }

        return future.exceptionally(ex -> {
            System.out.println(\"重试\" + maxRetries + \"次后仍失败\");
            throw new RuntimeException(\"最终失败: \" + ex.getMessage());
        });
    }
}

六、线程池配置

6.1 自定义线程池

package com.example.completablefuture;

import java.util.concurrent.*;

/**
 * 线程池配置
 */
public class ThreadPoolConfiguration {

    public static void main(String[] args) throws Exception {

        // === 1. 默认线程池(ForkJoinPool) ===
        System.out.println(\"=== 默认线程池 ===\");
        System.out.println(\"处理器核心数: \" + Runtime.getRuntime().availableProcessors());
        System.out.println(\"默认线程池大小: \" + ForkJoinPool.commonPool().getParallelism());

        CompletableFuture.supplyAsync(() -> {
            System.out.println(\"默认线程池: \" + Thread.currentThread().getName());
            return \"Default\";
        }).get();

        // === 2. 自定义线程池 ===
        System.out.println(\"n=== 自定义线程池 ===\");

        // 创建自定义线程池
        ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
            10,                      // 核心线程数
            20,                      // 最大线程数
            60L,                     // 空闲线程存活时间
            TimeUnit.SECONDS,        // 时间单位
            new LinkedBlockingQueue(100), // 任务队列
            new ThreadFactory() {    // 线程工厂
                private int count = 1;
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, \"CustomThread-\" + count++);
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println(\"自定义线程池: \" + Thread.currentThread().getName());
            return \"Custom\";
        }, customExecutor);

        System.out.println(\"结果: \" + future.get());

        // === 3. 线程池最佳实践 ===
        demonstrateBestPractices();

        customExecutor.shutdown();
    }

    /**
     * 线程池最佳实践
     */
    private static void demonstrateBestPractices() {
        System.out.println(\"n=== 线程池最佳实践 ===\");

        // CPU密集型任务: 核心数 + 1
        int cpuIntensive = Runtime.getRuntime().availableProcessors() + 1;
        System.out.println(\"CPU密集型推荐线程数: \" + cpuIntensive);

        // IO密集型任务: 核心数 * 2
        int ioIntensive = Runtime.getRuntime().availableProcessors() * 2;
        System.out.println(\"IO密集型推荐线程数: \" + ioIntensive);

        // 混合型任务: 根据实际情况调整
        System.out.println(\"n线程池配置建议:\");
        System.out.println(\"1. CPU密集型: 核心数+1\");
        System.out.println(\"2. IO密集型: 核心数*2\");
        System.out.println(\"3. 队列大小: 根据内存限制\");
        System.out.println(\"4. 拒绝策略: CallerRunsPolicy(生产推荐)\");
        System.out.println(\"5. 线程名称: 自定义ThreadFactory便于排查问题\");
    }
}

七、实战案例:电商系统

7.1 商品详情页聚合服务

package com.example.completablefuture.practice;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 实战案例: 电商商品详情页
 */
public class ProductDetailService {

    public static void main(String[] args) throws Exception {

        ProductDetailService service = new ProductDetailService();

        Long productId = 12345L;

        System.out.println(\"=== 加载商品详情页 ===n\");

        long start = System.currentTimeMillis();

        ProductDetailVO detail = service.getProductDetail(productId);

        long end = System.currentTimeMillis();

        System.out.println(\"n\" + detail);
        System.out.println(\"n总耗时: \" + (end - start) + \"ms\");
    }

    /**
     * 获取商品详情(并行聚合多个数据源)
     */
    public ProductDetailVO getProductDetail(Long productId) throws Exception {

        // 1. 查询商品基本信息(必须)
        CompletableFuture productFuture = CompletableFuture
            .supplyAsync(() -> queryProductInfo(productId));

        // 2. 查询价格信息(必须)
        CompletableFuture priceFuture = CompletableFuture
            .supplyAsync(() -> queryPriceInfo(productId));

        // 3. 查询库存信息(必须)
        CompletableFuture stockFuture = CompletableFuture
            .supplyAsync(() -> queryStockInfo(productId));

        // 4. 查询商家信息(必须,依赖商品信息)
        CompletableFuture merchantFuture = productFuture
            .thenCompose(product -> CompletableFuture.supplyAsync(
                () -> queryMerchantInfo(product.getMerchantId())));

        // 5. 查询评价信息(非必须)
        CompletableFuture reviewFuture = CompletableFuture
            .supplyAsync(() -> queryReviewInfo(productId))
            .exceptionally(ex -> {
                System.out.println(\"评价服务异常,使用默认值\");
                return new ReviewInfo(0.0, 0);
            });

        // 6. 查询推荐商品(非必须)
        CompletableFuture recommendFuture = CompletableFuture
            .supplyAsync(() -> queryRecommendInfo(productId))
            .exceptionally(ex -> {
                System.out.println(\"推荐服务异常,使用默认值\");
                return new RecommendInfo();
            });

        // 等待所有必须的数据
        CompletableFuture allRequired = CompletableFuture.allOf(
            productFuture, priceFuture, stockFuture, merchantFuture
        );

        // 等待必须数据完成
        allRequired.get();

        // 组装返回结果
        ProductDetailVO vo = new ProductDetailVO();
        vo.setProduct(productFuture.get());
        vo.setPrice(priceFuture.get());
        vo.setStock(stockFuture.get());
        vo.setMerchant(merchantFuture.get());

        // 非必须数据,尽量获取,超时则使用默认值
        try {
            vo.setReview(reviewFuture.getNow(new ReviewInfo(0.0, 0)));
            vo.setRecommend(recommendFuture.getNow(new RecommendInfo()));
        } catch (Exception e) {
            // 使用默认值
        }

        return vo;
    }

    // === 模拟各种查询服务 ===

    private ProductInfo queryProductInfo(Long productId) {
        sleep(300);
        System.out.println(\" 查询商品信息\");
        return new ProductInfo(productId, \"iPhone 15 Pro\", 1001L);
    }

    private PriceInfo queryPriceInfo(Long productId) {
        sleep(200);
        System.out.println(\" 查询价格信息\");
        return new PriceInfo(7999.0, 6999.0);
    }

    private StockInfo queryStockInfo(Long productId) {
        sleep(250);
        System.out.println(\" 查询库存信息\");
        return new StockInfo(150);
    }

    private MerchantInfo queryMerchantInfo(Long merchantId) {
        sleep(200);
        System.out.println(\" 查询商家信息\");
        return new MerchantInfo(merchantId, \"Apple官方旗舰店\");
    }

    private ReviewInfo queryReviewInfo(Long productId) {
        sleep(400);
        // 模拟偶尔异常
        if (ThreadLocalRandom.current().nextInt(10) < 2) {
            throw new RuntimeException(\"评价服务异常\");
        }
        System.out.println(\" 查询评价信息\");
        return new ReviewInfo(4.8, 15000);
    }

    private RecommendInfo queryRecommendInfo(Long productId) {
        sleep(350);
        System.out.println(\" 查询推荐信息\");
        return new RecommendInfo();
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// === 实体类 ===

class ProductDetailVO {
    private ProductInfo product;
    private PriceInfo price;
    private StockInfo stock;
    private MerchantInfo merchant;
    private ReviewInfo review;
    private RecommendInfo recommend;

    // Setters
    public void setProduct(ProductInfo product) { this.product = product; }
    public void setPrice(PriceInfo price) { this.price = price; }
    public void setStock(StockInfo stock) { this.stock = stock; }
    public void setMerchant(MerchantInfo merchant) { this.merchant = merchant; }
    public void setReview(ReviewInfo review) { this.review = review; }
    public void setRecommend(RecommendInfo recommend) { this.recommend = recommend; }

    @Override
    public String toString() {
        return \"=== 商品详情 ===n\" +
               \"商品: \" + product.getName() + \"n\" +
               \"价格: ¥\" + price.getSalePrice() + \" (原价¥\" + price.getOriginalPrice() + \")n\" +
               \"库存: \" + stock.getQuantity() + \"n\" +
               \"商家: \" + merchant.getName() + \"n\" +
               \"评分: \" + review.getScore() + \"分 (\" + review.getCount() + \"条评价)\";
    }
}

class ProductInfo {
    private Long id;
    private String name;
    private Long merchantId;

    public ProductInfo(Long id, String name, Long merchantId) {
        this.id = id;
        this.name = name;
        this.merchantId = merchantId;
    }

    public String getName() { return name; }
    public Long getMerchantId() { return merchantId; }
}

class PriceInfo {
    private Double originalPrice;
    private Double salePrice;

    public PriceInfo(Double originalPrice, Double salePrice) {
        this.originalPrice = originalPrice;
        this.salePrice = salePrice;
    }

    public Double getOriginalPrice() { return originalPrice; }
    public Double getSalePrice() { return salePrice; }
}

class StockInfo {
    private Integer quantity;

    public StockInfo(Integer quantity) {
        this.quantity = quantity;
    }

    public Integer getQuantity() { return quantity; }
}

class MerchantInfo {
    private Long id;
    private String name;

    public MerchantInfo(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getName() { return name; }
}

class ReviewInfo {
    private Double score;
    private Integer count;

    public ReviewInfo(Double score, Integer count) {
        this.score = score;
        this.count = count;
    }

    public Double getScore() { return score; }
    public Integer getCount() { return count; }
}

class RecommendInfo {
    // 推荐商品列表
}

八、实战案例:批量任务处理

8.1 批量数据导入

package com.example.completablefuture.practice;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
 * 实战案例: 批量数据导入
 */
public class BatchImportService {

    private static final int BATCH_SIZE = 100; // 每批处理100条

    public static void main(String[] args) throws Exception {

        BatchImportService service = new BatchImportService();

        // 模拟1000条数据
        List records = new ArrayList();
        for (int i = 1; i <= 1000; i++) {
            records.add(new DataRecord(i, \"数据\" + i));
        }

        System.out.println(\"=== 批量数据导入 ===\");
        System.out.println(\"总数据量: \" + records.size() + \"条n\");

        long start = System.currentTimeMillis();

        BatchImportResult result = service.batchImport(records);

        long end = System.currentTimeMillis();

        System.out.println(\"n=== 导入结果 ===\");
        System.out.println(\"成功: \" + result.getSuccessCount() + \"条\");
        System.out.println(\"失败: \" + result.getFailCount() + \"条\");
        System.out.println(\"总耗时: \" + (end - start) + \"ms\");
    }

    /**
     * 批量导入数据
     */
    public BatchImportResult batchImport(List records) throws Exception {

        // 分批处理
        List<List> batches = splitBatch(records, BATCH_SIZE);

        System.out.println(\"分为 \" + batches.size() + \" 批处理n\");

        // 创建批量任务
        List<CompletableFuture> futures = batches.stream()
            .map(batch -> CompletableFuture.supplyAsync(() -> processBatch(batch)))
            .collect(Collectors.toList());

        // 等待所有批次完成
        CompletableFuture allOf = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        allOf.get();

        // 汇总结果
        int successCount = 0;
        int failCount = 0;

        for (CompletableFuture future : futures) {
            BatchResult batchResult = future.get();
            successCount += batchResult.getSuccessCount();
            failCount += batchResult.getFailCount();
        }

        return new BatchImportResult(successCount, failCount);
    }

    /**
     * 处理单批数据
     */
    private BatchResult processBatch(List batch) {
        System.out.println(\"[\" + Thread.currentThread().getName() +
                          \"] 处理批次,数量: \" + batch.size());

        int success = 0;
        int fail = 0;

        for (DataRecord record : batch) {
            try {
                // 模拟数据处理
                Thread.sleep(10);

                // 模拟偶尔失败
                if (Math.random() < 0.05) { // 5%失败率
                    throw new RuntimeException(\"处理失败\");
                }

                success++;
            } catch (Exception e) {
                fail++;
            }
        }

        System.out.println(\"[\" + Thread.currentThread().getName() +
                          \"] 批次完成,成功: \" + success + \", 失败: \" + fail);

        return new BatchResult(success, fail);
    }

    /**
     * 分批
     */
    private List<List> splitBatch(List records, int batchSize) {
        List<List> batches = new ArrayList();

        for (int i = 0; i < records.size(); i += batchSize) {
            int end = Math.min(i + batchSize, records.size());
            batches.add(records.subList(i, end));
        }

        return batches;
    }
}

/**
 * 数据记录
 */
class DataRecord {
    private Integer id;
    private String data;

    public DataRecord(Integer id, String data) {
        this.id = id;
        this.data = data;
    }

    public Integer getId() { return id; }
    public String getData() { return data; }
}

/**
 * 批次结果
 */
class BatchResult {
    private int successCount;
    private int failCount;

    public BatchResult(int successCount, int failCount) {
        this.successCount = successCount;
        this.failCount = failCount;
    }

    public int getSuccessCount() { return successCount; }
    public int getFailCount() { return failCount; }
}

/**
 * 导入结果
 */
class BatchImportResult {
    private int successCount;
    private int failCount;

    public BatchImportResult(int successCount, int failCount) {
        this.successCount = successCount;
        this.failCount = failCount;
    }

    public int getSuccessCount() { return successCount; }
    public int getFailCount() { return failCount; }
}

九、性能优化与最佳实践

9.1 性能优化技巧

package com.example.completablefuture.best;

import java.util.concurrent.*;

/**
 * 性能优化与最佳实践
 */
public class PerformanceOptimization {

    public static void main(String[] args) throws Exception {

        System.out.println(\"=== CompletableFuture最佳实践 ===n\");

        // === 1. 选择合适的方法 ===
        System.out.println(\"1. 选择合适的方法\");
        System.out.println(\"   thenApply: 需要转换结果\");
        System.out.println(\"   thenAccept: 只消费结果\");
        System.out.println(\"   thenRun: 不关心结果\");
        System.out.println(\"   选择最轻量的方法可以提升性能n\");

        // === 2. 使用自定义线程池 ===
        System.out.println(\"2. 使用自定义线程池\");
        System.out.println(\"    避免使用ForkJoinPool.commonPool()\");
        System.out.println(\"    根据任务类型配置线程池\");
        System.out.println(\"    设置有界队列防止OOMn\");

        // === 3. 避免阻塞操作 ===
        System.out.println(\"3. 避免阻塞操作\");
        System.out.println(\"    不要在回调中执行耗时IO操作\");
        System.out.println(\"    使用thenComposeAsync处理异步依赖n\");

        // === 4. 合理设置超时 ===
        demonstrateTimeout();

        // === 5. 批量任务优化 ===
        demonstrateBatchOptimization();

        // === 6. 异常处理 ===
        System.out.println(\"n6. 异常处理最佳实践\");
        System.out.println(\"    使用exceptionally或handle处理异常\");
        System.out.println(\"    避免异常被吞没\");
        System.out.println(\"    记录完整的异常堆栈\");
    }

    /**
     * 超时控制
     */
    private static void demonstrateTimeout() throws Exception {
        System.out.println(\"4. 超时控制\");

        CompletableFuture slowTask = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return \"慢任务结果\";
        });

        try {
            // JDK 9+支持超时
            // String result = slowTask.get(2, TimeUnit.SECONDS);

            // JDK 8可以使用这种方式
            String result = slowTask
                .applyToEither(
                    timeout(2, TimeUnit.SECONDS),
                    s -> s
                )
                .get();

            System.out.println(\"   结果: \" + result);
        } catch (TimeoutException e) {
            System.out.println(\"    任务超时,已取消\");
            slowTask.cancel(true);
        } catch (Exception e) {
            System.out.println(\"    任务超时\");
        }
    }

    /**
     * 创建超时Future
     */
    private static  CompletableFuture timeout(long timeout, TimeUnit unit) {
        CompletableFuture future = new CompletableFuture();
        Executors.newScheduledThreadPool(1).schedule(
            () -> future.completeExceptionally(new TimeoutException()),
            timeout,
            unit
        );
        return future;
    }

    /**
     * 批量任务优化
     */
    private static void demonstrateBatchOptimization() {
        System.out.println(\"n5. 批量任务优化\");
        System.out.println(\"    分批处理,避免创建过多线程\");
        System.out.println(\"    使用allOf等待批次完成\");
        System.out.println(\"    控制并发度,避免资源耗尽\");
        System.out.println(\"   示例:\");
        System.out.println(\"   - 1000个任务\");
        System.out.println(\"   - 分10批,每批100个\");
        System.out.println(\"   - 逐批执行,降低并发压力\");
    }
}

十、常见问题与陷阱

10.1 常见陷阱

package com.example.completablefuture.best;

import java.util.concurrent.CompletableFuture;

/**
 * 常见问题与陷阱
 */
public class CommonPitfalls {

    public static void main(String[] args) throws Exception {

        System.out.println(\"=== CompletableFuture常见陷阱 ===n\");

        // === 陷阱1: 忘记处理异常 ===
        System.out.println(\"陷阱1: 异常被吞没\");

        CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException(\"异常\");
        }).thenApply(s -> s.toUpperCase()); // 不会执行

        try {
            future1.get(); // 这里才会抛出异常
        } catch (Exception e) {
            System.out.println(\"   异常: \" + e.getCause().getMessage());
        }

        // 正确做法
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException(\"异常\");
        }).exceptionally(ex -> {
            System.out.println(\"   捕获并处理异常: \" + ex.getMessage());
            return \"默认值\";
        });

        System.out.println(\"  结果: \" + future2.get());

        // === 陷阱2: 在回调中阻塞 ===
        System.out.println(\"n陷阱2: 在回调中阻塞\");

        //  错误: 阻塞回调线程
        CompletableFuture.supplyAsync(() -> \"Step1\")
            .thenApply(s -> {
                try {
                    Thread.sleep(5000); // 阻塞
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return s + \" Step2\";
            });

        System.out.println(\"   不要在回调中执行耗时操作\");

        //  正确: 使用Async方法
        CompletableFuture.supplyAsync(() -> \"Step1\")
            .thenApplyAsync(s -> {
                // 这会在另一个线程执行
                return s + \" Step2\";
            });

        System.out.println(\"   使用thenApplyAsync\");

        // === 陷阱3: 忘记指定线程池 ===
        System.out.println(\"n陷阱3: 使用公共线程池\");
        System.out.println(\"   默认使用ForkJoinPool.commonPool()\");
        System.out.println(\"   可能被其他任务影响\");
        System.out.println(\"   使用自定义线程池隔离资源\");

        // === 陷阱4: 链式调用中断 ===
        System.out.println(\"n陷阱4: 链式调用中断\");

        CompletableFuture broken = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException(\"第一步失败\");
            })
            .thenApply(s -> {
                System.out.println(\"这行不会执行\");
                return s.toUpperCase();
            })
            .thenApply(s -> {
                System.out.println(\"这行也不会执行\");
                return s + \"!\";
            });

        System.out.println(\"   链式调用会因异常中断\");
        System.out.println(\"   使用handle或exceptionally处理\");

        // === 陷阱5: 忘记get()或join() ===
        System.out.println(\"n陷阱5: 任务未执行完就退出\");

        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100);
                System.out.println(\"  这行可能不会执行\");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 如果主线程退出,异步任务可能未完成
        // 正确做法: future.get() 或 future.join()

        Thread.sleep(200); // 等待演示

        // === 最佳实践总结 ===
        printBestPractices();
    }

    /**
     * 最佳实践总结
     */
    private static void printBestPractices() {
        System.out.println(\"n=== 最佳实践总结 ===\");

        System.out.println(\"n 推荐做法:\");
        System.out.println(\"1. 使用自定义线程池\");
        System.out.println(\"2. 处理所有异常(exceptionally/handle)\");
        System.out.println(\"3. 设置超时时间\");
        System.out.println(\"4. 合理使用Async后缀方法\");
        System.out.println(\"5. 批量任务分批处理\");
        System.out.println(\"6. 记录详细日志\");

        System.out.println(\"n 避免做法:\");
        System.out.println(\"1. 在回调中执行阻塞操作\");
        System.out.println(\"2. 忽略异常处理\");
        System.out.println(\"3. 过度使用CompletableFuture\");
        System.out.println(\"4. 创建过多线程\");
        System.out.println(\"5. 循环依赖\");
    }
}

总结

核心知识点回顾

本文深入探讨了JDK 8 CompletableFuture,主要内容包括:

  1. 基础概念

    • Future的局限性
    • CompletableFuture的优势
    • 核心方法分类
  2. 基本操作

    • 创建CompletableFuture
    • 结果处理与转换
    • 链式调用
  3. 高级特性

    • 多任务组合(thenCompose/thenCombine/allOf/anyOf)
    • 异常处理(exceptionally/handle/whenComplete)
    • 线程池配置
  4. 实战应用

    • 电商商品详情页数据聚合
    • 批量数据导入处理
    • API重试机制
  5. 性能优化

    • 线程池配置建议
    • 超时控制
    • 批量任务优化

Future对比

维度 Future CompletableFuture
手动完成
链式调用
多任务组合
异常处理
非阻塞
回调支持

使用场景

适用场景:
+---------------------------+
|  并行调用多个服务        |
|  数据聚合(商品详情页)    |
|  批量任务处理            |
|  异步日志、通知          |
|  超时控制                |
|  降级与容错              |
+---------------------------+

不适用场景:
+---------------------------+
|  简单的顺序调用          |
|  需要事务一致性          |
|  复杂的状态管理          |
|  过度的嵌套依赖          |
+---------------------------+

线程池配置建议

// CPU密集型
int cpuThreads = Runtime.getRuntime().availableProcessors() + 1;

// IO密集型
int ioThreads = Runtime.getRuntime().availableProcessors() * 2;

// 自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,           // 核心线程数
    maximumPoolSize,        // 最大线程数
    60L,                    // 空闲存活时间
    TimeUnit.SECONDS,
    new LinkedBlockingQueue(1000),  // 有界队列
    new NamedThreadFactory(\"async-\"),  // 自定义名称
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

发表评论
暂无评论

还没有评论呢,快来抢沙发~

客服

点击联系客服 点击联系客服

在线时间:09:00-18:00

关注微信公众号

关注微信公众号
客服电话

400-888-8888

客服邮箱 122325244@qq.com

手机

扫描二维码

手机访问本站

扫描二维码
© 2025 左子网 - WWW.ZUOOZI.NET & WordPress Theme. All rights reserved