Java ConcurrentHashMap Source Code Analysis (JDK 1.6)

1. Introduction:
This article analyzes ConcurrentHashMap as of JDK 1.6; the JDK 1.8 version changed substantially and will be covered in a later article. The traditional thread-safe container Hashtable makes every method synchronized, locking the entire hash table. ConcurrentHashMap instead uses lock striping: the data is split into segments, and each lock guards only one segment.
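The striping idea can be sketched without any JDK internals: keep several independently locked sub-maps and route each key to one of them by hash, so writers on different stripes never contend. Everything below (the StripedMap name, 16 stripes) is a hypothetical illustration, not the real implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Minimal lock-striping sketch: one ReentrantLock per stripe, so puts
// that hash to different stripes can proceed in parallel.
class StripedMap<K, V> {
    private final int stripes = 16;                  // like DEFAULT_CONCURRENCY_LEVEL
    private final ReentrantLock[] locks = new ReentrantLock[stripes];
    private final Map<K, V>[] maps;

    @SuppressWarnings("unchecked")
    StripedMap() {
        maps = new Map[stripes];
        for (int i = 0; i < stripes; i++) {
            locks[i] = new ReentrantLock();
            maps[i] = new HashMap<>();
        }
    }

    private int stripeFor(Object key) {
        return (key.hashCode() & 0x7fffffff) % stripes;
    }

    V put(K key, V value) {
        int s = stripeFor(key);
        locks[s].lock();                             // only this stripe is blocked
        try {
            return maps[s].put(key, value);
        } finally {
            locks[s].unlock();
        }
    }

    V get(K key) {
        int s = stripeFor(key);
        locks[s].lock();
        try {
            return maps[s].get(key);
        } finally {
            locks[s].unlock();
        }
    }
}
```

The real JDK 1.6 class refines this in two ways analyzed below: segments are sized to powers of two so routing is a shift-and-mask instead of `%`, and reads avoid the lock entirely.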

```java
static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
static final int MAXIMUM_CAPACITY = 1 << 30;
static final int MAX_SEGMENTS = 1 << 16;
static final int RETRIES_BEFORE_LOCK = 2;
```

Like HashMap, the default initial capacity is 16 and the maximum capacity is 1 << 30, with a default load factor of 0.75. ConcurrentHashMap's default concurrency level is 16 and the maximum is 1 << 16.
```java
final Segment<K,V>[] segments;
```

The backbone of ConcurrentHashMap is a Segment array whose size must be a power of two. Each Segment is itself much like a HashMap: it maintains an internal table array, and each bucket of the table holds a HashEntry chain. A HashEntry is analogous to HashMap's Entry and holds one key-value pair.

```java
static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;

    HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
        this.key = key;
        this.hash = hash;
        this.next = next;
        this.value = value;
    }

    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V>[] newArray(int i) {
        return new HashEntry[i];
    }
}
```

ConcurrentHashMap allows multiple read operations to proceed concurrently without locking. Because next is final, a new node can only be added at the head of a chain, and removal cannot simply unlink a node the way a plain HashMap does: every node before the removed one must be copied.
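The copy-on-delete forced by the final next field can be shown with a standalone immutable list (the Node and Immutable.remove names are hypothetical, not JDK code): the suffix after the removed node is reused as-is, while the prefix is cloned onto it.

```java
// Sketch of the copy-on-delete trick forced by a final `next` field:
// nodes *after* the removed one are shared with the old list, nodes
// *before* it are cloned (in reverse order) onto the retained tail.
final class Node {
    final int key;
    final Node next;
    Node(int key, Node next) { this.key = key; this.next = next; }
}

final class Immutable {
    // Returns a new head with `key` removed; the old list is untouched,
    // so concurrent readers traversing it still see a consistent chain.
    static Node remove(Node head, int key) {
        Node e = head;
        while (e != null && e.key != key)   // find the node to delete
            e = e.next;
        if (e == null)
            return head;                    // not found: list unchanged
        Node newHead = e.next;              // reuse everything after the deleted node
        for (Node p = head; p != e; p = p.next)
            newHead = new Node(p.key, newHead);  // clone the prefix
        return newHead;
    }
}
```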
Segment is a nested class of ConcurrentHashMap that extends ReentrantLock. Like HashMap, its core is a table of HashEntry chains, and its other fields also mirror HashMap's.
```java
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    transient volatile int count;
    transient int modCount;
    transient int threshold;
    transient volatile HashEntry<K,V>[] table;
    final float loadFactor;
    // ... (methods omitted)
}
```

Constructor:
```java
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();

    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;

    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // find the smallest power of two >= concurrencyLevel
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    // find the smallest power of two >= c
    while (cap < c)
        cap <<= 1;

    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
```

So concurrencyLevel determines the size of the segment array, and each segment's capacity is the total capacity divided by the number of segments, rounded up to a power of two.
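The sizing loops can be traced as standalone arithmetic (the Sizing class below is illustrative only, mirroring the constructor above): with initialCapacity = 64 and concurrencyLevel = 10, ssize rounds up to 16 and each segment gets a table of 4.

```java
// Traces the constructor's sizing loops for given arguments.
final class Sizing {
    final int ssize, sshift, segmentShift, segmentMask, cap;

    Sizing(int initialCapacity, int concurrencyLevel) {
        int sh = 0, sz = 1;
        while (sz < concurrencyLevel) { ++sh; sz <<= 1; }  // round up to power of two
        sshift = sh;
        ssize = sz;                 // number of segments
        segmentShift = 32 - sh;     // how far to shift the hash for segment routing
        segmentMask = sz - 1;       // mask for the segment index
        int c = initialCapacity / sz;
        if (c * sz < initialCapacity) ++c;                 // ceiling division
        int cp = 1;
        while (cp < c) cp <<= 1;                           // per-segment table size
        cap = cp;
    }
}
```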
2. Main methods:
The get method:
```java
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}
```

segmentFor determines which segment an element belongs to; segmentMask = number of segments - 1.
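The bit manipulation can be checked in isolation (SegmentIndex is an illustrative helper, not JDK code): the top sshift bits of the hash select the segment, while the low bits later select the bucket within that segment's table, so the two indices use disjoint parts of the hash.

```java
final class SegmentIndex {
    // Mirrors segmentFor: shift the hash right by segmentShift (= 32 - sshift),
    // then mask with segmentMask (= ssize - 1), i.e. keep the TOP sshift bits.
    static int segmentFor(int hash, int segmentShift, int segmentMask) {
        return (hash >>> segmentShift) & segmentMask;
    }
}
```

With the defaults (16 segments, segmentShift = 28, segmentMask = 15), hash 0xABCD1234 routes to segment 0xA, since its top four bits are 1010.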
```java
V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                // put created a new entry; a null value means it may not
                // be fully published yet
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}
```

Not locking in get is an optimistic design; the safety checks are done only in put.
The first step checks count, the number of entries in the segment. count is volatile, put and remove both run under the lock, and updating count is always the last step of those methods. So the following is possible: one thread executing get sees a chain of length 0 while another thread in put has already added an element to that bucket but has not yet written count; get then returns null.
Because value is volatile, a value written by another thread is immediately visible to get. Because next is final and immutable, even if a removal exposes its nodes mid-operation, no consistency problem can arise. getFirst() may, however, return a stale head node; this is an accepted relaxation.
When the value read is null, the locked get path is taken. In theory a null value is impossible, since put throws a NullPointerException for null values. But put publishes the node with tab[index] = new HashEntry<K,V>(key, hash, first, value); and the store to tab[index] may be reordered with the stores inside the HashEntry constructor, so a reader can observe a node whose value field is still null. A null value therefore means another thread is still mutating the node, which could yield inconsistent data, so the read must take the lock.
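A standalone rendering of that locked re-read (MiniSegment and Entry are simplified stand-ins for Segment and HashEntry, not JDK code): because put holds the same lock while constructing and publishing the entry, a value read under the lock cannot be the half-published null.

```java
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for Segment.readValueUnderLock: acquiring the same
// lock that put holds forces the racing put to complete first, so the
// entry's value is fully visible when we read it.
final class MiniSegment<V> extends ReentrantLock {
    static final class Entry<V> {
        volatile V value;
    }

    V readValueUnderLock(Entry<V> e) {
        lock();
        try {
            return e.value; // cannot observe the half-constructed state here
        } finally {
            unlock();
        }
    }
}
```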
The put method:
```java
public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key.hashCode());
    return segmentFor(hash).put(key, hash, value, false);
}

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}
```

put inserts the new node at the head of the chain.
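The onlyIfAbsent parameter is what backs the public putIfAbsent method; its behavior can be observed through the public API (this demo works the same on any JDK version):

```java
import java.util.concurrent.ConcurrentHashMap;

// put replaces the mapping and returns the old value;
// putIfAbsent (the onlyIfAbsent=true path) keeps the existing mapping.
class PutDemo {
    static ConcurrentHashMap<String, Integer> demo() {
        ConcurrentHashMap<String, Integer> m =
            new ConcurrentHashMap<>(16, 0.75f, 16);
        m.put("a", 1);         // returns null: no previous mapping
        m.put("a", 2);         // returns 1 and overwrites
        m.putIfAbsent("a", 3); // returns 2; existing mapping kept
        return m;
    }
}
```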
```java
void rehash() {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity >= MAXIMUM_CAPACITY)
        return;

    HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity << 1);
    threshold = (int)(newTable.length * loadFactor);
    int sizeMask = newTable.length - 1;
    for (int i = 0; i < oldCapacity; i++) {
        // We need to guarantee that any existing reads of old Map can
        // proceed. So we cannot yet null out each bin.
        HashEntry<K,V> e = oldTable[i];

        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;

            // Single node on list
            if (next == null)
                newTable[idx] = e;

            else {
                // Reuse trailing consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;

                // Clone all remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    int k = p.hash & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value);
                }
            }
        }
    }
    table = newTable;
}
```
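The "lastRun" reuse rests on the fact that doubling the table sends each node of a chain to one of only two new slots (its old index, or old index + oldCapacity), so the trailing run of nodes that all agree on one slot can be relinked without copying. The trailing-run scan can be checked in isolation (LastRun is illustrative, operating on raw hashes rather than entries):

```java
// Finds the start of the longest trailing run of chain nodes that all map
// to the same new slot, exactly as the inner loop of rehash does.
final class LastRun {
    static int lastRunStart(int[] hashes, int sizeMask) {
        int lastIdx = hashes[0] & sizeMask;
        int lastRun = 0;
        for (int i = 1; i < hashes.length; i++) {
            int k = hashes[i] & sizeMask;
            if (k != lastIdx) {  // run broken: restart it at this node
                lastIdx = k;
                lastRun = i;
            }
        }
        return lastRun;          // nodes from here on share one new slot
    }
}
```

For a chain with hashes {5, 21, 5, 21, 21} moving into a table of 32 (sizeMask 31), the new indices are 5, 21, 5, 21, 21: the last three nodes form the reusable run, so only the first two are cloned.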
