行业资讯 2025年08月6日
0 收藏 0 点赞 190 浏览 3852 个字
摘要 :

文章目录 NoBlocking 非阻塞队列 构造方法 入队方法 出队方法 成员方法 NoBlocking 非阻塞队列 并发编程中,需要用到安全的队列,实现安全队列可以使用 2 种方式: ……




  • NoBlocking
    • 阻塞队列
    • 构造方法
    • 入队方法
    • 出队方法
    • 成员方法

    NoBlocking

    非阻塞队列

    并发编程中,需要用到安全的队列,实现安全队列可以使用 2 种方式:

    • 加锁,这种实现方式是阻塞队列
    • 使用循环 CAS 算法实现,这种方式是非阻塞队列

    ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,当添加一个元素时,会添加到队列的尾部,当获取一个元素时,会返回队列头部的元素
    补充:ConcurrentLinkedDeque 是双向链表结构的无界并发队列
    ConcurrentLinkedQueue 使用约定:

    1. 不允许 null 入列
    2. 队列中所有未删除的节点的 item 都不能为 null 且都能从 head 节点遍历到
    3. 删除节点是将 item 设置为 null,队列迭代时跳过 item 为 null 节点
    4. head 节点跟 tail 不一定指向头节点或尾节点,可能存在滞后性

    ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点由节点元素和指向下一个节点的引用组成,组成一张链表结构的队列

    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        //.....
    }
    

    构造方法

    • 无参构造方法:
      public ConcurrentLinkedQueue() {
          // 默认情况下 head 节点存储的元素为空,dummy 节点,tail 节点等于 head 节点
          head = tail = new Node<E>(null);
      }
      
    • 有参构造方法
      public ConcurrentLinkedQueue(Collection<? extends E> c) {
          Node<E> h = null, t = null;
          // 遍历节点
          for (E e : c) {
              checkNotNull(e);
              Node<E> newNode = new Node<E>(e);
              if (h == null)
                  h = t = newNode;
              else {
                  // 单向链表
                  t.lazySetNext(newNode);
                  t = newNode;
              }
          }
          if (h == null)
              h = t = new Node<E>(null);
          head = h;
          tail = t;
      }
      

    入队方法

    与传统的链表不同,单线程入队的工作流程:

    • 将入队节点设置成当前队列尾节点的下一个节点
    • 更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点,存在滞后性
    public boolean offer(E e) {
        checkNotNull(e);
        // 创建入队节点
        final Node<E> newNode = new Node<E>(e);
        
        // 循环 CAS 直到入队成功
        for (Node<E> t = tail, p = t;;) {
            // p 用来表示队列的尾节点,初始情况下等于 tail 节点,q 是 p 的 next 节点
            Node<E> q = p.next;
            // 条件成立说明 p 是尾节点
            if (q == null) {
                // p 是尾节点,设置 p 节点的下一个节点为新节点
                // 设置成功则 casNext 返回 true,否则返回 false,说明有其他线程更新过尾节点,继续寻找尾节点,继续 CAS
                if (p.casNext(null, newNode)) {
                    // 首次添加时,p 等于 t,不进行尾节点更新,所以尾节点存在滞后性
                    if (p != t)
                        // 将 tail 设置成新入队的节点,设置失败表示其他线程更新了 tail 节点
                        casTail(t, newNode); 
                    return true;
                }
            }
            else if (p == q)
                // 当 tail 不指向最后节点时,如果执行出列操作,可能将 tail 也移除,tail 不在链表中 
                // 此时需要对 tail 节点进行复位,复位到 head 节点
                p = (t != (t = tail)) ? t : head;
            else
                // 推动 tail 尾节点往队尾移动
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
    

    图解入队:
    NoBlocking非阻塞队列详解-黑马深入学习Java并发编程笔记
    NoBlocking非阻塞队列详解-黑马深入学习Java并发编程笔记
    NoBlocking非阻塞队列详解-黑马深入学习Java并发编程笔记
    当 tail 节点和尾节点的距离大于等于 1 时(每入队两次)更新 tail,可以减少 CAS 更新 tail 节点的次数,提高入队效率
    线程安全问题:

    • 线程 1 线程 2 同时入队,无论从哪个位置开始并发入队,都可以循环 CAS,直到入队成功,线程安全
    • 线程 1 遍历,线程 2 入队,所以造成 ConcurrentLinkedQueue 的 size 是变化,需要加锁保证安全
    • 线程 1 线程 2 同时出列,线程也是安全的

    出队方法

    出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用,并不是每次出队都更新 head 节点

    • 当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点
    • 当 head 节点里没有元素时,出队操作才会更新 head 节点

    批处理方式可以减少使用 CAS 更新 head 节点的消耗,从而提高出队效率

    public E poll() {
        restartFromHead:
        for (;;) {
            // p 节点表示首节点,即需要出队的节点,FIFO
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // 如果 p 节点的元素不为 null,则通过 CAS 来设置 p 节点引用元素为 null,成功返回 item
                if (item != null && p.casItem(item, null)) {
                    if (p != h)    
                           // 对 head 进行移动
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                   // 逻辑到这说明头节点的元素为空或头节点发生了变化,头节点被另外一个线程修改了
                // 那么获取 p 节点的下一个节点,如果 p 节点的下一节点也为 null,则表明队列已经空了
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                  // 第一轮操作失败,下一轮继续,调回到循环前
                else if (p == q)
                    continue restartFromHead;
                // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
                else
                    p = q;
            }
        }
    }
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            // 将旧结点 h 的 next 域指向为 h,help gc
            h.lazySetNext(h);
    }
    

    在更新完 head 之后,会将旧的头结点 h 的 next 域指向为 h,图中所示的虚线也就表示这个节点的自引用,被移动的节点(item 为 null 的节点)会被 GC 回收
    NoBlocking非阻塞队列详解-黑马深入学习Java并发编程笔记
    NoBlocking非阻塞队列详解-黑马深入学习Java并发编程笔记
    NoBlocking非阻塞队列详解-黑马深入学习Java并发编程笔记
    如果这时,有一个线程来添加元素,通过 tail 获取的 next 节点则仍然是它本身,这就出现了p == q 的情况,出现该种情况之后,则会触发执行 head 的更新,将 p 节点重新指向为 head
    参考文章:https://www.jianshu.com/p/231caf90f30b


    成员方法

    • peek():会改变 head 指向,执行 peek() 方法后 head 会指向第一个具有非空元素的节点
      // 获取链表的首部元素,只读取而不移除
      public E peek() {
          restartFromHead:
          for (;;) {
              for (Node<E> h = head, p = h, q;;) {
                  E item = p.item;
                  if (item != null || (q = p.next) == null) {
                      // 更改h的位置为非空元素节点
                      updateHead(h, p);
                      return item;
                  }
                  else if (p == q)
                      continue restartFromHead;
                  else
                      p = q;
              }
          }
      }
      
    • size():用来获取当前队列的元素个数,因为整个过程都没有加锁,在并发环境中从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确
      public int size() {
          int count = 0;
          // first() 获取第一个具有非空元素的节点,若不存在,返回 null
          // succ(p) 方法获取 p 的后继节点,若 p == p.next,则返回 head
          // 类似遍历链表
          for (Node<E> p = first(); p != null; p = succ(p))
              if (p.item != null)
                  // 最大返回Integer.MAX_VALUE
                  if (++count == Integer.MAX_VALUE)
                      break;
          return count;
      }
      
    • remove():移除元素
      public boolean remove(Object o) {
          // 删除的元素不能为null
          if (o != null) {
              Node<E> next, pred = null;
              for (Node<E> p = first(); p != null; pred = p, p = next) {
                  boolean removed = false;
                  E item = p.item;
                  // 节点元素不为null
                  if (item != null) {
                      // 若不匹配,则获取next节点继续匹配
                      if (!o.equals(item)) {
                          next = succ(p);
                          continue;
                      }
                      // 若匹配,则通过 CAS 操作将对应节点元素置为 null
                      removed = p.casItem(item, null);
                  }
                  // 获取删除节点的后继节点
                  next = succ(p);
                  // 将被删除的节点移除队列
                  if (pred != null && next != null) // unlink
                      pred.casNext(p, next);
                  if (removed)
                      return true;
              }
          }
          return false;
      }
      

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/10425.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号