行业资讯 2025年08月6日
0 收藏 0 点赞 451 浏览 4445 个字
摘要 :

文章目录 首先定义扩展线程池ExtThreadPoolExecutor 定义任务队列WorkQueue 本文主要讲解Java多线程:ThreadPoolExecutor+BlockingQueue线程池示例,我们来一起看下代……




  • 首先定义扩展线程池ExtThreadPoolExecutor
    • 定义任务队列WorkQueue

    本文主要讲解Java多线程:ThreadPoolExecutor+BlockingQueue线程池示例,我们来一起看下代码吧!

    首先定义扩展线程池ExtThreadPoolExecutor

    ExtThreadPoolExecutor作用是对线程池的增强,如在初始化线程池时、在线程执行前、执行后等处可添加自定义逻辑。

    public class ExtThreadPoolExecutor extends ThreadPoolExecutor{
       
         
    
        public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            init();
        }
    
        public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
            init();
        }
    
        public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            init();
        }
    
        public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            init();
        }
    
        private void init(){
            System.out.println(\"ExtThreadPoolExecutor init......\");
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println(\"beforeExecute......   begin\" );
            super.beforeExecute(t, r);
            System.out.println(\"beforeExecute......   end\" );
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println(\"afterExecute......   begin\" );
            super.afterExecute(r, t);
            System.out.println(\"afterExecute......   end\" );
        }
    
    }
    

    定义任务队列WorkQueue

    通过BlockingQueue存放任务线程,该处使用生产者、消费者模式。

    public class WorkQueue {
        private volatile static BlockingQueue<WorkEvent> queue;
    
        private WorkQueue(){}
    
        /**
         * 初始化队列,延迟初始化,其实也可使用内部类单例模式
         */
        private static void init(){
    
            if(queue == null){
                System.out.println(\"WorkQueue.queue null  init........\");
                synchronized (WorkQueue.class) {
                    System.out.println(\"WorkQueue.queue after synchronized still null  init........\");
                    if (queue == null) {
                        queue = new LinkedBlockingDeque<WorkEvent>();
                    }
                }
            }
        }
    
        public static  void putWorkEvent(WorkEvent workEvent){
            init();
            try {
                queue.put(workEvent);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println(\"WorkQueue.putWorkEvent  fail........\");
            }
        }
    
        public static BlockingQueue<WorkEvent> getQueue() {
            return queue;
        }
    
    }
    

    业务处理

    public class EventHandler {
    
        /**
         * 处理业务
         * @param workEvent
         */
        public static void handle(WorkEvent workEvent){
            System.out.println(\"正在处理,workNo=[\" + workEvent.getWorkNo() + \"]\");
        }
    }
    

    工作线程:
    消费者端,阻塞接收消息,并将消息传给实际需要者。

    public class WorkThread implements Runnable{
       
         
        @Override
        public void run() {
            while (true) {
                try {
                    WorkEvent workEvent = WorkQueue.getQueue().take();
                    System.out.println(\"ThreadName[\"  +  Thread.currentThread().getName() +  \"], 获取到workEvent,workNo=[\" + workEvent.getWorkNo() + \"], ready handle\");
                    EventHandler.handle(workEvent);
                    System.out.println(\"ThreadName[\"  +  Thread.currentThread().getName() +  \"], 获取到workEvent,workNo=[\" + workEvent.getWorkNo() + \"], finish handle\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    

    消息实体

    public class WorkEvent implements Serializable{
       
         
    
        private static final long serialVersionUID = -1739230985770176506L;
    
        /**
         * 任务编号
         */
        private String workNo;
    
        /**
         * 执行次数
         */
        private AtomicInteger num;
    
        public WorkEvent(String workNo) {
            this.workNo = workNo;
            this.num = new AtomicInteger(0);
        }
    
        public String getWorkNo() {
            return workNo;
        }
    
        public void setWorkNo(String workNo) {
            this.workNo = workNo;
        }
    
        public AtomicInteger getNum() {
            return num;
        }
    
        public void setNum() {
            this.num.incrementAndGet();
        }
    
    }
    

    调用示例:

    public class StartWork {
        public static void main(String[] args) {
    
            System.out.println(\"准备放任务线程\");
            int workNum = 6;
            for (int i = 0; i < workNum; i++) {
                WorkEvent workEvent = new WorkEvent(\"任务线程\" + i);
                WorkQueue.putWorkEvent(workEvent);
            }
    
            // 初始化线程池
            ExtThreadPoolExecutor executor = new ExtThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            // 先准备工作线程
            System.out.println(\"准备五个工作线程\");
            executor.execute(new WorkThread());
            executor.execute(new WorkThread());
            executor.execute(new WorkThread());
            executor.execute(new WorkThread());
            executor.execute(new WorkThread());
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println(\"10s后 。。。 准备放任务线程\");
            for (int i = 0; i < workNum; i++) {
                WorkEvent workEvent = new WorkEvent(\"10s 后 任务线程\" + i);
                WorkQueue.putWorkEvent(workEvent);
            }
        }
    }
    

    结果输出:Java多线程:ThreadPoolExecutor+BlockingQueue线程池示例详解代码大体流程:消息定义成实体WorkEvent,放入WorkQueue中,然后由ExtThreadPoolExecutor线程池开启接收端线程WorkThread,由WorkThread获取消息,并通知实际需要者EventHandler,EventHandler处理消息。

    以上就是Java多线程:ThreadPoolExecutor+BlockingQueue线程池示例详解的全部内容,希望对你有帮助!

微信扫一扫

支付宝扫一扫

版权: 转载请注明出处:https://www.zuozi.net/9717.html

管理员

相关推荐
2025-08-06

文章目录 一、Reader 接口概述 1.1 什么是 Reader 接口? 1.2 Reader 与 InputStream 的区别 1.3 …

988
2025-08-06

文章目录 一、事件溯源 (一)核心概念 (二)Kafka与Golang的优势 (三)完整代码实现 二、命令…

465
2025-08-06

文章目录 一、证明GC期间执行native函数的线程仍在运行 二、native线程操作Java对象的影响及处理方…

348
2025-08-06

文章目录 一、事务基础概念 二、MyBatis事务管理机制 (一)JDBC原生事务管理(JdbcTransaction)…

456
2025-08-06

文章目录 一、SnowFlake算法核心原理 二、SnowFlake算法工作流程详解 三、SnowFlake算法的Java代码…

517
2025-08-06

文章目录 一、本地Jar包的加载操作 二、本地Class的加载方法 三、远程Jar包的加载方式 你知道Groo…

832
发表评论
暂无评论

还没有评论呢,快来抢沙发~

助力内容变现

将您的收入提升到一个新的水平

点击联系客服

在线时间:08:00-23:00

客服QQ

122325244

客服电话

400-888-8888

客服邮箱

122325244@qq.com

扫描二维码

关注微信客服号