行业资讯 2025年08月6日
0 收藏 0 点赞 304 浏览 4304 个字
摘要 :

文章目录 生产者–消费者模式所遵循的规则 生产者-消费者模型的实现 1、通过阻塞队列方式实现 2、通过wait和notifyAll来实现 3、通过ReentrantLock和Condition来……




  • 生产者消费者模式所遵循的规则
  • 生产者-消费者模型的实现
    • 1、通过阻塞队列方式实现
    • 2、通过wait和notifyAll来实现
    • 3、通过ReentrantLock和Condition来实现
  •  总结

在Java中,负责生产数据的是生产者,负责使用数据的是消费者。没有数据时,消费者等待;数据满时,生产者等待。

生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。

比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据的模块,则被称为消费者。

生产者和消费者在同一段时间内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。Java多线程:线程间通信(生产者消费者模式)

生产者-消费者模式所遵循的规则

  • 生产者仅仅在缓冲区未满时生产,缓冲区满则停止生产。
  • 消费者仅仅在缓冲区有产品时才能消费,缓冲区为空则停止消费。
  • 当消费者发现缓冲区没有可消费的产品时会通知生产者。
  • 当生产者生产出可消费的产品时,应该通知等待的消费者去消费。

生产者-消费者模型的实现

生产者-消费者模型的实现可以有多重实现方式,这里我们重点介绍3种,对于初学者,可以先重点掌握第2种方式。

1、通过阻塞队列方式实现

public class ProducerConsumerDemo1 {
 
    /**
     * 缓冲队列
     */
    private final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
 
    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.put(i);
            }
        }
    }
 
    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
            }
        }
    }
 
    public Producer getProducer() {
        return new Producer();
    }
 
    public Consumer getConsumer() {
        return new Consumer();
    }
 
 
    public static void main(String[] args) {
        ProducerConsumerDemo1 producerConsumerDemo1 = new ProducerConsumerDemo1();
        new Thread(producerConsumerDemo1.getProducer()).start();
        new Thread(producerConsumerDemo1.getConsumer()).start();
    }
}

2、通过wait和notifyAll来实现

    /**
     * 缓冲区(仓库)
     */
    private final List<Object> list = new ArrayList<>();
 
    private final int bufferCount = 10;
 
    public final Object lock = new Object();
 
    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                synchronized (lock) {
                    while (list.size() >= bufferCount) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //仓库未满,继续生产产品
                    list.add(new Object());
                    //唤醒消费者去消费产品
                    lock.notifyAll();
                }
            }
        }
    }
 
    /**
     * 消费者
     */
    class Consumer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                synchronized (lock) {
                    while (list.size() == 0) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //仓库不是空,继续消费
                    list.remove(0);
                    //唤醒生产者去生产产品
                    lock.notifyAll();
                }
            }
        }
    }
 
    public Producer getProducer(){
        return new Producer();
    }
 
    public Consumer getConsumer(){
        return new Consumer();
    }
 
    public static void main(String[] args) {
        ProducerConsumerDemo2 producerConsumerDemo2=new ProducerConsumerDemo2();
        new Thread(producerConsumerDemo2.getProducer()).start();
        new Thread(producerConsumerDemo2.getConsumer()).start();
    }
}

注意:为什么使用notifyAll()而不使用notify()?

因为假设只有一个生产者和消费者:那么生产者和消费者将按顺序执行。

但是如果有多个生产者和消费者,那么可能出现假死现象:

  • 1)一个消费者唤醒了一个生产者,但是在这个生产者拿到锁之前,另一个消费者抢先拿到了锁;
  • 2)三个生产者全部等待,某个消费者唤醒的不是生产者,而是另一个消费者;

而解决上述假死现象的方法是使用notifyAll()替换notify(),保证消费者唤醒了生产者,生产者唤醒了消费者。

3、通过ReentrantLock和Condition来实现

public class ProducerConsumerDemo3 {
 
    /**
     * 缓冲区(仓库)
     */
    private final List<Object> list = new ArrayList<>();
    /**
     * 缓冲区大小
     */
    private final int bufferCount = 10;
    public ReentrantLock lock = new ReentrantLock();
    //创建两个条件变量
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
 
 
    /**
     * 生产者
     */
    class Producer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);//模拟生产操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                try {
                    lock.lock();
                    while (list.size() >= bufferCount) {
                        condition1.await();//当仓库数据数量超过缓冲区设定的最大数量,则让生产线程进入等待状态
                    }
 
                    list.add(new Object());
                    System.out.println(Thread.currentThread().getName() + \"-生产者生产,数量为:\" + list.size());
                    condition2.signal();//唤醒消费线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
 
            }
        }
    }
 
    class Consumer implements Runnable {
 
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
 
                try {
                    lock.lock();
                    while (list.size() == 0) {
                        condition2.await();//当仓库中数据为空时,则让消费线程进入等待状态
                    }
                    list.remove(0);
                    System.out.println(Thread.currentThread().getName() + \"-消费者消费,数量为:\" + list.size());
                    condition1.signal();//唤醒生产线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
 
    public Producer getProducer() {
        return new Producer();
    }
 
    public Consumer getConsumer() {
        return new Consumer();
    }
 
    public static void main(String[] args) {
        ProducerConsumerDemo3 producerConsumerDemo3 = new ProducerConsumerDemo3();
        new Thread(producerConsumerDemo3.getProducer()).start();
        new Thread(producerConsumerDemo3.getConsumer()).start();
    }
}

 总结

以上就是Java多线程:线程间通信(生产者消费者模式)的全部内容。

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/9681.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号