Java并发与集合核心知识点详解:从基础原理到实战应用
在Java开发中,并发编程和集合框架是每个开发者必须掌握的核心技能。无论是构建高并发的后端服务,还是优化系统性能,深入理解这些知识点都至关重要。本文将详细解析AQS、CAS、Lock锁、多线程通信、HashMap源码、ConcurrentHashMap以及线程池等关键知识点。
1. AQS、CAS与Lock锁原理深度解析
1.1 CAS(Compare And Swap)无锁编程基石
CAS是一种基于硬件指令实现的无锁原子操作,它是现代并发编程的基石。CAS操作包含三个基本操作数:内存位置(V)、期望的原值(A)和新值(B)。
核心思想:我认为位置V的值应该是A,如果是的话,我就将它更新为B,否则不做任何修改,并告诉我当前的实际值是多少。
public class CASPrinciple {
/**
* CAS操作模拟(实际由CPU硬件指令保证原子性)
* 伪代码表示的真实CAS流程:
* 1. 读取内存值
* 2. 比较内存值与期望值
* 3. 如果相等则写入新值,返回成功
* 4. 如果不相等则返回失败
*/
// 实际开发中使用Atomic系列类
private AtomicInteger atomicValue = new AtomicInteger(0);
public void demonstrateCAS() {
System.out.println(\"=== CAS原理演示 ===\");
// 第一次CAS:期望0,更新为1,应该成功
boolean firstAttempt = atomicValue.compareAndSet(0, 1);
System.out.println(\"第一次CAS(期望0→更新1): \" + firstAttempt + \", 当前值: \" + atomicValue.get());
// 第二次CAS:期望0,更新为2,应该失败(因为当前值是1)
boolean secondAttempt = atomicValue.compareAndSet(0, 2);
System.out.println(\"第二次CAS(期望0→更新2): \" + secondAttempt + \", 当前值: \" + atomicValue.get());
// 第三次CAS:期望1,更新为3,应该成功
boolean thirdAttempt = atomicValue.compareAndSet(1, 3);
System.out.println(\"第三次CAS(期望1→更新3): \" + thirdAttempt + \", 当前值: \" + atomicValue.get());
}
public static void main(String[] args) {
new CASPrinciple().demonstrateCAS();
}
}
CAS的三大问题及解决方案:
-
ABA问题:值从A→B→A,CAS无法感知中间变化
- 解决方案:使用
AtomicStampedReference添加版本号
- 解决方案:使用
-
循环时间长开销大:CAS失败时会自旋重试,消耗CPU
- 解决方案:设置合理的自旋次数或使用锁
-
只能保证单个变量原子性
- 解决方案:使用
AtomicReference包装多个变量
- 解决方案:使用
1.2 AQS(AbstractQueuedSynchronizer)同步器框架
AQS是Java并发包的核心基础框架,它提供了一个FIFO等待队列和一个原子状态管理机制。大多数同步器(如ReentrantLock、CountDownLatch等)都是基于AQS构建的。
AQS核心组成:
state:volatile int,表示同步状态- CLH队列:线程等待队列
- ConditionObject:条件变量
/**
* 自定义共享锁实现 - 基于AQS
* 这个示例展示如何用AQS构建一个允许多个线程同时访问的共享锁
*/
class CustomSharedLock {
private final Sync sync;
public CustomSharedLock(int permits) {
this.sync = new Sync(permits);
}
// 内部同步器类,继承AQS
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // 初始化许可数量
}
/**
* 尝试获取共享锁
* @param acquires 请求的许可数量
* @return 返回剩余许可,负数表示获取失败
*/
@Override
protected int tryAcquireShared(int acquires) {
// 自旋CAS操作
for (;;) {
int available = getState();
int remaining = available - acquires;
// 如果剩余许可为负,或者CAS设置成功,返回结果
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
/**
* 释放共享锁
*/
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) { // 溢出检查
throw new Error(\"Maximum permit count exceeded\");
}
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
public int getAvailablePermits() {
return sync.getState();
}
}
// 测试自定义共享锁
public class AQSDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== AQS自定义共享锁演示 ===\");
CustomSharedLock sharedLock = new CustomSharedLock(3); // 允许3个线程同时访问
// 创建多个线程测试共享锁
for (int i = 1; i <= 5; i++) {
final int threadId = i;
new Thread(() -> {
sharedLock.lock();
try {
System.out.println(\"线程\" + threadId + \"获取锁,剩余许可: \" + sharedLock.getAvailablePermits());
Thread.sleep(1000); // 模拟业务操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sharedLock.unlock();
System.out.println(\"线程\" + threadId + \"释放锁\");
}
}).start();
Thread.sleep(200); // 错开线程启动时间
}
}
}
1.3 Lock锁实现原理
Lock接口提供了比synchronized更灵活的锁操作,ReentrantLock是其主要实现。
Lock的优势:
- 可中断的锁获取
- 超时锁获取
- 尝试锁获取
- 公平性选择
public class LockDeepDive {
private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
public void performTask(String taskName) {
System.out.println(Thread.currentThread().getName() + \" 尝试获取锁执行: \" + taskName);
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + \" 获取锁成功\");
// 模拟业务处理
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + \" 完成任务: \" + taskName);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + \" 释放锁\");
}
}
// 演示可中断锁
public void interruptibleTask() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + \" 尝试可中断锁获取\");
try {
lock.lockInterruptibly(); // 可中断的锁获取
try {
System.out.println(Thread.currentThread().getName() + \" 获取可中断锁成功\");
Thread.sleep(2000);
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + \" 锁获取被中断\");
throw e;
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== Lock锁深度解析 ===\");
LockDeepDive demo = new LockDeepDive();
// 测试基本锁功能
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
final int taskId = i;
executor.submit(() -> demo.performTask(\"任务\" + taskId));
}
Thread.sleep(1000);
// 测试可中断锁
Thread interruptThread = new Thread(() -> {
try {
demo.interruptibleTask();
} catch (InterruptedException e) {
System.out.println(\"主线程中断了任务线程\");
}
});
interruptThread.start();
Thread.sleep(100);
interruptThread.interrupt(); // 中断等待锁的线程
executor.shutdown();
}
}
2. 多线程通信与生产者-消费者模型
2.1 传统的wait/notify机制
wait/notify是Java最基础的线程间通信机制,但使用起来需要格外小心。
使用要点:
- 必须在同步代码块中使用
- wait()会释放锁
- 要用while循环检查条件,避免虚假唤醒
/**
* 基于wait/notify的生产者-消费者模型
* 这个模型演示了经典的线程协作模式
*/
class TraditionalMessageQueue {
private final List queue = new LinkedList();
private final int maxSize;
public TraditionalMessageQueue(int maxSize) {
this.maxSize = maxSize;
}
/**
* 生产消息
*/
public synchronized void produce(String message) throws InterruptedException {
// 必须用while而不是if,防止虚假唤醒
while (queue.size() == maxSize) {
System.out.println(\"队列已满,生产者等待...\");
wait(); // 释放锁,等待
}
queue.add(message);
System.out.println(\" 生产: \" + message + \" | 队列大小: \" + queue.size());
notifyAll(); // 通知所有等待的消费者
}
/**
* 消费消息
*/
public synchronized String consume() throws InterruptedException {
while (queue.isEmpty()) {
System.out.println(\"队列为空,消费者等待...\");
wait(); // 释放锁,等待
}
String message = queue.remove(0);
System.out.println(\" 消费: \" + message + \" | 队列大小: \" + queue.size());
notifyAll(); // 通知所有等待的生产者
return message;
}
public synchronized int getSize() {
return queue.size();
}
}
2.2 更现代的Lock和Condition机制
Lock和Condition提供了更灵活、更安全的线程通信方式。
Condition的优势:
- 支持多个等待条件
- 更清晰的API
- 更好的性能
/**
* 基于Lock和Condition的增强版生产者-消费者
* 使用两个Condition分别管理生产者和消费者的等待
*/
class EnhancedMessageQueue {
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition(); // 非空条件
private final Condition notFull = lock.newCondition(); // 非满条件
private final Queue queue = new LinkedList();
private final int capacity;
private int producedCount = 0;
private int consumedCount = 0;
public EnhancedMessageQueue(int capacity) {
this.capacity = capacity;
}
public void produce(String message) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println(\" 队列满,生产者[\" + Thread.currentThread().getName() + \"]等待\");
notFull.await(); // 在notFull条件上等待
}
queue.offer(message);
producedCount++;
System.out.println(\" 生产: \" + message + \" | 队列: \" + queue.size() +
\"/\" + capacity + \" | 总生产: \" + producedCount);
notEmpty.signal(); // 唤醒一个消费者
} finally {
lock.unlock();
}
}
public String consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println(\" 队列空,消费者[\" + Thread.currentThread().getName() + \"]等待\");
notEmpty.await(); // 在notEmpty条件上等待
}
String message = queue.poll();
consumedCount++;
System.out.println(\" 消费: \" + message + \" | 队列: \" + queue.size() +
\"/\" + capacity + \" | 总消费: \" + consumedCount);
notFull.signal(); // 唤醒一个生产者
return message;
} finally {
lock.unlock();
}
}
}
2.3 完整的生产者-消费者演示
public class ProducerConsumerShowcase {
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== 生产者-消费者模型完整演示 ===\");
EnhancedMessageQueue queue = new EnhancedMessageQueue(3);
// 创建多个生产者
Thread producer1 = new Thread(createProducer(queue, \"P1\"), \"Producer-1\");
Thread producer2 = new Thread(createProducer(queue, \"P2\"), \"Producer-2\");
// 创建多个消费者
Thread consumer1 = new Thread(createConsumer(queue, \"C1\"), \"Consumer-1\");
Thread consumer2 = new Thread(createConsumer(queue, \"C2\"), \"Consumer-2\");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
// 运行10秒后停止
Thread.sleep(10000);
producer1.interrupt();
producer2.interrupt();
consumer1.interrupt();
consumer2.interrupt();
System.out.println(\"演示结束\");
}
private static Runnable createProducer(EnhancedMessageQueue queue, String prefix) {
return () -> {
int count = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
String message = prefix + \"-消息-\" + (++count);
queue.produce(message);
Thread.sleep(800 + (long)(Math.random() * 400)); // 随机睡眠
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(\"生产者 \" + prefix + \" 被中断\");
}
}
};
}
private static Runnable createConsumer(EnhancedMessageQueue queue, String prefix) {
return () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
queue.consume();
Thread.sleep(1000 + (long)(Math.random() * 600)); // 随机睡眠
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(\"消费者 \" + prefix + \" 被中断\");
}
}
};
}
}
3. HashMap源码深度解析与ConcurrentHashMap对比
3.1 HashMap核心架构
HashMap是Java中最常用的哈希表实现,其设计精巧且性能优秀。
Java 8+ HashMap的重大改进:
- 数组+链表+红黑树
- 链表长度≥8时转红黑树
- 红黑树节点≤6时转链表
public class HashMapDeepDive {
/**
* HashMap关键参数解析
*/
public static void analyzeHashMapParams() {
System.out.println(\"=== HashMap核心参数 ===\");
System.out.println(\"默认初始容量: \" + (1 << 4) + \" (16)\"); // DEFAULT_INITIAL_CAPACITY
System.out.println(\"最大容量: \" + (1 << 30) + \" (2^30)\"); // MAXIMUM_CAPACITY
System.out.println(\"默认负载因子: 0.75\"); // DEFAULT_LOAD_FACTOR
System.out.println(\"树化阈值: 8\"); // TREEIFY_THRESHOLD
System.out.println(\"链化阈值: 6\"); // UNTREEIFY_THRESHOLD
System.out.println(\"最小树化容量: 64\"); // MIN_TREEIFY_CAPACITY
}
/**
* 演示HashMap的哈希计算和索引定位
*/
public static void demonstrateHashCalculation() {
System.out.println(\"n=== HashMap哈希计算过程 ===\");
String[] keys = {\"hello\", \"world\", \"java\", \"hashmap\", \"concurrent\"};
int capacity = 16; // 默认容量
for (String key : keys) {
int hashCode = key.hashCode();
int hash = hashCode ^ (hashCode >>> 16); // HashMap的hash方法
int index = (capacity - 1) & hash; // 索引计算
System.out.println(\"Key: \" + key +
\" | HashCode: \" + hashCode +
\" | HashMapHash: \" + hash +
\" | 桶索引: \" + index);
}
}
/**
* HashMap扩容机制演示
*/
public static void demonstrateResize() {
System.out.println(\"n=== HashMap扩容机制 ===\");
Map map = new HashMap(8); // 指定小容量便于观察扩容
// 添加元素触发扩容
for (int i = 1; i <= 12; i++) {
String key = \"key\" + i;
map.put(key, i);
// 模拟HashMap的扩容判断
if (i == 6) { // 8 * 0.75 = 6,达到扩容阈值
System.out.println(\"元素数量达到阈值,即将扩容...\");
}
System.out.println(\"添加 \" + key + \" | 大小: \" + map.size());
}
System.out.println(\"最终Map: \" + map);
}
}
// 测试HashMap行为
class HashMapBehaviorTest {
public static void main(String[] args) {
HashMapDeepDive.analyzeHashMapParams();
HashMapDeepDive.demonstrateHashCalculation();
HashMapDeepDive.demonstrateResize();
// 演示哈希冲突
demonstrateCollision();
}
public static void demonstrateCollision() {
System.out.println(\"n=== 哈希冲突演示 ===\");
Map map = new HashMap(16);
// 精心选择可能产生冲突的key
String[] collisionKeys = {\"Aa\", \"BB\", \"AaAa\", \"BBBB\", \"AaBB\", \"BBAa\"};
for (String key : collisionKeys) {
int hash = key.hashCode();
int index = (16 - 1) & (hash ^ (hash >>> 16));
map.put(key, hash);
System.out.println(\"Key: \" + key + \" | Hash: \" + hash + \" | 索引: \" + index);
}
System.out.println(\"冲突测试Map: \" + map);
}
}
3.2 HashMap vs ConcurrentHashMap深度对比
public class MapComparison {
/**
* 线程安全性对比测试
*/
public static void threadSafetyTest() throws InterruptedException {
System.out.println(\"=== HashMap vs ConcurrentHashMap 线程安全测试 ===\");
// HashMap测试(线程不安全)
Map hashMap = new HashMap();
testMapConcurrency(hashMap, \"HashMap\");
Thread.sleep(1000);
// ConcurrentHashMap测试(线程安全)
Map concurrentMap = new ConcurrentHashMap();
testMapConcurrency(concurrentMap, \"ConcurrentHashMap\");
}
private static void testMapConcurrency(Map map, String mapType) {
int threadCount = 10;
int operationsPerThread = 1000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
long startTime = System.currentTimeMillis();
// 创建多个线程同时操作Map
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executor.submit(() -> {
for (int j = 0; j < operationsPerThread; j++) {
int key = threadId * operationsPerThread + j;
map.put(key, key);
// 偶尔读取操作
if (j % 100 == 0) {
map.get(key);
}
}
});
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(mapType + \" 测试结果:\");
System.out.println(\" - 期望大小: \" + (threadCount * operationsPerThread));
System.out.println(\" - 实际大小: \" + map.size());
System.out.println(\" - 耗时: \" + (endTime - startTime) + \"ms\");
System.out.println(\" - 数据完整性: \" +
(map.size() == threadCount * operationsPerThread ? \" 完整\" : \" 丢失数据\"));
}
/**
* 性能对比测试
*/
public static void performanceComparison() {
System.out.println(\"n=== 性能对比测试 ===\");
int elementCount = 100000;
// HashMap性能测试
long hashMapStart = System.currentTimeMillis();
Map hashMap = new HashMap();
for (int i = 0; i < elementCount; i++) {
hashMap.put(i, \"value\" + i);
}
long hashMapTime = System.currentTimeMillis() - hashMapStart;
// ConcurrentHashMap性能测试
long concurrentMapStart = System.currentTimeMillis();
Map concurrentMap = new ConcurrentHashMap();
for (int i = 0; i < elementCount; i++) {
concurrentMap.put(i, \"value\" + i);
}
long concurrentMapTime = System.currentTimeMillis() - concurrentMapStart;
System.out.println(\"单线程插入 \" + elementCount + \" 个元素:\");
System.out.println(\" - HashMap: \" + hashMapTime + \"ms\");
System.out.println(\" - ConcurrentHashMap: \" + concurrentMapTime + \"ms\");
System.out.println(\" - 性能差异: \" +
((double)concurrentMapTime/hashMapTime) + \"倍\");
}
/**
* 特性对比表格
*/
public static void featureComparison() {
System.out.println(\"n=== 特性对比 ===\");
System.out.println(\"┌─────────────────┬──────────┬────────────────────┐\");
System.out.println(\"│ 特性 │ HashMap │ ConcurrentHashMap │\");
System.out.println(\"├─────────────────┼──────────┼────────────────────┤\");
System.out.println(\"│ 线程安全 │ 否 │ 是 │\");
System.out.println(\"│ 锁粒度 │ 无锁 │ 分段锁/桶锁 │\");
System.out.println(\"│ null键值 │ 允许 │ 不允许 │\");
System.out.println(\"│ 迭代器 │ fail-fast│ weakly consistent │\");
System.out.println(\"│ 读性能 │ 很高 │ 高 │\");
System.out.println(\"│ 写性能 │ 高 │ 中等 │\");
System.out.println(\"└─────────────────┴──────────┴────────────────────┘\");
}
public static void main(String[] args) throws InterruptedException {
threadSafetyTest();
performanceComparison();
featureComparison();
}
}
4. 线程池参数详解与实现原理
4.1 ThreadPoolExecutor核心参数深度解析
public class ThreadPoolDeepAnalysis {
/**
* 线程池参数详细说明
*/
public static void explainThreadPoolParams() {
System.out.println(\"=== 线程池七大核心参数 ===\");
System.out.println(\"1. corePoolSize - 核心线程数\");
System.out.println(\" - 线程池长期维持的线程数量\");
System.out.println(\" - 即使线程空闲也不会被回收\");
System.out.println(\" - 除非设置allowCoreThreadTimeOut=true\");
System.out.println(\"n2. maximumPoolSize - 最大线程数\");
System.out.println(\" - 线程池允许创建的最大线程数量\");
System.out.println(\" - 当工作队列满且核心线程都在忙时,会创建新线程\");
System.out.println(\"n3. keepAliveTime - 空闲线程存活时间\");
System.out.println(\" - 非核心线程空闲时的最长存活时间\");
System.out.println(\" - 超过这个时间,空闲的非核心线程会被回收\");
System.out.println(\"n4. TimeUnit - 时间单位\");
System.out.println(\" - keepAliveTime的时间单位\");
System.out.println(\" - 如TimeUnit.SECONDS、TimeUnit.MILLISECONDS\");
System.out.println(\"n5. workQueue - 工作队列\");
System.out.println(\" - 用于保存等待执行的任务的阻塞队列\");
System.out.println(\" - 常用实现:\");
System.out.println(\" • ArrayBlockingQueue - 有界队列\");
System.out.println(\" • LinkedBlockingQueue - 无界队列\");
System.out.println(\" • SynchronousQueue - 同步移交队列\");
System.out.println(\" • PriorityBlockingQueue - 优先级队列\");
System.out.println(\"n6. threadFactory - 线程工厂\");
System.out.println(\" - 用于创建新线程\");
System.out.println(\" - 可以定制线程名称、优先级、守护状态等\");
System.out.println(\"n7. handler - 拒绝策略\");
System.out.println(\" - 当线程池和工作队列都满时的处理策略\");
System.out.println(\" - 内置策略:\");
System.out.println(\" • AbortPolicy - 抛出RejectedExecutionException\");
System.out.println(\" • CallerRunsPolicy - 由调用者线程执行\");
System.out.println(\" • DiscardPolicy - 静默丢弃任务\");
System.out.println(\" • DiscardOldestPolicy - 丢弃队列中最老的任务\");
}
/**
* 创建定制化的线程池
*/
public static ThreadPoolExecutor createCustomThreadPool() {
int corePoolSize = 2;
int maxPoolSize = 5;
long keepAliveTime = 30;
int queueCapacity = 10;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue(queueCapacity),
new CustomThreadFactory(\"CustomPool\"),
new CustomRejectionHandler()
);
// 允许核心线程超时
executor.allowCoreThreadTimeOut(true);
return executor;
}
// 自定义线程工厂
static class CustomThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
CustomThreadFactory(String poolName) {
namePrefix = poolName + \"-thread-\";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
// 自定义拒绝策略
static class CustomRejectionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(\"任务被拒绝,执行降级策略: \" + r);
// 可以记录日志、持久化任务、或者执行其他降级逻辑
}
}
}
4.2 线程池工作流程完整演示
public class ThreadPoolWorkflowDemo {
public static void demonstrateWorkflow() throws InterruptedException {
System.out.println(\"=== 线程池工作流程演示 ===\");
// 创建小型线程池便于观察
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程2个
4, // 最大线程4个
5, // 空闲线程存活5秒
TimeUnit.SECONDS,
new ArrayBlockingQueue(3), // 容量3的工作队列
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
);
System.out.println(\"线程池状态: 核心2, 最大4, 队列容量3\");
System.out.println(\"开始提交任务...n\");
// 提交15个任务,观察线程池行为
for (int i = 1; i <= 15; i++) {
final int taskId = i;
try {
executor.execute(() -> {
try {
System.out.println(\" 任务\" + taskId + \" 执行中 [线程: \" +
Thread.currentThread().getName() +
\", 活跃线程: \" + executor.getActiveCount() +
\", 队列大小: \" + executor.getQueue().size() + \"]\");
Thread.sleep(2000); // 模拟任务执行
System.out.println(\" 任务\" + taskId + \" 完成\");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
System.out.println(\" 提交任务\" + taskId + \" → \" +
\"池大小: \" + executor.getPoolSize() +
\", 队列: \" + executor.getQueue().size());
Thread.sleep(300); // 控制提交速度
} catch (RejectedExecutionException e) {
System.out.println(\" 任务\" + taskId + \" 被拒绝策略处理\");
}
}
// 监控线程池状态
monitorThreadPool(executor);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
private static void monitorThreadPool(ThreadPoolExecutor executor) {
new Thread(() -> {
try {
while (!executor.isTerminated()) {
System.out.println(\"n 线程池监控:\" +
\" 核心线程: \" + executor.getCorePoolSize() +
\", 池大小: \" + executor.getPoolSize() +
\", 活跃线程: \" + executor.getActiveCount() +
\", 队列大小: \" + executor.getQueue().size() +
\", 完成任务: \" + executor.getCompletedTaskCount());
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
public static void main(String[] args) throws InterruptedException {
ThreadPoolDeepAnalysis.explainThreadPoolParams();
System.out.println(\"n\" + \"=\".repeat(50) + \"n\");
demonstrateWorkflow();
}
}
4.3 四种拒绝策略详细对比
public class RejectionPolicyComparison {
public static void compareAllPolicies() throws InterruptedException {
System.out.println(\"=== 四种拒绝策略对比演示 ===\");
// 1. AbortPolicy - 默认策略
testRejectionPolicy(new ThreadPoolExecutor.AbortPolicy(), \"AbortPolicy\");
// 2. CallerRunsPolicy - 调用者运行
testRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy(), \"CallerRunsPolicy\");
// 3. DiscardPolicy - 静默丢弃
testRejectionPolicy(new ThreadPoolExecutor.DiscardPolicy(), \"DiscardPolicy\");
// 4. DiscardOldestPolicy - 丢弃最老任务
testRejectionPolicy(new ThreadPoolExecutor.DiscardOldestPolicy(), \"DiscardOldestPolicy\");
}
private static void testRejectionPolicy(RejectedExecutionHandler policy, String policyName)
throws InterruptedException {
System.out.println(\"n 测试策略: \" + policyName);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue(1), policy
);
System.out.println(\"创建线程池: 核心1, 最大1, 队列容量1\");
System.out.println(\"提交4个任务(超出容量)...\");
int completed = 0;
int rejected = 0;
for (int i = 1; i <= 4; i++) {
final int taskId = i;
try {
executor.execute(() -> {
System.out.println(\" 执行任务 \" + taskId + \" [线程: \" +
Thread.currentThread().getName() + \"]\");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
completed++;
} catch (Exception e) {
rejected++;
System.out.println(\" 任务 \" + taskId + \" 被拒绝: \" + e.getClass().getSimpleName());
}
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
System.out.println(\" 结果: 完成=\" + completed + \", 拒绝=\" + rejected);
System.out.println(\"特点: \" + getPolicyDescription(policyName));
}
private static String getPolicyDescription(String policyName) {
switch (policyName) {
case \"AbortPolicy\":
return \"抛出RejectedExecutionException,确保任务不丢失\";
case \"CallerRunsPolicy\":
return \"由调用者线程执行,降低提交速度,提供反馈\";
case \"DiscardPolicy\":
return \"静默丢弃,适合不重要的任务\";
case \"DiscardOldestPolicy\":
return \"丢弃队列中最老的任务,适合实时性要求高的场景\";
default:
return \"未知策略\";
}
}
/**
* 实际应用场景建议
*/
public static void usageRecommendations() {
System.out.println(\"n=== 拒绝策略使用建议 ===\");
System.out.println(\"1. AbortPolicy (默认)\");
System.out.println(\" - 适用场景: 需要明确知道任务被拒绝的场景\");
System.out.println(\" - 优点: 任务不丢失,有明确异常\");
System.out.println(\" - 缺点: 需要调用方处理异常\");
System.out.println(\"n2. CallerRunsPolicy\");
System.out.println(\" - 适用场景: 不允许任务丢失,且可接受降级的场景\");
System.out.println(\" - 优点: 实现简单降级,不会丢失任务\");
System.out.println(\" - 缺点: 可能阻塞调用方\");
System.out.println(\"n3. DiscardPolicy\");
System.out.println(\" - 适用场景: 不重要的统计、日志等任务\");
System.out.println(\" - 优点: 不影响主流程\");
System.out.println(\" - 缺点: 任务静默丢失\");
System.out.println(\"n4. DiscardOldestPolicy\");
System.out.println(\" - 适用场景: 实时性要求高,新任务比老任务重要\");
System.out.println(\" - 优点: 保证新任务有机会执行\");
System.out.println(\" - 缺点: 可能丢失重要任务\");
}
public static void main(String[] args) throws InterruptedException {
compareAllPolicies();
usageRecommendations();
}
}
总结
通过本文的详细解析,我们深入探讨了Java并发编程和集合框架的核心知识点:
关键要点回顾:
-
AQS/CAS/Lock锁机制
- CAS是实现无锁编程的基础,但要警惕ABA问题
- AQS是并发框架的基石,理解其状态管理和队列机制
- Lock提供了比synchronized更灵活的锁操作
-
线程通信与生产者-消费者
- wait/notify是基础,但Condition更灵活安全
- 生产者-消费者模型是解决线程协作的经典模式
- 合理设置队列容量和线程数量是关键
-
HashMap与ConcurrentHashMap
- HashMap在单线程下性能优秀,但线程不安全
- ConcurrentHashMap通过分段锁保证线程安全
- 根据场景选择合适的Map实现
-
线程池最佳实践
- 合理配置核心参数是性能优化的关键
- 根据任务特性选择合适的拒绝策略
- 监控线程池状态,及时调整配置
实战建议:
- 并发控制:优先考虑使用java.util.concurrent包的工具类
- 集合选择:并发环境下优先选择ConcurrentHashMap等线程安全集合
- 线程池:避免使用Executors的快捷方法,根据实际情况定制线程池
- 资源管理:确保正确释放锁和关闭线程池
掌握这些核心知识点,能够帮助开发者编写出更高效、更稳定的并发程序,应对各种复杂的多线程场景。



还没有评论呢,快来抢沙发~