首页 开发教程 开始改变第五天 Java并发(1)

开始改变第五天 Java并发(1)

开发教程 2025年12月4日
972 浏览

Java并发与集合核心知识点详解:从基础原理到实战应用

在Java开发中,并发编程和集合框架是每个开发者必须掌握的核心技能。无论是构建高并发的后端服务,还是优化系统性能,深入理解这些知识点都至关重要。本文将详细解析AQS、CAS、Lock锁、多线程通信、HashMap源码、ConcurrentHashMap以及线程池等关键知识点。

1. AQS、CAS与Lock锁原理深度解析

1.1 CAS(Compare And Swap)无锁编程基石

CAS是一种基于硬件指令实现的无锁原子操作,它是现代并发编程的基石。CAS操作包含三个基本操作数:内存位置(V)、期望的原值(A)和新值(B)。

核心思想:我认为位置V的值应该是A,如果是的话,我就将它更新为B,否则不做任何修改,并告诉我当前的实际值是多少。

public class CASPrinciple {
    /**
     * CAS操作模拟(实际由CPU硬件指令保证原子性)
     * 伪代码表示的真实CAS流程:
     * 1. 读取内存值
     * 2. 比较内存值与期望值
     * 3. 如果相等则写入新值,返回成功
     * 4. 如果不相等则返回失败
     */
    
    // 实际开发中使用Atomic系列类
    private AtomicInteger atomicValue = new AtomicInteger(0);
    
    public void demonstrateCAS() {
        System.out.println(\"=== CAS原理演示 ===\");
        
        // 第一次CAS:期望0,更新为1,应该成功
        boolean firstAttempt = atomicValue.compareAndSet(0, 1);
        System.out.println(\"第一次CAS(期望0→更新1): \" + firstAttempt + \", 当前值: \" + atomicValue.get());
        
        // 第二次CAS:期望0,更新为2,应该失败(因为当前值是1)
        boolean secondAttempt = atomicValue.compareAndSet(0, 2);
        System.out.println(\"第二次CAS(期望0→更新2): \" + secondAttempt + \", 当前值: \" + atomicValue.get());
        
        // 第三次CAS:期望1,更新为3,应该成功
        boolean thirdAttempt = atomicValue.compareAndSet(1, 3);
        System.out.println(\"第三次CAS(期望1→更新3): \" + thirdAttempt + \", 当前值: \" + atomicValue.get());
    }
    
    public static void main(String[] args) {
        new CASPrinciple().demonstrateCAS();
    }
}

CAS的三大问题及解决方案:

  1. ABA问题:值从A→B→A,CAS无法感知中间变化

    • 解决方案:使用AtomicStampedReference添加版本号
  2. 循环时间长开销大:CAS失败时会自旋重试,消耗CPU

    • 解决方案:设置合理的自旋次数或使用锁
  3. 只能保证单个变量原子性

    • 解决方案:使用AtomicReference包装多个变量

1.2 AQS(AbstractQueuedSynchronizer)同步器框架

AQS是Java并发包的核心基础框架,它提供了一个FIFO等待队列和一个原子状态管理机制。大多数同步器(如ReentrantLock、CountDownLatch等)都是基于AQS构建的。

AQS核心组成:

  • state:volatile int,表示同步状态
  • CLH队列:线程等待队列
  • ConditionObject:条件变量
/**
 * 自定义共享锁实现 - 基于AQS
 * 这个示例展示如何用AQS构建一个允许多个线程同时访问的共享锁
 */
class CustomSharedLock {
    private final Sync sync;
    
    public CustomSharedLock(int permits) {
        this.sync = new Sync(permits);
    }
    
    // 内部同步器类,继承AQS
    private static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // 初始化许可数量
        }
        
        /**
         * 尝试获取共享锁
         * @param acquires 请求的许可数量
         * @return 返回剩余许可,负数表示获取失败
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            // 自旋CAS操作
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                
                // 如果剩余许可为负,或者CAS设置成功,返回结果
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
        
        /**
         * 释放共享锁
         */
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) { // 溢出检查
                    throw new Error(\"Maximum permit count exceeded\");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
    }
    
    public void lock() {
        sync.acquireShared(1);
    }
    
    public void unlock() {
        sync.releaseShared(1);
    }
    
    public int getAvailablePermits() {
        return sync.getState();
    }
}

// 测试自定义共享锁
public class AQSDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(\"=== AQS自定义共享锁演示 ===\");
        
        CustomSharedLock sharedLock = new CustomSharedLock(3); // 允许3个线程同时访问
        
        // 创建多个线程测试共享锁
        for (int i = 1; i <= 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                sharedLock.lock();
                try {
                    System.out.println(\"线程\" + threadId + \"获取锁,剩余许可: \" + sharedLock.getAvailablePermits());
                    Thread.sleep(1000); // 模拟业务操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    sharedLock.unlock();
                    System.out.println(\"线程\" + threadId + \"释放锁\");
                }
            }).start();
            Thread.sleep(200); // 错开线程启动时间
        }
    }
}

1.3 Lock锁实现原理

Lock接口提供了比synchronized更灵活的锁操作,ReentrantLock是其主要实现。

Lock的优势:

  • 可中断的锁获取
  • 超时锁获取
  • 尝试锁获取
  • 公平性选择
public class LockDeepDive {
    private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
    
    public void performTask(String taskName) {
        System.out.println(Thread.currentThread().getName() + \" 尝试获取锁执行: \" + taskName);
        
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + \"  获取锁成功\");
            // 模拟业务处理
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName() + \" 完成任务: \" + taskName);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.out.println(Thread.currentThread().getName() + \" 释放锁\");
        }
    }
    
    // 演示可中断锁
    public void interruptibleTask() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + \" 尝试可中断锁获取\");
        
        try {
            lock.lockInterruptibly(); // 可中断的锁获取
            try {
                System.out.println(Thread.currentThread().getName() + \" 获取可中断锁成功\");
                Thread.sleep(2000);
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + \" 锁获取被中断\");
            throw e;
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        System.out.println(\"=== Lock锁深度解析 ===\");
        
        LockDeepDive demo = new LockDeepDive();
        
        // 测试基本锁功能
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 1; i <= 3; i++) {
            final int taskId = i;
            executor.submit(() -> demo.performTask(\"任务\" + taskId));
        }
        
        Thread.sleep(1000);
        
        // 测试可中断锁
        Thread interruptThread = new Thread(() -> {
            try {
                demo.interruptibleTask();
            } catch (InterruptedException e) {
                System.out.println(\"主线程中断了任务线程\");
            }
        });
        
        interruptThread.start();
        Thread.sleep(100);
        interruptThread.interrupt(); // 中断等待锁的线程
        
        executor.shutdown();
    }
}

2. 多线程通信与生产者-消费者模型

2.1 传统的wait/notify机制

wait/notify是Java最基础的线程间通信机制,但使用起来需要格外小心。

使用要点:

  • 必须在同步代码块中使用
  • wait()会释放锁
  • 要用while循环检查条件,避免虚假唤醒
/**
 * 基于wait/notify的生产者-消费者模型
 * 这个模型演示了经典的线程协作模式
 */
class TraditionalMessageQueue {
    private final List queue = new LinkedList();
    private final int maxSize;
    
    public TraditionalMessageQueue(int maxSize) {
        this.maxSize = maxSize;
    }
    
    /**
     * 生产消息
     */
    public synchronized void produce(String message) throws InterruptedException {
        // 必须用while而不是if,防止虚假唤醒
        while (queue.size() == maxSize) {
            System.out.println(\"队列已满,生产者等待...\");
            wait(); // 释放锁,等待
        }
        
        queue.add(message);
        System.out.println(\" 生产: \" + message + \" | 队列大小: \" + queue.size());
        
        notifyAll(); // 通知所有等待的消费者
    }
    
    /**
     * 消费消息
     */
    public synchronized String consume() throws InterruptedException {
        while (queue.isEmpty()) {
            System.out.println(\"队列为空,消费者等待...\");
            wait(); // 释放锁,等待
        }
        
        String message = queue.remove(0);
        System.out.println(\" 消费: \" + message + \" | 队列大小: \" + queue.size());
        
        notifyAll(); // 通知所有等待的生产者
        return message;
    }
    
    public synchronized int getSize() {
        return queue.size();
    }
}

2.2 更现代的Lock和Condition机制

Lock和Condition提供了更灵活、更安全的线程通信方式。

Condition的优势:

  • 支持多个等待条件
  • 更清晰的API
  • 更好的性能
/**
 * 基于Lock和Condition的增强版生产者-消费者
 * 使用两个Condition分别管理生产者和消费者的等待
 */
class EnhancedMessageQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();  // 非空条件
    private final Condition notFull = lock.newCondition();   // 非满条件
    private final Queue queue = new LinkedList();
    private final int capacity;
    private int producedCount = 0;
    private int consumedCount = 0;
    
    public EnhancedMessageQueue(int capacity) {
        this.capacity = capacity;
    }
    
    public void produce(String message) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                System.out.println(\" 队列满,生产者[\" + Thread.currentThread().getName() + \"]等待\");
                notFull.await(); // 在notFull条件上等待
            }
            
            queue.offer(message);
            producedCount++;
            System.out.println(\" 生产: \" + message + \" | 队列: \" + queue.size() + 
                             \"/\" + capacity + \" | 总生产: \" + producedCount);
            
            notEmpty.signal(); // 唤醒一个消费者
        } finally {
            lock.unlock();
        }
    }
    
    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println(\" 队列空,消费者[\" + Thread.currentThread().getName() + \"]等待\");
                notEmpty.await(); // 在notEmpty条件上等待
            }
            
            String message = queue.poll();
            consumedCount++;
            System.out.println(\" 消费: \" + message + \" | 队列: \" + queue.size() + 
                             \"/\" + capacity + \" | 总消费: \" + consumedCount);
            
            notFull.signal(); // 唤醒一个生产者
            return message;
        } finally {
            lock.unlock();
        }
    }
}

2.3 完整的生产者-消费者演示

public class ProducerConsumerShowcase {
    public static void main(String[] args) throws InterruptedException {
        System.out.println(\"=== 生产者-消费者模型完整演示 ===\");
        
        EnhancedMessageQueue queue = new EnhancedMessageQueue(3);
        
        // 创建多个生产者
        Thread producer1 = new Thread(createProducer(queue, \"P1\"), \"Producer-1\");
        Thread producer2 = new Thread(createProducer(queue, \"P2\"), \"Producer-2\");
        
        // 创建多个消费者
        Thread consumer1 = new Thread(createConsumer(queue, \"C1\"), \"Consumer-1\");
        Thread consumer2 = new Thread(createConsumer(queue, \"C2\"), \"Consumer-2\");
        
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        
        // 运行10秒后停止
        Thread.sleep(10000);
        
        producer1.interrupt();
        producer2.interrupt();
        consumer1.interrupt();
        consumer2.interrupt();
        
        System.out.println(\"演示结束\");
    }
    
    private static Runnable createProducer(EnhancedMessageQueue queue, String prefix) {
        return () -> {
            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String message = prefix + \"-消息-\" + (++count);
                    queue.produce(message);
                    Thread.sleep(800 + (long)(Math.random() * 400)); // 随机睡眠
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println(\"生产者 \" + prefix + \" 被中断\");
                }
            }
        };
    }
    
    private static Runnable createConsumer(EnhancedMessageQueue queue, String prefix) {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    queue.consume();
                    Thread.sleep(1000 + (long)(Math.random() * 600)); // 随机睡眠
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println(\"消费者 \" + prefix + \" 被中断\");
                }
            }
        };
    }
}

3. HashMap源码深度解析与ConcurrentHashMap对比

3.1 HashMap核心架构

HashMap是Java中最常用的哈希表实现,其设计精巧且性能优秀。

Java 8+ HashMap的重大改进:

  • 数组+链表+红黑树
  • 链表长度≥8时转红黑树
  • 红黑树节点≤6时转链表
public class HashMapDeepDive {
    
    /**
     * HashMap关键参数解析
     */
    public static void analyzeHashMapParams() {
        System.out.println(\"=== HashMap核心参数 ===\");
        System.out.println(\"默认初始容量: \" + (1 << 4) + \" (16)\"); // DEFAULT_INITIAL_CAPACITY
        System.out.println(\"最大容量: \" + (1 << 30) + \" (2^30)\");   // MAXIMUM_CAPACITY
        System.out.println(\"默认负载因子: 0.75\");                   // DEFAULT_LOAD_FACTOR
        System.out.println(\"树化阈值: 8\");                         // TREEIFY_THRESHOLD
        System.out.println(\"链化阈值: 6\");                         // UNTREEIFY_THRESHOLD
        System.out.println(\"最小树化容量: 64\");                    // MIN_TREEIFY_CAPACITY
    }
    
    /**
     * 演示HashMap的哈希计算和索引定位
     */
    public static void demonstrateHashCalculation() {
        System.out.println(\"n=== HashMap哈希计算过程 ===\");
        
        String[] keys = {\"hello\", \"world\", \"java\", \"hashmap\", \"concurrent\"};
        int capacity = 16; // 默认容量
        
        for (String key : keys) {
            int hashCode = key.hashCode();
            int hash = hashCode ^ (hashCode >>> 16); // HashMap的hash方法
            int index = (capacity - 1) & hash;       // 索引计算
            
            System.out.println(\"Key: \" + key + 
                             \" | HashCode: \" + hashCode +
                             \" | HashMapHash: \" + hash +
                             \" | 桶索引: \" + index);
        }
    }
    
    /**
     * HashMap扩容机制演示
     */
    public static void demonstrateResize() {
        System.out.println(\"n=== HashMap扩容机制 ===\");
        
        Map map = new HashMap(8); // 指定小容量便于观察扩容
        
        // 添加元素触发扩容
        for (int i = 1; i <= 12; i++) {
            String key = \"key\" + i;
            map.put(key, i);
            
            // 模拟HashMap的扩容判断
            if (i == 6) { // 8 * 0.75 = 6,达到扩容阈值
                System.out.println(\"元素数量达到阈值,即将扩容...\");
            }
            
            System.out.println(\"添加 \" + key + \" | 大小: \" + map.size());
        }
        
        System.out.println(\"最终Map: \" + map);
    }
}

// 测试HashMap行为
class HashMapBehaviorTest {
    public static void main(String[] args) {
        HashMapDeepDive.analyzeHashMapParams();
        HashMapDeepDive.demonstrateHashCalculation();
        HashMapDeepDive.demonstrateResize();
        
        // 演示哈希冲突
        demonstrateCollision();
    }
    
    public static void demonstrateCollision() {
        System.out.println(\"n=== 哈希冲突演示 ===\");
        
        Map map = new HashMap(16);
        
        // 精心选择可能产生冲突的key
        String[] collisionKeys = {\"Aa\", \"BB\", \"AaAa\", \"BBBB\", \"AaBB\", \"BBAa\"};
        
        for (String key : collisionKeys) {
            int hash = key.hashCode();
            int index = (16 - 1) & (hash ^ (hash >>> 16));
            map.put(key, hash);
            System.out.println(\"Key: \" + key + \" | Hash: \" + hash + \" | 索引: \" + index);
        }
        
        System.out.println(\"冲突测试Map: \" + map);
    }
}

3.2 HashMap vs ConcurrentHashMap深度对比

public class MapComparison {
    
    /**
     * 线程安全性对比测试
     */
    public static void threadSafetyTest() throws InterruptedException {
        System.out.println(\"=== HashMap vs ConcurrentHashMap 线程安全测试 ===\");
        
        // HashMap测试(线程不安全)
        Map hashMap = new HashMap();
        testMapConcurrency(hashMap, \"HashMap\");
        
        Thread.sleep(1000);
        
        // ConcurrentHashMap测试(线程安全)
        Map concurrentMap = new ConcurrentHashMap();
        testMapConcurrency(concurrentMap, \"ConcurrentHashMap\");
    }
    
    private static void testMapConcurrency(Map map, String mapType) {
        int threadCount = 10;
        int operationsPerThread = 1000;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        
        long startTime = System.currentTimeMillis();
        
        // 创建多个线程同时操作Map
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            executor.submit(() -> {
                for (int j = 0; j < operationsPerThread; j++) {
                    int key = threadId * operationsPerThread + j;
                    map.put(key, key);
                    
                    // 偶尔读取操作
                    if (j % 100 == 0) {
                        map.get(key);
                    }
                }
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        long endTime = System.currentTimeMillis();
        
        System.out.println(mapType + \" 测试结果:\");
        System.out.println(\"  - 期望大小: \" + (threadCount * operationsPerThread));
        System.out.println(\"  - 实际大小: \" + map.size());
        System.out.println(\"  - 耗时: \" + (endTime - startTime) + \"ms\");
        System.out.println(\"  - 数据完整性: \" + 
                          (map.size() == threadCount * operationsPerThread ? \" 完整\" : \" 丢失数据\"));
    }
    
    /**
     * 性能对比测试
     */
    public static void performanceComparison() {
        System.out.println(\"n=== 性能对比测试 ===\");
        
        int elementCount = 100000;
        
        // HashMap性能测试
        long hashMapStart = System.currentTimeMillis();
        Map hashMap = new HashMap();
        for (int i = 0; i < elementCount; i++) {
            hashMap.put(i, \"value\" + i);
        }
        long hashMapTime = System.currentTimeMillis() - hashMapStart;
        
        // ConcurrentHashMap性能测试
        long concurrentMapStart = System.currentTimeMillis();
        Map concurrentMap = new ConcurrentHashMap();
        for (int i = 0; i < elementCount; i++) {
            concurrentMap.put(i, \"value\" + i);
        }
        long concurrentMapTime = System.currentTimeMillis() - concurrentMapStart;
        
        System.out.println(\"单线程插入 \" + elementCount + \" 个元素:\");
        System.out.println(\"  - HashMap: \" + hashMapTime + \"ms\");
        System.out.println(\"  - ConcurrentHashMap: \" + concurrentMapTime + \"ms\");
        System.out.println(\"  - 性能差异: \" + 
                          ((double)concurrentMapTime/hashMapTime) + \"倍\");
    }
    
    /**
     * 特性对比表格
     */
    public static void featureComparison() {
        System.out.println(\"n=== 特性对比 ===\");
        
        System.out.println(\"┌─────────────────┬──────────┬────────────────────┐\");
        System.out.println(\"│     特性        │  HashMap │ ConcurrentHashMap  │\");
        System.out.println(\"├─────────────────┼──────────┼────────────────────┤\");
        System.out.println(\"│   线程安全      │   否     │         是         │\");
        System.out.println(\"│   锁粒度        │   无锁   │      分段锁/桶锁   │\");
        System.out.println(\"│   null键值      │   允许   │        不允许      │\");
        System.out.println(\"│   迭代器        │ fail-fast│ weakly consistent  │\");
        System.out.println(\"│   读性能        │   很高   │         高         │\");
        System.out.println(\"│   写性能        │   高     │        中等        │\");
        System.out.println(\"└─────────────────┴──────────┴────────────────────┘\");
    }
    
    public static void main(String[] args) throws InterruptedException {
        threadSafetyTest();
        performanceComparison();
        featureComparison();
    }
}

4. 线程池参数详解与实现原理

4.1 ThreadPoolExecutor核心参数深度解析

public class ThreadPoolDeepAnalysis {
    
    /**
     * 线程池参数详细说明
     */
    public static void explainThreadPoolParams() {
        System.out.println(\"=== 线程池七大核心参数 ===\");
        
        System.out.println(\"1. corePoolSize - 核心线程数\");
        System.out.println(\"   - 线程池长期维持的线程数量\");
        System.out.println(\"   - 即使线程空闲也不会被回收\");
        System.out.println(\"   - 除非设置allowCoreThreadTimeOut=true\");
        
        System.out.println(\"n2. maximumPoolSize - 最大线程数\");
        System.out.println(\"   - 线程池允许创建的最大线程数量\");
        System.out.println(\"   - 当工作队列满且核心线程都在忙时,会创建新线程\");
        
        System.out.println(\"n3. keepAliveTime - 空闲线程存活时间\");
        System.out.println(\"   - 非核心线程空闲时的最长存活时间\");
        System.out.println(\"   - 超过这个时间,空闲的非核心线程会被回收\");
        
        System.out.println(\"n4. TimeUnit - 时间单位\");
        System.out.println(\"   - keepAliveTime的时间单位\");
        System.out.println(\"   - 如TimeUnit.SECONDS、TimeUnit.MILLISECONDS\");
        
        System.out.println(\"n5. workQueue - 工作队列\");
        System.out.println(\"   - 用于保存等待执行的任务的阻塞队列\");
        System.out.println(\"   - 常用实现:\");
        System.out.println(\"     • ArrayBlockingQueue - 有界队列\");
        System.out.println(\"     • LinkedBlockingQueue - 无界队列\");
        System.out.println(\"     • SynchronousQueue - 同步移交队列\");
        System.out.println(\"     • PriorityBlockingQueue - 优先级队列\");
        
        System.out.println(\"n6. threadFactory - 线程工厂\");
        System.out.println(\"   - 用于创建新线程\");
        System.out.println(\"   - 可以定制线程名称、优先级、守护状态等\");
        
        System.out.println(\"n7. handler - 拒绝策略\");
        System.out.println(\"   - 当线程池和工作队列都满时的处理策略\");
        System.out.println(\"   - 内置策略:\");
        System.out.println(\"     • AbortPolicy - 抛出RejectedExecutionException\");
        System.out.println(\"     • CallerRunsPolicy - 由调用者线程执行\");
        System.out.println(\"     • DiscardPolicy - 静默丢弃任务\");
        System.out.println(\"     • DiscardOldestPolicy - 丢弃队列中最老的任务\");
    }
    
    /**
     * 创建定制化的线程池
     */
    public static ThreadPoolExecutor createCustomThreadPool() {
        int corePoolSize = 2;
        int maxPoolSize = 5;
        long keepAliveTime = 30;
        int queueCapacity = 10;
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue(queueCapacity),
            new CustomThreadFactory(\"CustomPool\"),
            new CustomRejectionHandler()
        );
        
        // 允许核心线程超时
        executor.allowCoreThreadTimeOut(true);
        
        return executor;
    }
    
    // 自定义线程工厂
    static class CustomThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        
        CustomThreadFactory(String poolName) {
            namePrefix = poolName + \"-thread-\";
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            t.setDaemon(false);
            t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    // 自定义拒绝策略
    static class CustomRejectionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(\"任务被拒绝,执行降级策略: \" + r);
            // 可以记录日志、持久化任务、或者执行其他降级逻辑
        }
    }
}

4.2 线程池工作流程完整演示

public class ThreadPoolWorkflowDemo {
    
    public static void demonstrateWorkflow() throws InterruptedException {
        System.out.println(\"=== 线程池工作流程演示 ===\");
        
        // 创建小型线程池便于观察
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,      // 核心线程2个
            4,      // 最大线程4个  
            5,      // 空闲线程存活5秒
            TimeUnit.SECONDS,
            new ArrayBlockingQueue(3),  // 容量3的工作队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
        );
        
        System.out.println(\"线程池状态: 核心2, 最大4, 队列容量3\");
        System.out.println(\"开始提交任务...n\");
        
        // 提交15个任务,观察线程池行为
        for (int i = 1; i <= 15; i++) {
            final int taskId = i;
            try {
                executor.execute(() -> {
                    try {
                        System.out.println(\" 任务\" + taskId + \" 执行中 [线程: \" + 
                                         Thread.currentThread().getName() + 
                                         \", 活跃线程: \" + executor.getActiveCount() +
                                         \", 队列大小: \" + executor.getQueue().size() + \"]\");
                        Thread.sleep(2000); // 模拟任务执行
                        System.out.println(\" 任务\" + taskId + \" 完成\");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                
                System.out.println(\" 提交任务\" + taskId + \" → \" +
                                 \"池大小: \" + executor.getPoolSize() +
                                 \", 队列: \" + executor.getQueue().size());
                
                Thread.sleep(300); // 控制提交速度
                
            } catch (RejectedExecutionException e) {
                System.out.println(\" 任务\" + taskId + \" 被拒绝策略处理\");
            }
        }
        
        // 监控线程池状态
        monitorThreadPool(executor);
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
    
    private static void monitorThreadPool(ThreadPoolExecutor executor) {
        new Thread(() -> {
            try {
                while (!executor.isTerminated()) {
                    System.out.println(\"n 线程池监控:\" +
                                     \" 核心线程: \" + executor.getCorePoolSize() +
                                     \", 池大小: \" + executor.getPoolSize() +
                                     \", 活跃线程: \" + executor.getActiveCount() +
                                     \", 队列大小: \" + executor.getQueue().size() +
                                     \", 完成任务: \" + executor.getCompletedTaskCount());
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
    
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolDeepAnalysis.explainThreadPoolParams();
        System.out.println(\"n\" + \"=\".repeat(50) + \"n\");
        demonstrateWorkflow();
    }
}

4.3 四种拒绝策略详细对比

public class RejectionPolicyComparison {
    
    public static void compareAllPolicies() throws InterruptedException {
        System.out.println(\"=== 四种拒绝策略对比演示 ===\");
        
        // 1. AbortPolicy - 默认策略
        testRejectionPolicy(new ThreadPoolExecutor.AbortPolicy(), \"AbortPolicy\");
        
        // 2. CallerRunsPolicy - 调用者运行
        testRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy(), \"CallerRunsPolicy\");
        
        // 3. DiscardPolicy - 静默丢弃
        testRejectionPolicy(new ThreadPoolExecutor.DiscardPolicy(), \"DiscardPolicy\");
        
        // 4. DiscardOldestPolicy - 丢弃最老任务
        testRejectionPolicy(new ThreadPoolExecutor.DiscardOldestPolicy(), \"DiscardOldestPolicy\");
    }
    
    private static void testRejectionPolicy(RejectedExecutionHandler policy, String policyName) 
            throws InterruptedException {
        System.out.println(\"n 测试策略: \" + policyName);
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue(1), policy
        );
        
        System.out.println(\"创建线程池: 核心1, 最大1, 队列容量1\");
        System.out.println(\"提交4个任务(超出容量)...\");
        
        int completed = 0;
        int rejected = 0;
        
        for (int i = 1; i <= 4; i++) {
            final int taskId = i;
            try {
                executor.execute(() -> {
                    System.out.println(\"   执行任务 \" + taskId + \" [线程: \" + 
                                     Thread.currentThread().getName() + \"]\");
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                completed++;
            } catch (Exception e) {
                rejected++;
                System.out.println(\"   任务 \" + taskId + \" 被拒绝: \" + e.getClass().getSimpleName());
            }
        }
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        
        System.out.println(\" 结果: 完成=\" + completed + \", 拒绝=\" + rejected);
        System.out.println(\"特点: \" + getPolicyDescription(policyName));
    }
    
    private static String getPolicyDescription(String policyName) {
        switch (policyName) {
            case \"AbortPolicy\":
                return \"抛出RejectedExecutionException,确保任务不丢失\";
            case \"CallerRunsPolicy\":
                return \"由调用者线程执行,降低提交速度,提供反馈\";
            case \"DiscardPolicy\":
                return \"静默丢弃,适合不重要的任务\";
            case \"DiscardOldestPolicy\":
                return \"丢弃队列中最老的任务,适合实时性要求高的场景\";
            default:
                return \"未知策略\";
        }
    }
    
    /**
     * 实际应用场景建议
     */
    public static void usageRecommendations() {
        System.out.println(\"n=== 拒绝策略使用建议 ===\");
        
        System.out.println(\"1. AbortPolicy (默认)\");
        System.out.println(\"   - 适用场景: 需要明确知道任务被拒绝的场景\");
        System.out.println(\"   - 优点: 任务不丢失,有明确异常\");
        System.out.println(\"   - 缺点: 需要调用方处理异常\");
        
        System.out.println(\"n2. CallerRunsPolicy\");
        System.out.println(\"   - 适用场景: 不允许任务丢失,且可接受降级的场景\");
        System.out.println(\"   - 优点: 实现简单降级,不会丢失任务\");
        System.out.println(\"   - 缺点: 可能阻塞调用方\");
        
        System.out.println(\"n3. DiscardPolicy\");
        System.out.println(\"   - 适用场景: 不重要的统计、日志等任务\");
        System.out.println(\"   - 优点: 不影响主流程\");
        System.out.println(\"   - 缺点: 任务静默丢失\");
        
        System.out.println(\"n4. DiscardOldestPolicy\");
        System.out.println(\"   - 适用场景: 实时性要求高,新任务比老任务重要\");
        System.out.println(\"   - 优点: 保证新任务有机会执行\");
        System.out.println(\"   - 缺点: 可能丢失重要任务\");
    }
    
    public static void main(String[] args) throws InterruptedException {
        compareAllPolicies();
        usageRecommendations();
    }
}

总结

通过本文的详细解析,我们深入探讨了Java并发编程和集合框架的核心知识点:

关键要点回顾:

  1. AQS/CAS/Lock锁机制

    • CAS是实现无锁编程的基础,但要警惕ABA问题
    • AQS是并发框架的基石,理解其状态管理和队列机制
    • Lock提供了比synchronized更灵活的锁操作
  2. 线程通信与生产者-消费者

    • wait/notify是基础,但Condition更灵活安全
    • 生产者-消费者模型是解决线程协作的经典模式
    • 合理设置队列容量和线程数量是关键
  3. HashMap与ConcurrentHashMap

    • HashMap在单线程下性能优秀,但线程不安全
    • ConcurrentHashMap通过分段锁保证线程安全
    • 根据场景选择合适的Map实现
  4. 线程池最佳实践

    • 合理配置核心参数是性能优化的关键
    • 根据任务特性选择合适的拒绝策略
    • 监控线程池状态,及时调整配置

实战建议:

  • 并发控制:优先考虑使用java.util.concurrent包的工具类
  • 集合选择:并发环境下优先选择ConcurrentHashMap等线程安全集合
  • 线程池:避免使用Executors的快捷方法,根据实际情况定制线程池
  • 资源管理:确保正确释放锁和关闭线程池

掌握这些核心知识点,能够帮助开发者编写出更高效、更稳定的并发程序,应对各种复杂的多线程场景。

发表评论
暂无评论

还没有评论呢,快来抢沙发~

客服

点击联系客服 点击联系客服

在线时间:09:00-18:00

关注微信公众号

关注微信公众号
客服电话

400-888-8888

客服邮箱 122325244@qq.com

手机

扫描二维码

手机访问本站

扫描二维码
搜索