ConcurrentHashMap线程安全实现详解
一、引言与动机
在多线程编程中,线程安全是一个至关重要的问题。HashMap作为Java中最常用的数据结构之一,在并发环境下使用会存在线程安全问题。虽然可以通过Collections.synchronizedMap()或Hashtable来实现线程安全,但它们的性能较差,因为采用了全局锁机制。
ConcurrentHashMap是Java并发包中最重要的线程安全哈希表实现,采用分段锁和CAS操作实现高并发性能。它是HashMap的线程安全版本,支持高并发环境下的高效读写操作。
1.1 为什么需要ConcurrentHashMap?
在高并发场景下,传统的线程安全Map实现存在以下问题:
- Hashtable:使用synchronized关键字修饰所有方法,导致读写操作串行化,性能较差
- Collections.synchronizedMap():同样采用全局锁机制,限制了并发性能
ConcurrentHashMap通过精细的锁粒度控制和无锁化设计,有效解决了这些问题。
1.2 典型应用场景
ConcurrentHashMap在实际开发中应用广泛,常见场景包括:
- 高并发缓存系统:利用其高并发读写能力实现高性能缓存
- 计数器系统:多线程环境下统计访问次数、频率等
- 数据聚合处理:在并发环境中收集和处理数据
- 分布式系统协调:作为本地缓存存储分布式状态
二、核心架构演进
2.1 JDK 7 vs JDK 8架构对比
JDK 7和JDK 8中ConcurrentHashMap的实现有显著差异:
graph TB
subgraph \"JDK 7: 分段锁架构\"
A[ConcurrentHashMap] --> B[Segment 0]
A --> C[Segment 1]
A --> D[...]
A --> E[Segment n]
B --> B1[HashEntry数组]
B1 --> B2[链表]
C --> C1[HashEntry数组]
C1 --> C2[链表]
end
subgraph \"JDK 8: 数组+链表+红黑树\"
F[ConcurrentHashMap] --> G[Node数组]
G --> H[链表/红黑树]
G --> I[链表/红黑树]
G --> J[...]
G --> K[链表/红黑树]
end
2.2 架构演进原因分析
从JDK 7到JDK 8的演进主要出于以下考虑:
- 减少内存开销:JDK 7中每个Segment都需要额外的对象头和数组,而JDK 8只维护一个数组
- 提高并发度:JDK 7中Segment数量固定,限制了并发度;JDK 8中可以更细粒度地控制锁
- 优化查找性能:引入红黑树,在链表过长时提升查找效率
- 简化实现:JDK 8的实现更加简洁,易于维护
三、JDK 8核心实现
3.1 关键数据结构
// 节点基类
static class Node implements Map.Entry {
final int hash;
final K key;
volatile V val;
volatile Node next;
}
// 树节点
static final class TreeNode extends Node {
TreeNode parent;
TreeNode left;
TreeNode right;
TreeNode prev;
boolean red;
}
// 转发节点(扩容时使用)
static final class ForwardingNode extends Node {
final Node[] nextTable;
}
3.2 核心字段解析
public class ConcurrentHashMap extends AbstractMap
implements ConcurrentMap, Serializable {
// 哈希表数组
transient volatile Node[] table;
// 扩容时的新表
private transient volatile Node[] nextTable;
// 控制表初始化和扩容
private transient volatile int sizeCtl;
// 基础计数器
private transient volatile long baseCount;
// 扩容进度
private transient volatile int transferIndex;
// 计数器单元数组
private transient volatile CounterCell[] counterCells;
}
四、线程安全实现机制
4.1 CAS操作应用
ConcurrentHashMap大量使用CAS(Compare-And-Swap)操作来实现无锁化设计:
// 使用CAS更新值
static final boolean casTabAt(Node[] tab, int i,
Node c, Node v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 获取节点
static final Node tabAt(Node[] tab, int i) {
return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
4.2 锁粒度优化
ConcurrentHashMap通过细粒度锁机制提升并发性能:
graph LR
A[键K1] --> B[计算哈希]
B --> C[索引i]
A2[键K2] --> B2[计算哈希]
B2 --> C2[索引j]
C --> D[桶i: 锁A]
C2 --> E[桶j: 锁B]
D --> F[线程1操作]
E --> G[线程2操作]
style D fill:#9f9
style E fill:#99f
不同桶之间的操作可以并发执行,互不干扰。
五、核心操作源码分析
5.1 put操作详解
put操作是ConcurrentHashMap最核心的操作之一:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
// 表为空则初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 计算索引位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 使用CAS插入新节点
if (casTabAt(tab, i, null, new Node(hash, key, value, null)))
break;
}
// 检测到扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 哈希冲突处理
else {
V oldVal = null;
synchronized (f) { // 锁住桶的头节点
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
if ((e = e.next) == null) {
pred.next = new Node(hash, key, value, null);
break;
}
}
}
// 红黑树处理
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 检查是否需要树化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新计数
addCount(1L, binCount);
return null;
}
put操作的线程安全流程:
graph TD
A[开始put] --> B[计算哈希和索引]
B --> C[位置为空?]
C -->|是| D[CAS插入新节点]
C -->|否| E[检测扩容标志]
D --> F[成功?]
F -->|是| G[结束]
F -->|否| B
E --> H[正在扩容?]
H -->|是| I[协助扩容]
H -->|否| J[锁住桶头节点]
I --> B
J --> K[链表/树操作]
K --> L[更新计数]
L --> G
- 计算哈希值和索引位置
- 如果位置为空,使用CAS操作插入新节点
- 如果正在扩容,则协助扩容
- 如果发生哈希冲突,则锁住桶的头节点进行操作
- 检查是否需要将链表转换为红黑树
- 更新元素计数
5.2 get操作详解
get操作通过无锁化设计实现高效读取:
final Node getNode(Object key) {
Node[] tab;
Node e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e;
}
}
return null;
}
get操作的特点:
- 无锁实现:完全依赖volatile语义和CAS操作
- 高效读取:大多数情况下不需要加锁即可获取数据
- 弱一致性:可能返回过期数据,但不会导致程序出错
六、扩容机制
6.1 多线程协同扩容
ConcurrentHashMap支持多线程协同扩容,提高扩容效率:
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
// 计算每个线程处理的区间
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 初始化新表
if (nextTab == null) {
try {
Node[] nt = (Node[])new Node[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// 多线程协作迁移数据
while (advance) {
// 分配迁移任务给当前线程
// ...
}
}
6.2 扩容状态管理
多线程协同扩容的工作机制:
graph LR
A[线程1] --> B[迁移区间A-B]
C[线程2] --> D[迁移区间C-D]
E[线程3] --> F[迁移区间E-F]
B --> G[更新transferIndex]
D --> G
F --> G
G --> H[全部完成?]
H -->|是| I[替换table]
H -->|否| J[继续分配任务]
七、大小统计机制
7.1 分段计数
为了在高并发环境下高效统计元素数量,ConcurrentHashMap采用了分段计数机制:
// 计数器单元
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
// 基础计数 + 分段计数
transient volatile CounterCell[] counterCells;
7.2 size()方法实现
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
八、性能对比分析
8.1 并发性能对比
| 操作类型 | HashMap | Hashtable | ConcurrentHashMap |
|---|---|---|---|
| 读操作 | 非线程安全 | 全局锁 | 无锁/CAS |
| 写操作 | 非线程安全 | 全局锁 | 分段锁 |
| 高并发读 | 快 | 慢 | 非常快 |
| 高并发写 | 线程不安全 | 非常慢 | 快 |
8.2 适用场景
- 高并发读:ConcurrentHashMap最优
- 高并发写:ConcurrentHashMap表现良好
- 低并发:HashMap性能最好
- 强一致性:Hashtable或Collections.synchronizedMap
九、实战应用
9.1 缓存实现
public class ConcurrentCache {
private final ConcurrentHashMap cache;
private final int maxSize;
public ConcurrentCache(int maxSize) {
this.cache = new ConcurrentHashMap();
this.maxSize = maxSize;
}
public V get(K key) {
return cache.get(key);
}
public void put(K key, V value) {
if (cache.size() >= maxSize) {
// 淘汰策略
cache.clear();
}
cache.put(key, value);
}
}
9.2 频率统计
public class FrequencyCounter {
private final ConcurrentHashMap counter = new ConcurrentHashMap();
public void increment(String key) {
counter.computeIfAbsent(key, k -> new LongAdder()).increment();
}
public long getCount(String key) {
LongAdder adder = counter.get(key);
return adder != null ? adder.longValue() : 0;
}
}
十、最佳实践与注意事项
10.1 初始化容量设置
// 合理设置初始容量避免频繁扩容
ConcurrentHashMap map = new ConcurrentHashMap(1024);
10.2 弱一致性问题
由于ConcurrentHashMap追求高性能,其迭代器具有弱一致性,可能无法反映最新的修改:
Iterator<Map.Entry> iterator = map.entrySet().iterator();
// 在迭代过程中其他线程对map的修改可能不会被此迭代器感知到
while (iterator.hasNext()) {
Map.Entry entry = iterator.next();
// 处理entry
}
10.3 复合操作需要额外同步
虽然单个操作是线程安全的,但复合操作仍需外部同步:
// 错误示例 - 非原子操作
if (!map.containsKey(key)) {
map.put(key, value); // 可能被其他线程抢先
}
// 正确示例
map.putIfAbsent(key, value);
// 或者使用显式同步
synchronized(map) {
if (!map.containsKey(key)) {
map.put(key, value);
}
}
10.4 常见问题和陷阱
- size()方法的近似性:
// size()方法返回的是一个估计值,可能不准确
int size = map.size();
- 遍历过程中的修改:
// 遍历过程中其他线程可能修改map内容
for (Map.Entry entry : map.entrySet()) {
// 处理entry时,map可能已经被其他线程修改
}
十一、总结
ConcurrentHashMap通过精巧的锁粒度控制、CAS操作和多线程协作机制,在保证线程安全的同时提供了优异的并发性能。其主要特点包括:
- 高效的并发访问:通过CAS操作和局部锁定实现高并发读写
- 良好的扩展性:支持多线程协同扩容,提高整体吞吐量
- 内存友好:在JDK 8中优化了内存布局,减少了空间开销
- 渐进式改进:从JDK 7到JDK 8的重大重构体现了持续优化的思想
理解其实现原理对于开发高性能并发应用至关重要,特别是在缓存、计数器等典型应用场景中能够发挥最大价值。



还没有评论呢,快来抢沙发~