课程概述
Day 2 深入探索 Java 内存模型 (JMM),理解多线程环境下数据不一致的根源,掌握 volatile 关键字的原理与应用,并通过实战代码理解可见性、原子性和有序性问题。
学习目标
- 深入理解 Java 内存模型的抽象结构
- 掌握 JMM 三大特性:原子性、可见性、有序性
- 熟练掌握 volatile 关键字的原理和使用场景
- 理解 happens-before 原则和内存屏障机制
- 掌握指令重排序问题和解决方案
- 完成可见性问题的实战练习
理论基础
1. Java 内存模型 (JMM) 概述
1.1 JMM 的抽象概念
Java 内存模型 (Java Memory Model, JMM) 是一个抽象的概念,它描述了 Java 程序中各种变量(线程共享变量)的访问规则,以及在 JVM 中将变量存储到内存和从内存中读取变量这样的底层细节。
1.2 JMM 的三大特性
| 特性 | 定义 | 问题场景 | 解决方案 |
|---|---|---|---|
| 原子性 | 一个或多个操作,要么全部执行且执行的过程不会被任何因素打断,要么就都不执行 | i++ 操作非原子性 |
synchronized、Atomic 类 |
| 可见性 | 当一个线程修改了共享变量的值,其他线程能够立即得知这个修改 | 线程缓存导致变量不可见 | volatile、synchronized |
| 有序性 | 程序执行的顺序按照代码的先后顺序执行 | 指令重排序导致执行顺序改变 | volatile、synchronized |
1.3 内存间的交互操作
JMM 定义了 8 种原子操作来完成主内存和工作内存的交互:
| 操作 | 作用 | 解释 |
|---|---|---|
| lock (锁定) | 作用于主内存 | 把一个变量标识为一条线程独占状态 |
| unlock (解锁) | 作用于主内存 | 把一个处于锁定状态的变量释放出来 |
| read (读取) | 作用于主内存 | 把一个变量的值从主内存传输到线程的工作内存 |
| load (载入) | 作用于工作内存 | 把 read 操作从主内存中得到的变量值放入工作内存的变量副本中 |
| use (使用) | 作用于工作内存 | 把工作内存中一个变量的值传递给执行引擎 |
| assign (赋值) | 作用于工作内存 | 把一个从执行引擎接收到的值赋给工作内存的变量 |
| store (存储) | 作用于工作内存 | 把工作内存中一个变量的值传送到主内存中 |
| write (写入) | 作用于主内存 | 把 store 操作从工作内存中得到的变量的值放入主内存的变量中 |
2. happens-before 原则
2.1 什么是 happens-before
happens-before 是 JMM 中最重要的原则之一,它定义了两个操作之间的偏序关系。如果操作 A happens-before 操作 B,那么 A 操作的结果对 B 操作是可见的。
2.2 八大 happens-before 规则
- 程序次序规则:在一个线程内,书写在前面的代码 happens-before 书写在后面的代码
- 管程锁定规则:一个 unlock 操作 happens-before 后面对同一个锁的 lock 操作
- volatile 变量规则:对一个 volatile 变量的写操作 happens-before 后面对这个变量的读操作
- 线程启动规则:Thread 对象的 start() 方法 happens-before 于此线程的每一个动作
- 线程终止规则:线程中的所有操作都 happens-before 对此线程的终止检测
- 线程中断规则:对线程 interrupt() 方法的调用 happens-before 发生于被中断线程的代码检测到中断事件
- 对象终结规则:一个对象的初始化完成 happens-before 于它的 finalize() 方法的开始
- 传递性:如果 A happens-before B,且 B happens-before C,那么 A happens-before C
public class HappensBeforeDemo {
private int value = 0;
private volatile boolean flag = false;
private final Object lock = new Object();
// 示例1:程序次序规则
public void programOrderRule() {
int a = 1; // 1
int b = 2; // 2 - 1 happens-before 2
int c = a + b; // 3 - 2 happens-before 3
System.out.println(c); // 4 - 3 happens-before 4
}
// 示例2:管程锁定规则
public void monitorLockRule() {
synchronized (lock) {
value = 42; // 写操作
} // unlock happens-before 后续的 lock
synchronized (lock) {
System.out.println(value); // 能看到 value = 42
}
}
// 示例3:volatile变量规则
public void volatileRule() {
flag = true; // volatile写 - happens-before 后续的volatile读
if (flag) { // volatile读 - 能看到上面的写入
System.out.println(\"Flag is true\");
}
}
// 示例4:线程启动规则
public void threadStartRule() {
value = 100; // 1
Thread thread = new Thread(() -> {
System.out.println(value); // 2 - 能看到 value = 100
});
thread.start(); // start() happens-before 线程中的动作
}
// 示例5:传递性规则
public void transitivityRule() {
// A happens-before B
synchronized (lock) {
value = 200; // A
}
// B happens-before C
Thread thread = new Thread(() -> {
synchronized (lock) {
System.out.println(value); // C - 能看到 value = 200
}
});
thread.start(); // 启动线程
}
}
可见性问题实战
1. 可见性问题演示
1.1 经典的可见性问题
/**
* 可见性问题演示:子线程可能永远看不到主线程对flag的修改
*/
public class VisibilityProblem {
// 如果不加volatile,子线程可能永远看不到flag的变化
private static boolean flag = false;
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== 可见性问题演示 ===\");
// 创建子线程
Thread workerThread = new Thread(() -> {
System.out.println(\"子线程启动,开始监控flag...\");
int localCount = 0;
while (!flag) { // 如果没有volatile,可能无限循环
localCount++;
// 偶尔打印,避免控制台刷屏
if (localCount % 100_000_000 == 0) {
System.out.println(\"子线程仍在循环,计数: \" + localCount);
}
}
System.out.println(\"子线程检测到flag = true,退出循环\");
System.out.println(\"子线程最终计数: \" + localCount);
}, \"Worker-Thread\");
workerThread.start();
// 主线程休眠1秒后修改flag
Thread.sleep(1000);
System.out.println(\"主线程将设置 flag = true\");
flag = true;
System.out.println(\"主线程已设置 flag = true\");
// 再等待2秒,看子线程是否能正常退出
Thread.sleep(2000);
if (workerThread.isAlive()) {
System.out.println(\"警告:子线程仍在运行,可能存在可见性问题!\");
// 强制中断(仅用于演示)
workerThread.interrupt();
} else {
System.out.println(\"子线程已正常退出\");
}
}
}
1.2 深度分析:为什么会出现可见性问题?
/**
* 深度分析可见性问题的根源
*/
public class VisibilityAnalysis {
// 模拟缓存行和内存一致性问题
private static class SharedData {
// 不使用volatile,模拟缓存一致性协议失效
public int counter = 0;
public boolean ready = false;
// 使用volatile,确保可见性
public volatile boolean volatileReady = false;
}
private static final SharedData data = new SharedData();
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== 可见性问题深度分析 ===\");
// 测试1:无volatile的情况
System.out.println(\"n--- 测试1:无volatile修饰的变量 ---\");
testWithoutVolatile();
Thread.sleep(1000);
// 测试2:有volatile的情况
System.out.println(\"n--- 测试2:使用volatile修饰的变量 ---\");
testWithVolatile();
}
private static void testWithoutVolatile() throws InterruptedException {
data.ready = false;
data.counter = 0;
Thread writerThread = new Thread(() -> {
try {
Thread.sleep(500); // 确保读取线程先开始
data.counter = 100;
data.ready = true;
System.out.println(\"写入线程:ready = true, counter = 100\");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, \"Writer-Thread\");
Thread readerThread = new Thread(() -> {
int localCount = 0;
while (!data.ready) {
localCount++;
// 空循环,依赖CPU缓存中的ready值
}
System.out.println(\"读取线程:检测到ready = true\");
System.out.println(\"读取线程:counter = \" + data.counter +
\" (可能看不到更新后的值)\");
}, \"Reader-Thread\");
readerThread.start();
writerThread.start();
// 设置超时,避免无限等待
readerThread.join(3000);
if (readerThread.isAlive()) {
System.out.println(\"读取线程超时,可能存在可见性问题\");
readerThread.interrupt();
}
}
private static void testWithVolatile() throws InterruptedException {
data.volatileReady = false;
data.counter = 0;
Thread writerThread = new Thread(() -> {
try {
Thread.sleep(500);
data.counter = 200;
data.volatileReady = true;
System.out.println(\"写入线程:volatileReady = true, counter = 200\");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, \"Volatile-Writer\");
Thread readerThread = new Thread(() -> {
int localCount = 0;
while (!data.volatileReady) {
localCount++;
}
System.out.println(\"读取线程:检测到volatileReady = true\");
System.out.println(\"读取线程:counter = \" + data.counter +
\" (应该能看到更新后的值)\");
}, \"Volatile-Reader\");
readerThread.start();
writerThread.start();
readerThread.join(3000);
if (readerThread.isAlive()) {
System.out.println(\"读取线程超时,这不应该发生\");
readerThread.interrupt();
} else {
System.out.println(\"读取线程正常退出,volatile保证了可见性\");
}
}
}
2. volatile 关键字深度解析
2.1 volatile 的基本使用
/**
* volatile关键字的基本使用和特性演示
*/
public class VolatileBasics {
// volatile变量:保证可见性和有序性,但不保证原子性
private volatile boolean flag = false;
private volatile int counter = 0;
// 非volatile变量:用于对比
private boolean normalFlag = false;
private int normalCounter = 0;
public static void main(String[] args) throws InterruptedException {
VolatileBasics demo = new VolatileBasics();
System.out.println(\"=== volatile关键字基础演示 ===\");
// 测试1:volatile保证可见性
demo.testVisibility();
Thread.sleep(1000);
// 测试2:volatile不保证原子性
demo.testAtomicity();
Thread.sleep(1000);
// 测试3:volatile防止指令重排序
demo.testOrdering();
}
// 测试可见性
private void testVisibility() throws InterruptedException {
System.out.println(\"n--- 测试1:volatile可见性保证 ---\");
Thread writer = new Thread(() -> {
try {
Thread.sleep(500);
flag = true;
normalFlag = true;
System.out.println(\"写入线程:设置 flag = true, normalFlag = true\");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread volatileReader = new Thread(() -> {
int count = 0;
while (!flag) {
count++;
}
System.out.println(\"volatile读取线程:检测到flag = true,循环次数: \" + count);
});
Thread normalReader = new Thread(() -> {
int count = 0;
while (!normalFlag) {
count++;
}
System.out.println(\"普通读取线程:检测到normalFlag = true,循环次数: \" + count);
});
volatileReader.start();
normalReader.start();
writer.start();
volatileReader.join(2000);
normalReader.join(2000);
if (volatileReader.isAlive()) {
volatileReader.interrupt();
System.out.println(\"volatile读取线程超时(不应该发生)\");
}
if (normalReader.isAlive()) {
normalReader.interrupt();
System.out.println(\"普通读取线程超时(可能发生)\");
}
}
// 测试原子性(演示volatile不能保证原子性)
private void testAtomicity() throws InterruptedException {
System.out.println(\"n--- 测试2:volatile不能保证原子性 ---\");
final int THREAD_COUNT = 10;
final int INCREMENTS_PER_THREAD = 1000;
Thread[] threads = new Thread[THREAD_COUNT];
// 重置计数器
counter = 0;
normalCounter = 0;
// 创建多个线程增加volatile计数器
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < INCREMENTS_PER_THREAD; j++) {
counter++; // 非原子操作:读取-修改-写入
normalCounter++;
}
});
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
int expectedCount = THREAD_COUNT * INCREMENTS_PER_THREAD;
System.out.println(\"预期计数: \" + expectedCount);
System.out.println(\"volatile计数器实际值: \" + counter);
System.out.println(\"普通计数器实际值: \" + normalCounter);
System.out.println(\"volatile计数器是否正确: \" + (counter == expectedCount));
System.out.println(\"普通计数器是否正确: \" + (normalCounter == expectedCount));
}
// 测试有序性(防止指令重排序)
private void testOrdering() {
System.out.println(\"n--- 测试3:volatile防止指令重排序 ---\");
// 这个例子演示了volatile如何防止指令重排序
// 在单例模式的懒汉式中,volatile是必需的
Singleton instance1 = Singleton.getInstance();
Singleton instance2 = Singleton.getInstance();
System.out.println(\"单例模式测试完成\");
System.out.println(\"实例1是否等于实例2: \" + (instance1 == instance2));
}
// 演示volatile在单例模式中的应用
private static class Singleton {
private static volatile Singleton instance;
private Singleton() {
// 防止反射创建实例
if (instance != null) {
throw new IllegalStateException(\"Singleton already initialized\");
}
}
public static Singleton getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton();
}
}
}
return instance;
}
}
}
2.2 volatile 的底层实现原理
/**
* volatile底层实现原理演示和分析
*/
public class VolatileImplementation {
// volatile变量
private volatile int volatileVar = 0;
// 普通变量
private int normalVar = 0;
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== volatile底层实现原理分析 ===\");
VolatileImplementation demo = new VolatileImplementation();
// 演示内存屏障的效果
demo.testMemoryBarriers();
Thread.sleep(500);
// 演示缓存一致性协议
demo.testCacheCoherence();
}
// 测试内存屏障的效果
private void testMemoryBarriers() throws InterruptedException {
System.out.println(\"n--- 内存屏障效果演示 ---\");
// 设置变量
volatileVar = 1;
normalVar = 1;
Thread reader1 = new Thread(() -> {
int value1 = volatileVar; // LoadLoad屏障
int value2 = normalVar; // 可能被重排序到volatileVar读取之前
System.out.println(\"线程1读取:volatileVar=\" + value1 + \", normalVar=\" + value2);
});
Thread reader2 = new Thread(() -> {
int value3 = normalVar; // 普通读取
int value4 = volatileVar; // LoadLoad屏障确保normalVar读取在volatileVar之前
System.out.println(\"线程2读取:normalVar=\" + value3 + \", volatileVar=\" + value4);
});
reader1.start();
reader2.start();
reader1.join();
reader2.join();
System.out.println(\"内存屏障确保了volatile读操作的有序性\");
}
// 测试缓存一致性协议
private void testCacheCoherence() throws InterruptedException {
System.out.println(\"n--- 缓存一致性协议演示 ---\");
final int ITERATIONS = 100;
Thread writer = new Thread(() -> {
for (int i = 1; i <= ITERATIONS; i++) {
volatileVar = i;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (i % 20 == 0) {
System.out.println(\"写入线程:volatileVar = \" + i);
}
}
});
Thread reader = new Thread(() -> {
int lastValue = 0;
int readCount = 0;
while (readCount < ITERATIONS) {
int currentValue = volatileVar;
if (currentValue != lastValue) {
readCount++;
lastValue = currentValue;
System.out.println(\"读取线程:检测到新值 \" + currentValue +
\" (第\" + readCount + \"次读取)\");
}
// 模拟其他工作
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
writer.start();
reader.start();
writer.join();
reader.join();
System.out.println(\"缓存一致性协议确保了所有线程看到最新的volatile值\");
}
}
/**
* volatile字节码分析助手
*/
public class VolatileBytecodeAnalysis {
// 编译命令:javap -c VolatileBytecodeAnalysis
public static void main(String[] args) {
System.out.println(\"请使用 javap -c VolatileBytecodeAnalysis 查看字节码\");
System.out.println(\"对比volatile变量和普通变量的字节码差异\");
}
private volatile int volatileField;
private int normalField;
public void writeVolatile() {
volatileField = 1; // 字节码中会有putfield指令,但JVM会插入内存屏障
}
public void writeNormal() {
normalField = 1; // 普通的putfield指令
}
public int readVolatile() {
return volatileField; // 字节码中会有getfield指令,但JVM会插入内存屏障
}
public int readNormal() {
return normalField; // 普通的getfield指令
}
}
3. 指令重排序与内存屏障
3.1 指令重排序问题演示
/**
* 指令重排序问题演示
*/
public class ReorderingProblem {
private static int x = 0, y = 0;
private static int a = 0, b = 0;
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== 指令重排序问题演示 ===\");
int reorderCount = 0;
int totalTests = 1000000;
for (int i = 0; i < totalTests; i++) {
// 重置变量
x = y = a = b = 0;
// 创建两个线程
Thread t1 = new Thread(() -> {
a = 1; // 操作1
x = b; // 操作2 - 可能被重排序到操作1之前
});
Thread t2 = new Thread(() -> {
b = 1; // 操作3
y = a; // 操作4 - 可能被重排序到操作3之前
});
t1.start();
t2.start();
t1.join();
t2.join();
// 检查是否发生了指令重排序
if (x == 0 && y == 0) {
reorderCount++;
System.out.println(\"第\" + (i + 1) + \"次测试检测到指令重排序!x=\" + x + \", y=\" + y);
// 只显示前几次
if (reorderCount >= 5) {
break;
}
}
}
System.out.println(\"总测试次数: \" + Math.min(totalTests, reorderCount == 0 ? totalTests : 5000));
System.out.println(\"检测到重排序次数: \" + reorderCount);
if (reorderCount > 0) {
System.out.println(\"指令重排序确实发生了!这说明需要内存屏障来保证有序性。\");
} else {
System.out.println(\"在当前测试中未检测到明显的指令重排序(可能需要更多测试)\");
}
}
}
3.2 内存屏障的详细分析
/**
* 内存屏障详细分析和演示
*/
public class MemoryBarrierAnalysis {
// 使用volatile来演示内存屏障的插入
private volatile boolean flag = false;
private int data = 0;
private int ready = 0;
public static void main(String[] args) throws InterruptedException {
MemoryBarrierAnalysis demo = new MemoryBarrierAnalysis();
System.out.println(\"=== 内存屏障详细分析 ===\");
// 演示不同类型的内存屏障
demo.demoLoadLoadBarriers();
Thread.sleep(500);
demo.demoStoreStoreBarriers();
Thread.sleep(500);
demo.demoLoadStoreBarriers();
Thread.sleep(500);
demo.demoStoreLoadBarriers();
}
// LoadLoad屏障演示
private void demoLoadLoadBarriers() throws InterruptedException {
System.out.println(\"n--- LoadLoad屏障演示 ---\");
System.out.println(\"LoadLoad屏障:确保Load1的读取操作先于Load2及后续读取操作\");
data = 42;
flag = true; // volatile写
Thread reader = new Thread(() -> {
// LoadLoad屏障在这里插入(由volatile读触发)
boolean flagValue = flag; // Load1
int dataValue = data; // Load2 - 确保在Load1之后
System.out.println(\"LoadLoad屏障效果:flag=\" + flagValue + \", data=\" + dataValue);
System.out.println(\"确保了data的读取在flag读取之后进行\");
});
reader.start();
reader.join();
}
// StoreStore屏障演示
private void demoStoreStoreBarriers() throws InterruptedException {
System.out.println(\"n--- StoreStore屏障演示 ---\");
System.out.println(\"StoreStore屏障:确保Store1立刻对其他处理器可见,先于Store2及后续存储\");
Thread writer = new Thread(() -> {
data = 100; // Store1
// StoreStore屏障在这里插入(由volatile写触发)
ready = 1; // Store2 - volatile写
System.out.println(\"StoreStore屏障确保data=100的写入对ready=1的写入之前可见\");
});
Thread observer = new Thread(() -> {
while (ready != 1) {
Thread.yield();
}
int dataValue = data;
System.out.println(\"观察者线程看到ready=1,data=\" + dataValue +
\" (应该能看到data=100)\");
});
observer.start();
writer.start();
writer.join();
observer.join();
}
// LoadStore屏障演示
private void demoLoadStoreBarriers() throws InterruptedException {
System.out.println(\"n--- LoadStore屏障演示 ---\");
System.out.println(\"LoadStore屏障:确保Load1数据装载先于Store2及后续存储刷新到内存\");
flag = false; // 重置
data = 0;
Thread processor = new Thread(() -> {
boolean flagValue = flag; // Load1
// LoadStore屏障在这里插入
data = 200; // Store1 - 确保在Load之后
System.out.println(\"LoadStore屏障:先读取flag,后写入data\");
});
Thread flagSetter = new Thread(() -> {
try {
Thread.sleep(100);
flag = true; // 设置flag
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
flagSetter.start();
processor.start();
processor.join();
flagSetter.join();
}
// StoreLoad屏障演示
private void demoStoreLoadBarriers() throws InterruptedException {
System.out.println(\"n--- StoreLoad屏障演示 ---\");
System.out.println(\"StoreLoad屏障:确保Store1立刻对其他处理器可见,先于Load2及后续装载\");
final boolean[] stopFlag = {false};
Thread writer = new Thread(() -> {
int counter = 0;
while (!stopFlag[0]) {
counter++;
data = counter; // Store1
// StoreLoad屏障在这里插入(由volatile读写触发)
boolean currentFlag = flag; // Load1
if (counter % 100000 == 0) {
System.out.println(\"写入线程:data=\" + counter + \", flag=\" + currentFlag);
}
}
});
Thread reader = new Thread(() -> {
try {
Thread.sleep(100);
flag = true; // volatile写
Thread.sleep(100);
stopFlag[0] = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
writer.start();
reader.start();
writer.join();
reader.join();
System.out.println(\"StoreLoad屏障确保存储操作对后续加载操作可见\");
}
}
实战练习
1. 基础练习 ⭐
任务: 使用 volatile 解决简单的可见性问题
/**
* 基础练习:使用volatile解决可见性问题
*/
public class BasicExercise {
// 任务1:使用volatile修复可见性问题
private volatile boolean shutdownRequested = false;
// 任务2:使用volatile实现简单的状态机
private volatile String currentState = \"INIT\";
public static void main(String[] args) throws InterruptedException {
BasicExercise exercise = new BasicExercise();
System.out.println(\"=== 基础练习:volatile可见性问题 ===\");
// 练习1:优雅停止线程
exercise.exercise1_GracefulShutdown();
Thread.sleep(1000);
// 练习2:状态通知
exercise.exercise2_StateNotification();
}
// 练习1:优雅停止线程
private void exercise1_GracefulShutdown() throws InterruptedException {
System.out.println(\"n--- 练习1:优雅停止工作线程 ---\");
Thread worker = new Thread(() -> {
System.out.println(\"工作线程开始运行...\");
int taskCount = 0;
while (!shutdownRequested) {
taskCount++;
// 模拟工作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
// 定期报告状态
if (taskCount % 10 == 0) {
System.out.println(\"工作线程:已完成 \" + taskCount + \" 个任务\");
}
}
System.out.println(\"工作线程:收到停止信号,正在清理资源...\");
try {
Thread.sleep(200); // 模拟清理工作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(\"工作线程:已停止,总共处理了 \" + taskCount + \" 个任务\");
}, \"Worker-Thread\");
worker.start();
// 主线程运行一段时间后请求停止
Thread.sleep(2000);
System.out.println(\"主线程:请求停止工作线程\");
shutdownRequested = true;
worker.join(1000);
if (worker.isAlive()) {
System.out.println(\"主线程:工作线程未及时响应,强制中断\");
worker.interrupt();
} else {
System.out.println(\"主线程:工作线程已优雅停止\");
}
}
// 练习2:状态通知
private void exercise2_StateNotification() throws InterruptedException {
System.out.println(\"n--- 练习2:基于volatile的状态通知 ---\");
String[] states = {\"INIT\", \"READY\", \"PROCESSING\", \"COMPLETED\"};
Thread stateMonitor = new Thread(() -> {
String lastState = \"\";
while (!\"COMPLETED\".equals(currentState)) {
if (!currentState.equals(lastState)) {
lastState = currentState;
System.out.println(\"状态监控器:检测到状态变化 -> \" + lastState);
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println(\"状态监控器:处理已完成\");
}, \"State-Monitor\");
Thread stateChanger = new Thread(() -> {
try {
for (String state : states) {
currentState = state;
System.out.println(\"状态设置器:设置状态为 \" + state);
Thread.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, \"State-Changer\");
stateMonitor.start();
stateChanger.start();
stateChanger.join();
stateMonitor.join();
System.out.println(\"练习2完成:状态通知机制工作正常\");
}
}
2. 进阶练习 ⭐⭐
任务: 实现一个基于 volatile 的自旋锁
/**
* 进阶练习:基于volatile的自旋锁实现
*/
public class AdvancedExercise {
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== 进阶练习:volatile自旋锁 ===\");
// 测试自旋锁
testSpinLock();
Thread.sleep(1000);
// 测试可重入自旋锁
testReentrantSpinLock();
}
// 测试基础自旋锁
private static void testSpinLock() throws InterruptedException {
System.out.println(\"n--- 测试基础自旋锁 ---\");
SpinLock spinLock = new SpinLock();
AtomicInteger counter = new AtomicInteger(0);
int threadCount = 5;
int incrementsPerThread = 1000;
CountDownLatch latch = new CountDownLatch(threadCount);
// 创建多个线程竞争锁
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
for (int j = 0; j < incrementsPerThread; j++) {
spinLock.lock();
try {
// 临界区:简单计数
counter.incrementAndGet();
// 模拟一些工作
Thread.sleep(1);
} finally {
spinLock.unlock();
}
}
System.out.println(\"线程 \" + threadId + \" 完成\");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}, \"SpinLock-Thread-\" + i).start();
}
latch.await();
System.out.println(\"所有线程完成,最终计数: \" + counter.get());
System.out.println(\"预期计数: \" + (threadCount * incrementsPerThread));
System.out.println(\"计数正确: \" + (counter.get() == threadCount * incrementsPerThread));
}
// 测试可重入自旋锁
private static void testReentrantSpinLock() throws InterruptedException {
System.out.println(\"n--- 测试可重入自旋锁 ---\");
ReentrantSpinLock reentrantLock = new ReentrantSpinLock();
AtomicInteger recursiveCounter = new AtomicInteger(0);
Thread recursiveThread = new Thread(() -> {
reentrantLock.lock();
try {
System.out.println(\"外层锁获取成功\");
reentrantLock.lock();
try {
System.out.println(\"内层锁获取成功\");
reentrantLock.lock();
try {
System.out.println(\"最内层锁获取成功\");
recursiveCounter.set(100);
} finally {
reentrantLock.unlock();
System.out.println(\"最内层锁释放\");
}
} finally {
reentrantLock.unlock();
System.out.println(\"内层锁释放\");
}
} finally {
reentrantLock.unlock();
System.out.println(\"外层锁释放\");
}
}, \"Recursive-Thread\");
recursiveThread.start();
recursiveThread.join();
System.out.println(\"可重入测试完成,递归计数器值: \" + recursiveCounter.get());
}
}
/**
* 基于volatile的简单自旋锁实现
*/
class SpinLock {
private volatile boolean locked = false;
public void lock() {
// 自旋直到获取锁
while (locked) {
// 自旋等待
Thread.onSpinWait(); // Java 9+ 提示处理器进入自旋等待状态
}
// 设置锁状态
locked = true;
}
public void unlock() {
locked = false;
}
}
/**
* 基于volatile的可重入自旋锁实现
*/
class ReentrantSpinLock {
private volatile Thread owner = null;
private volatile int recursionCount = 0;
public void lock() {
Thread currentThread = Thread.currentThread();
// 如果已经持有锁,增加重入计数
if (owner == currentThread) {
recursionCount++;
return;
}
// 自旋直到获取锁
while (owner != null) {
Thread.onSpinWait();
}
// 获取锁
owner = currentThread;
recursionCount = 1;
}
public void unlock() {
Thread currentThread = Thread.currentThread();
if (owner != currentThread) {
throw new IllegalStateException(\"当前线程未持有锁\");
}
recursionCount--;
if (recursionCount == 0) {
owner = null;
}
}
}
3. 挑战练习 ⭐⭐⭐
任务: 实现一个高性能的缓存系统,使用 volatile 保证数据一致性
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* 挑战练习:高性能缓存系统实现
*/
public class ChallengeExercise {
public static void main(String[] args) throws InterruptedException {
System.out.println(\"=== 挑战练习:高性能缓存系统 ===\");
// 测试缓存系统
testCacheSystem();
Thread.sleep(1000);
// 性能测试
performanceTest();
}
// 基础功能测试
private static void testCacheSystem() throws InterruptedException {
System.out.println(\"n--- 缓存系统功能测试 ---\");
CacheSystem cache = new CacheSystem();
// 测试基本的put/get操作
cache.put(\"key1\", \"value1\");
cache.put(\"key2\", \"value2\");
System.out.println(\"获取key1: \" + cache.get(\"key1\"));
System.out.println(\"获取key2: \" + cache.get(\"key2\"));
System.out.println(\"获取不存在的key: \" + cache.get(\"nonexistent\"));
// 测试并发访问
int threadCount = 10;
int operationsPerThread = 100;
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger missCount = new AtomicInteger(0);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
String key = \"key\" + (threadId * 10 + j % 10);
String value = \"value\" + j;
// 尝试获取
String cachedValue = cache.get(key);
if (cachedValue == null) {
// 缓存未命中,放入缓存
cache.put(key, value);
missCount.incrementAndGet();
} else {
// 缓存命中
successCount.incrementAndGet();
}
// 模拟一些处理时间
if (j % 20 == 0) {
Thread.sleep(1);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}, \"Cache-Thread-\" + i).start();
}
latch.await();
System.out.println(\"并发测试完成:\");
System.out.println(\"缓存命中次数: \" + successCount.get());
System.out.println(\"缓存未命中次数: \" + missCount.get());
System.out.println(\"缓存命中率: \" +
String.format(\"%.2f%%\",
(double) successCount.get() / (successCount.get() + missCount.get()) * 100));
// 测试缓存统计
CacheStats stats = cache.getStats();
System.out.println(\"缓存统计: \" + stats);
}
// 性能测试
private static void performanceTest() throws InterruptedException {
System.out.println(\"n--- 性能测试 ---\");
CacheSystem cache = new CacheSystem();
int threadCount = 20;
int operationsPerThread = 10000;
System.out.println(\"启动 \" + threadCount + \" 个线程,每个执行 \" + operationsPerThread + \" 次操作\");
long startTime = System.nanoTime();
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
int key = threadId * 1000 + (j % 1000);
String value = \"value-\" + key;
// 随机操作:70%读取,30%写入
if (Math.random() < 0.7) {
cache.get(key);
} else {
cache.put(key, value);
}
}
} finally {
latch.countDown();
}
}, \"Perf-Thread-\" + i).start();
}
latch.await();
long endTime = System.nanoTime();
long totalOperations = threadCount * operationsPerThread;
double duration = (endTime - startTime) / 1_000_000.0; // 毫秒
double opsPerSecond = totalOperations / (duration / 1000.0);
System.out.println(\"性能测试结果:\");
System.out.println(\"总操作数: \" + totalOperations);
System.out.println(\"总耗时: \" + String.format(\"%.2f\", duration) + \" ms\");
System.out.println(\"吞吐量: \" + String.format(\"%.0f\", opsPerSecond) + \" ops/sec\");
CacheStats finalStats = cache.getStats();
System.out.println(\"最终缓存统计: \" + finalStats);
}
}
/**
* 高性能缓存系统实现
*/
class CacheSystem {
// 使用volatile保证缓存状态的可见性
private volatile CacheEntry[] table;
private volatile int size;
private volatile int modCount;
// 统计信息
private final AtomicInteger hitCount = new AtomicInteger(0);
private final AtomicInteger missCount = new AtomicInteger(0);
private final AtomicInteger putCount = new AtomicInteger(0);
// 缓存配置
private static final int DEFAULT_CAPACITY = 16;
private static final float LOAD_FACTOR = 0.75f;
private final int capacity;
@SuppressWarnings(\"unchecked\")
public CacheSystem() {
this.capacity = DEFAULT_CAPACITY;
this.table = new CacheEntry[capacity];
this.size = 0;
this.modCount = 0;
}
public void put(K key, V value) {
if (key == null) {
throw new IllegalArgumentException(\"Key cannot be null\");
}
int index = getIndex(key);
CacheEntry entry = new CacheEntry(key, value);
// 简单的线性探测处理冲突
while (table[index] != null && !table[index].key.equals(key)) {
index = (index + 1) % capacity;
}
if (table[index] == null) {
size++;
}
table[index] = entry;
putCount.incrementAndGet();
modCount++;
}
public V get(K key) {
if (key == null) {
return null;
}
int index = getIndex(key);
int startIndex = index;
// 线性探测查找
while (table[index] != null) {
if (table[index].key.equals(key)) {
hitCount.incrementAndGet();
return table[index].value;
}
index = (index + 1) % capacity;
if (index == startIndex) {
break; // 遍历完整个表
}
}
missCount.incrementAndGet();
return null;
}
private int getIndex(K key) {
return Math.abs(key.hashCode()) % capacity;
}
public CacheStats getStats() {
int totalRequests = hitCount.get() + missCount.get();
double hitRate = totalRequests == 0 ? 0.0 : (double) hitCount.get() / totalRequests;
return new CacheStats(
size,
capacity,
hitCount.get(),
missCount.get(),
putCount.get(),
hitRate
);
}
// 缓存条目
private static class CacheEntry {
final K key;
volatile V value;
CacheEntry(K key, V value) {
this.key = key;
this.value = value;
}
}
// 缓存统计信息
public static class CacheStats {
private final int size;
private final int capacity;
private final int hitCount;
private final int missCount;
private final int putCount;
private final double hitRate;
public CacheStats(int size, int capacity, int hitCount, int missCount,
int putCount, double hitRate) {
this.size = size;
this.capacity = capacity;
this.hitCount = hitCount;
this.missCount = missCount;
this.putCount = putCount;
this.hitRate = hitRate;
}
@Override
public String toString() {
return String.format(
\"CacheStats{size=%d, capacity=%d, hitCount=%d, missCount=%d, putCount=%d, hitRate=%.2f%%}\",
size, capacity, hitCount, missCount, putCount, hitRate * 100
);
}
}
}
今日总结
核心要点回顾
-
Java 内存模型 (JMM) :
- 抽象了主内存和工作内存的概念
- 定义了 8 种原子操作来管理内存交互
- 为并发编程提供了规范保证
-
JMM 三大特性:
- 原子性:操作不可分割(需要 synchronized 或 atomic 类保证)
- 可见性:变量修改对其他线程立即可见(volatile 或 synchronized 保证)
- 有序性:程序执行顺序与代码顺序一致(volatile 或 synchronized 保证)
-
volatile 关键字:
- 保证可见性和有序性,但不保证原子性
- 通过插入内存屏障来实现有序性保证
- 基于 MESI 等缓存一致性协议实现可见性
-
happens-before 原则:
- 定义了操作间的偏序关系
- 是判断数据是否存在竞争、线程是否安全的重要依据
- 8 大规则为并发编程提供了理论基础
-
指令重排序与内存屏障:
- 编译器和处理器可能会进行指令重排序优化
- 内存屏障可以禁止特定类型重排序
- volatile 读/写会插入相应的内存屏障
最佳实践建议
-
volatile 使用场景:
- 状态标志位(如停止标志)
- 双重检查锁定(DCL)中的实例变量
- 独立观察者的发布场景
- 读写锁的状态变量
-
避免的陷阱:
- 不要依赖 volatile 保证复合操作的原子性
- 不要在 volatile 变量上进行递增或递减操作
- 理解 volatile 的性能开销(相对 synchronized 较小)
-
并发编程原则:
- 优先使用不可变对象
- 使用适当的同步机制(synchronized、volatile、atomic 类)
- 理解 happens-before 关系,避免数据竞争
下节预告
明天我们将深入学习 线程生命周期与状态转换,包括:
- 线程的六大状态详解
- 线程控制方法:sleep、join、yield、interrupt
- 守护线程的特性与应用
- 线程状态监控和调试技巧
课后作业
-
理论作业:
- 详细解释 JMM 的内存交互过程
- 分析 volatile 与 synchronized 的异同点
- 列举 happens-before 的 8 大规则并举例说明
-
实践作业:
- 实现一个基于 volatile 的任务调度器
- 优化之前的生产者-消费者代码,使用 volatile 优化性能
- 分析一个开源项目中 volatile 的使用情况



还没有评论呢,快来抢沙发~