深入解析CompletableFuture源码实现(2)———双源输入

2025-12-04 0 974

前言

CompletableFuture(CF) 提供了一种灵活的方式来处理异步计算。通过其丰富的 API,开发者可以轻松地组合多个异步任务。然而,其内部实现涉及复杂的状态管理和线程安全机制。本文将通过源码解析,揭示 CompletableFuture 的内部工作原理。

上一篇文章深入解析CompletableFuture源码实现 中我们分析了thenApply的实现,这篇文章中将对双输入继续进行分析。

四、以双输入 BiApply 为例分析

回调数据类 BiApply 和执行回调方法 biApply

abstract static class BiCompletion extends UniCompletion {
    // 双源实现的数据类,通过继承实现。实际上也可以通过组合实现。对于子类来说两种实现没有差别,因为字段都是透明的,可以直接访问字段。snd 是缩写,个人感觉不如直接命名为second。
    CompletableFuture snd; // second source for action
    BiCompletion(Executor executor, CompletableFuture dep,
                 CompletableFuture src, CompletableFuture snd) {
        super(executor, dep, src); this.snd = snd;
    }
}

介绍一个简单的优化思路,字段单次读取:为了避免多次读取类中的同一个字段,可以通过声明 + 第一次读取时初始化临时变量实现,实现类似 if-let, switch-let 的效果。

static final class BiApply extends BiCompletion {
    // 通过继承实现,
    BiFunction<? super T,? super U,? extends V> fn;
    BiApply(Executor executor, CompletableFuture dep,
            CompletableFuture src, CompletableFuture snd,
            BiFunction<? super T,? super U,? extends V> fn) {
        // 封装了数据 + 回调
        super(executor, dep, src, snd); this.fn = fn;
    }
    // tryFire 方法触发回调
    final CompletableFuture tryFire(int mode) {
        // 字段单次读取
        CompletableFuture d;
        CompletableFuture a;
        CompletableFuture b;
        Object r, s; BiFunction<? super T,? super U,? extends V> f;
        // 已清理依赖,说明回调已经执行,不必继续执行,返回 NOOP,也就是 null
        if (   (a = src) == null || (r = a.result) == null
            || (b = snd) == null || (s = b.result) == null
            || (d = dep) == null || (f = fn) == null
            // 未清理依赖,执行biApply
            || !d.biApply(r, s, f, mode > 0 ? null : this))
        return null;
        src = null; snd = null; dep = null; fn = null;
        // 调用后续回调
        return d.postFire(a, b, mode);
    }
}

以上代码有一个规律,所有的tryFire实现,都会调用一个对应的CompletableFuture内部方法,这里是biApply。

看下 biApply 的实现,这个方法在 CompletableFuture 中,主要用来执行回调:

final  boolean biApply(Object r, Object s,
                            BiFunction<? super R,? super S,? extends T> f,
                            BiApply c) {
    Throwable x;
    // 用了标签——业务中使用不算多的特性,可以跳出多层嵌套。
    tryComplete: if (result == null) {
        // 两个数据源输入如果任一一个失败,回调方法不必执行,直接跳出嵌套
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                // 异常传播到结果r
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        // 同上
        if (s instanceof AltResult) {
            if ((x = ((AltResult)s).ex) != null) {
                completeThrowable(x, s);
                break tryComplete;
            }
            s = null;
        }
        try {
            // 仅当c不为空时,尝试获取执行许可
            // 也就是说:异步执行时(在执行器中执行时)mode = ASYNC = 1, 已经获得许可,执行回调
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings(\"unchecked\") R rr = (R) r;
            @SuppressWarnings(\"unchecked\") S ss = (S) s;
            // 执行回调
            completeValue(f.apply(rr, ss));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

再来复习一下 Completion#claim 的实现,返回结果表示发放执行许可。

final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true; // 就地执行,发放许可
            executor = null; // disable;及时清理
            e.execute(this); // 指定执行器执行,无需发放许可给后续执行,所以返回 false
        }
        return false; // if (false): cas失败:说明竞争失败,不发放许可
    }

触发过程

// 就地执行
public  CompletableFuture thenCombine(
    CompletionStage other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}
// 异步执行,默认执行器
public  CompletableFuture thenCombineAsync(
    CompletionStage other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(defaultExecutor(), other, fn);
}
// 异步执行,指定执行器
public  CompletableFuture thenCombineAsync(
    CompletionStage other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}

以上方法均调用 biApplyStage:

private  CompletableFuture biApplyStage(
    Executor e, CompletionStage o,
    BiFunction<? super T,? super U,? extends V> f) {
    CompletableFuture b; Object r, s;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture d = newIncompleteFuture();
    if ((r = result) == null || (s = b.result) == null)
        // 入栈,后续回调执行
        bipush(b, new BiApply(e, d, this, b, f));
    else if (e == null)
        // 原地执行
        d.biApply(r, s, f, null);
    else
        // 异步执行
        try {
            // 也就是执行 run ==== tryFire(ASYNC)
            e.execute(new BiApply(null, d, this, b, f));
        } catch (Throwable ex) {
            d.result = encodeThrowable(ex);
        }
    return d;
}

触发过程之入栈分析

final void bipush(CompletableFuture b, BiCompletion c) {
    if (c != null) {
        while (result == null) {
            // 入栈(第一个CompletableFuture)
            if (tryPushStack(c)) {
                if (b.result == null)
                    // 入栈(第二个CompletableFuture)
                    b.unipush(new CoCompletion(c));
                else if (result != null)
                    // 不入栈,有结果时可以直接执行
                    c.tryFire(SYNC);
                return;
            }
        }
        b.unipush(c);
    }
}

入栈的逻辑是:尝试入栈;如果入栈的过程中结果已知,放弃入栈,直接触发回调(tryFire)。unipush内部执行相似逻辑,不赘述。

CoCompletion 封装了BiCompletion,委托模式。

static final class CoCompletion extends Completion {
    BiCompletion base;
    CoCompletion(BiCompletion base) { this.base = base; }
    final CompletableFuture tryFire(int mode) {
        BiCompletion c; CompletableFuture d;
        if ((c = base) == null || (d = c.tryFire(mode)) == null)
            return null;
        base = null; // detach
        return d;
    }

这里tryFire会调用两次,由第一个CF和第二个CF触发,只有第二次触发时会执行回调。入栈后的触发模式为嵌套模式,和前文对应不再赘述。

总结 BiApply 执行流程

a, b, c 表示两个输入和一个输出,最终结果 end 表示 c.postFire,后续的触发不在这次流程分析中。

1. 立即执行

a, b已知结果 ->> biApplyStage() ->> c.biApply(a, b, …) ->> 执行回调函数 ->> end

2. 原地执行,指定执行器

回调函数在执行器 e 中执行,提交任务的动作在当前线程,这个任务是立即执行的。

t1: a, b 已知结果 ->> biApplyStage() ->> e.execute(new BiApply(…))

t2: BiApply#run, 也就是 BiApply#tryFire(ASYNC) ->> c.biApply(a, b, null: executor) ->> 执行回调函数 ->> end

3. 异步执行

回调函数在执行器 e 中执行,执行器e的提交动作由其他回调(也就是 Completion)完成。这里thisCompletion就是创建的BiApply。

t1: a,b 已知结果小于2 ->> biApplyStage() ->> 创建回调 Completion == BiApply ->> 入栈a,b
t2: a 已知结果(postComplete, 深度优先搜索通知到c)
t3: b 已知结果(postComplete, 深度优先搜索通知到c)
t2/t3: a或b第二次通知到c == BiApply#tryFire(NESTED) ->> c.biApply(a, b, e, thisCompletion) ->> c.claim() ->> CAS成功,e.execute(thisCompletion)
t4: BiApply#run, 也就是 BiApply#tryFire(ASYNC) ->> c.biApply(a, b, null: executor) ->> 执行回调函数 ->> end

异步执行流程可视化

核心概念:

  • Completion (BiApply): 一个回调对象,它等待两个前置 Completion (a, b) 完成,然后执行一个双参数函数。
  • Executor (e): 一个线程池或执行器,负责异步地执行提交给它的任务。
  • postComplete: 前置 Completion 完成后,通知其依赖者(这里是 BiApply)的机制。
  • claim(): BiApply 内部的原子操作,确保在多个前置 Completion 同时完成时,只有一个线程负责将 BiApply 提交到 Executor。
  • NESTED vs ASYNC: tryFire 的两种调用模式。NESTED 表示在通知链中直接调用,ASYNC 表示在 Executor 线程中作为 Runnable 调用。

参与者:

  • Completion A: 第一个前置任务。
  • Completion B: 第二个前置任务。
  • BiApply C (thisCompletion): 我们的回调函数,依赖于 A 和 B。
  • Executor e: 异步执行器。
  • Thread T_Notify (T_A / T_B): 完成 A 或 B 的线程。
  • Thread T_Executor (T_E): Executor e 中的工作线程。

流程图:

sequenceDiagram
    participant T_Notify_A as Thread T_Notify (Completes A)
    participant T_Notify_B as Thread T_Notify (Completes B)
    participant Completion_A as Completion A
    participant Completion_B as Completion B
    participant BiApply_C as BiApply C (thisCompletion)
    participant Executor_e as Executor e
    participant T_Executor as Thread T_Executor (from e)

    Note over T_Notify_A, T_Executor: **t1: 初始化阶段**
    T_Notify_A->>BiApply_C: biApplyStage()
    activate BiApply_C
    BiApply_C->>Completion_A: 注册为依赖 (入栈)
    BiApply_C->>Completion_B: 注册为依赖 (入栈)
    deactivate BiApply_C
    Note right of BiApply_C: BiApply C 等待 A 和 B 完成

    Note over T_Notify_A, T_Executor: **t2: Completion A 完成**
    T_Notify_A->>Completion_A: A.complete(resultA)
    activate Completion_A
    Completion_A->>BiApply_C: postComplete() -> tryFire(NESTED)
    activate BiApply_C
    Note right of BiApply_C: C 检查:A 已完成,B 未完成。
不满足执行条件,不提交。 deactivate BiApply_C deactivate Completion_A Note left of Completion_A: A 完成,通知 C,C 仍在等待 B Note over T_Notify_A, T_Executor: **t3: Completion B 完成 (触发提交)** T_Notify_B->>Completion_B: B.complete(resultB) activate Completion_B Completion_B->>BiApply_C: postComplete() -> tryFire(NESTED) activate BiApply_C Note right of BiApply_C: C 检查:A 已完成,B 已完成。
满足执行条件。 BiApply_C->>BiApply_C: c.claim() (CAS操作) alt CAS成功 (当前线程获得执行权) BiApply_C->>Executor_e: e.execute(thisCompletion)
(提交 BiApply C 到执行器) activate Executor_e Note right of Executor_e: BiApply C (作为一个 Runnable)
进入 Executor e 的任务队列。 else CAS失败 (其他线程已提交) Note right of BiApply_C: 另一个线程已提交,当前线程退出。 end deactivate BiApply_C deactivate Completion_B Note left of Completion_B: B 完成,通知 C,C 发现条件满足并提交自身到 Executor。 Note over T_Notify_A, T_Executor: **t4: BiApply C 在 Executor 线程中执行** Executor_e->>T_Executor: 从队列中取出 BiApply C deactivate Executor_e activate T_Executor T_Executor->>BiApply_C: BiApply C.run() activate BiApply_C BiApply_C->>BiApply_C: tryFire(ASYNC)
(内部调用 c.biApply(a, b, null)) Note right of BiApply_C: 执行 BiApply 的回调函数
(例如:(resA, resB) -> { /* 用户逻辑 */ }) BiApply_C-->>T_Executor: 回调执行完成 deactivate BiApply_C deactivate T_Executor Note over T_Notify_A, T_Executor: **流程结束**

关键点总结:

  • 异步性: BiApply 的实际回调函数执行与触发它的 Completion 完成事件发生在不同的线程上(通过 Executor e 进行线程切换)。
  • 通知与执行分离: postComplete 负责通知依赖者,实际的回调执行被推迟到 Executor 线程。
  • claim() 的作用: 确保在多个前置任务并发完成时,BiApply 的回调函数只被提交和执行一次。
  • tryFire(NESTED) vs tryFire(ASYNC): NESTED 用于在通知链中检查条件和提交任务;ASYNC 用于在 Executor 线程中实际执行回调。null: executorASYNC 阶段表示不再需要进一步提交,直接执行。

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

申明:本文由第三方发布,内容仅代表作者观点,与本网站无关。对本文以及其中全部或者部分内容的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。本网发布或转载文章出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,也不代表本网对其真实性负责。

左子网 开发教程 深入解析CompletableFuture源码实现(2)———双源输入 https://www.zuozi.net/3329.html

常见问题
  • 1、自动:拍下后,点击(下载)链接即可下载;2、手动:拍下后,联系卖家发放即可或者联系官方找开发者发货。
查看详情
  • 1、源码默认交易周期:手动发货商品为1-3天,并且用户付款金额将会进入平台担保直到交易完成或者3-7天即可发放,如遇纠纷无限期延长收款金额直至纠纷解决或者退款!;
查看详情
  • 1、描述:源码描述(含标题)与实际源码不一致的(例:货不对板); 2、演示:有演示站时,与实际源码小于95%一致的(但描述中有”不保证完全一样、有变化的可能性”类似显著声明的除外); 3、发货:不发货可无理由退款; 4、安装:免费提供安装服务的源码但卖家不履行的; 5、收费:价格虚标,额外收取其他费用的(但描述中有显著声明或双方交易前有商定的除外); 6、其他:如质量方面的硬性常规问题BUG等。 注:经核实符合上述任一,均支持退款,但卖家予以积极解决问题则除外。
查看详情
  • 1、左子会对双方交易的过程及交易商品的快照进行永久存档,以确保交易的真实、有效、安全! 2、左子无法对如“永久包更新”、“永久技术支持”等类似交易之后的商家承诺做担保,请买家自行鉴别; 3、在源码同时有网站演示与图片演示,且站演与图演不一致时,默认按图演作为纠纷评判依据(特别声明或有商定除外); 4、在没有”无任何正当退款依据”的前提下,商品写有”一旦售出,概不支持退款”等类似的声明,视为无效声明; 5、在未拍下前,双方在QQ上所商定的交易内容,亦可成为纠纷评判依据(商定与描述冲突时,商定为准); 6、因聊天记录可作为纠纷评判依据,故双方联系时,只与对方在左子上所留的QQ、手机号沟通,以防对方不承认自我承诺。 7、虽然交易产生纠纷的几率很小,但一定要保留如聊天记录、手机短信等这样的重要信息,以防产生纠纷时便于左子介入快速处理。
查看详情

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务