JDK源码阅读(7)(ConcurrentHashMap类阅读笔记)

ConcurrentHashMap

public class ConcurrentHashMap extends AbstractMap implements ConcurrentMap, Serializable { ... }

1. 一些重要参数
1.1 MAXIMUM_CAPACITY参数
/** * The largest possible table capacity.This value must be * exactly 1<<30 to stay within Java array allocation and indexing * bounds for power of two table sizes, and is further required * because the top two bits of 32bit hash fields are used for * control purposes. */ private static final int MAXIMUM_CAPACITY = 1 << 30;

MAXIMUM_CAPACITY参数表示map的最大容量,默认为1 << 30。
1.2 DEFAULT_CAPACITY参数
/** * The default initial table capacity.Must be a power of 2 * (i.e., at least 1) and at most MAXIMUM_CAPACITY. */ private static final int DEFAULT_CAPACITY = 16;

DEFAULT_CAPACITY参数表示map的默认容量,为16。
1.3 MAX_ARRAY_SIZE参数
/** * The largest possible (non-power of two) array size. * Needed by toArray and related methods. */ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

MAX_ARRAY_SIZE参数表示map的数组最大长度,在toArray()及其相关方法中可能用到。大小为Integer.MAX_VALUE - 8
1.4 DEFAULT_CONCURRENCY_LEVEL参数
/** * The default concurrency level for this table. Unused but * defined for compatibility with previous versions of this class. */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

DEFAULT_CONCURRENCY_LEVEL参数表示默认并发级别。在笔者当前使用版本JDK13中已经被弃用,但为了和先前版本兼容,保留这个参数。
1.5 LOAD_FACTOR参数
/** * The load factor for this table. Overrides of this value in * constructors affect only the initial table capacity.The * actual floating point value isn't normally used -- it is * simpler to use expressions such as {@code n - (n >>> 2)} for * the associated resizing threshold. */ private static final float LOAD_FACTOR = 0.75f;

LOAD_FACTOR参数表示加载因子,默认和HashMap一样,是0.75。
1.6 TREEIFY_THRESHOLD参数
/** * The bin count threshold for using a tree rather than list for a * bin.Bins are converted to trees when adding an element to a * bin with at least this many nodes. The value must be greater * than 2, and should be at least 8 to mesh with assumptions in * tree removal about conversion back to plain bins upon * shrinkage. */ static final int TREEIFY_THRESHOLD = 8;

TREEIFY_THRESHOLD参数表示数组中链表转换为红黑树的阈值,他被用于与一个链表的长度进行比较。
1.7 UNTREEIFY_THRESHOLD参数
/** * The bin count threshold for untreeifying a (split) bin during a * resize operation. Should be less than TREEIFY_THRESHOLD, and at * most 6 to mesh with shrinkage detection under removal. */ static final int UNTREEIFY_THRESHOLD = 6;

UNTREEIFY_THRESHOLD参数表示数组中红黑树转变成链表的阈值,他被用于与一个红黑树的大小进行比较。
1.8 MIN_TREEIFY_CAPACITY参数
/** * The smallest table capacity for which bins may be treeified. * (Otherwise the table is resized if too many nodes in a bin.) * The value should be at least 4 * TREEIFY_THRESHOLD to avoid * conflicts between resizing and treeification thresholds. */ static final int MIN_TREEIFY_CAPACITY = 64;

MIN_TREEIFY_CAPACITY参数表示将链表树化的哈希表最小容量。只有当整个ConcurrentHashMap的容量大于这个值时,具体的链表才可能发生树化。如果没有大于这个值,将进行扩容而非树化。(扩容也会减少单个链表中的元素数量)。
1.9 MIN_TRANSFER_STRIDE参数
/** * Minimum number of rebinnings per transfer step. Ranges are * subdivided to allow multiple resizer threads.This value * serves as a lower bound to avoid resizers encountering * excessive memory contention.The value should be at least * DEFAULT_CAPACITY.STR */ private static final int MIN_TRANSFER_STRIDE = 16;

在扩容操作中,transfer这个步骤允许多线程并发进行,MIN_TRANSFER_STRIDE参数表示进行一次transfer操作中一个工作线程的最小任务量。即最少要处理的连续哈希桶的数目,默认为16,即最少要对连续的16个哈希桶进行transfer操作,详见下文transfer()方法的解析。
1.10 RESIZE_STAMP_BITS参数(未理解)
/** * The number of bits used for generation stamp in sizeCtl. * Must be at least 6 for 32bit arrays. */ private static final int RESIZE_STAMP_BITS = 16;

RESIZE_STAMP_BITS参数用于生成在每次扩容中都唯一的生成戳。
1.11 MAX_RESIZERS参数(未理解)
/** * The maximum number of threads that can help resize. * Must fit in 32 - RESIZE_STAMP_BITS bits. */ private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

这个参数定义了resize时工作线程的最大数量,但这个计算方法我不明白。MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
1.12 RESIZE_STAMP_SHIFT参数(未理解)
/** * The bit shift for recording size stamp in sizeCtl. */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

这个参数定义了sizeCtl中记录大小标记的位移位,但这个计算方法我不明白。MAX_RESIZERS = 32 - RESIZE_STAMP_BITS;
1.13 特殊节点的hash状态参数
/* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED= -1; // hash for forwarding nodes static final int TREEBIN= -2; // hash for roots of trees static final int RESERVED= -3; // hash for transient reservations

正常情况下hash的值都应该是正数,如果是负数说明当前是一个不正常的、特殊的节点。
  • hash值为-1时,代表当前节点是一个Forwarding Node
    • ForwardingNode是一种临时节点,在扩容进行中才会出现,并且它不存储实际的数据。
    • 如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode.
    • 读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。
  • hash值为-2时,代表当前节点是一个TreeBin
    • TreeBinConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。
    • 因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。
  • hash值为-3时,代表当前节点是一个保留节点,即占位符。
    • 一般情况下不会出现。
1.14 HASH_BITS参数
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

HASH_BITSHashTable中也见到过,通过和它的位运算,可以将负数的哈希值转变为正数。
1.15 NCPU参数
/** Number of CPUS, to place bounds on some sizings */ static final int NCPU = Runtime.getRuntime().availableProcessors();

NCPU参数可以获取当前JVM能使用的处理器内核数。
2. 一些重要属性
值得关注的是,ConcurrentHashMap中的关键属性基本都是volatile变量。
2.1 table属性
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node[] table;

table属性用于存节点,是桶的集合。
2.2 nextTable属性
/** * The next table to use; non-null only while resizing. */ private transient volatile Node[] nextTable;

nextTable属性表示下一个要使用的数组,辅助resize操作,也仅在resize时非空。
2.3 baseCount属性
/** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */ private transient volatile long baseCount;

baseCount属性是在没有争用现象时的基本计数器值,也在初始化表的竞争中使用。
2.4 sizeCtl属性
/** * Table initialization and resizing control.When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads).Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;

sizeCtl属性在表初始化和resize操作控制中发挥作用。
  • sizeCtl为负数,说明表正在进行初始化或resize操作。
    • 表初始化时为 -1。
    • resize时为-(1+扩容线程数)
  • sizeCtl为正数。
    • 表为null时为初始表大小或 0。
    • 表不为null时为需进行resize的下一个计数值。
2.5 transferIndex属性
/** * The next table index (plus one) to split while resizing. */ private transient volatile int transferIndex;

resize中要拆分的下一个表索引。
2.6 cellsBusy属性
/** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */ private transient volatile int cellsBusy;

resize过程和/或创建CounterCells过程中使用的自旋锁。
2.7 counterCells数组
/** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells;

显然,这是CounterCell的数组,即计数单元的数组。
3. 内部类
3.1 Node内部类 Node内部类是ConcurrentHashMap类中普通节点的抽象。
/** * Key-value entry.This class is never exported out as a * user-mutable Map.Entry (i.e., one supporting setValue; see * MapEntry below), but can be used for read-only traversals used * in bulk tasks.Subclasses of Node with a negative hash field * are special, and contain null keys and values (but are never * exported).Otherwise, keys and vals are never null. */ static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; Node(int hash, K key, V val) { this.hash = hash; this.key = key; this.val = val; }Node(int hash, K key, V val, Node next) { this(hash, key, val); this.next = next; }public final K getKey(){ return key; } public final V getValue(){ return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString() { return Helpers.mapEntryToString(key, val); } public final V setValue(V value) { throw new UnsupportedOperationException(); }public final boolean equals(Object o) { Object k, v, u; Map.Entry e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); }/** * Virtualized support for map.get(); overridden in subclasses. */ Node find(int h, Object k) { Node e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }

意义
Node内部类作为ConcurrentHashMap节点的实现。
hashCode()的实现
注意hashCode()的实现方式:Objects.hashCode(key) ^ Objects.hashCode(value);
find()
这里Node内部类的find()方法其实不会在一般的业务方法如get()中被调用,因为在那些地方会直接遍历。这个方法会在ForwardingNode类的find()方法中被调用。
4. 工具方法
4.1 spread方法
/** * Spreads (XORs) higher bits of hash to lower and also forces top * bit to 0. Because the table uses power-of-two masking, sets of * hashes that vary only in bits above the current mask will * always collide. (Among known examples are sets of Float keys * holding consecutive whole numbers in small tables.)So we * apply a transform that spreads the impact of higher bits * downward. There is a tradeoff between speed, utility, and * quality of bit-spreading. Because many common sets of hashes * are already reasonably distributed (so don't benefit from * spreading), and because we use trees to handle large sets of * collisions in bins, we just XOR some shifted bits in the * cheapest possible way to reduce systematic lossage, as well as * to incorporate impact of the highest bits that would otherwise * never be used in index calculations because of table bounds. */ static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }

通过取高位再进行掩码计算(保证哈希值为正),来减少哈希冲突。
这个方法就是所谓的扰动方法。
4.2 tableSizeFor方法
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = -1 >>> Integer.numberOfLeadingZeros(c - 1); return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }

tableSizeFor方法用于计算参数c对应的resize阈值。往往出现为下面的语句。
4.3 comparableClassFor方法
/** * Returns x's Class if it is of the form "class C implements * Comparable", else null. */ static Class comparableClassFor(Object x) { if (x instanceof Comparable) { Class c; Type[] ts, as; ParameterizedType p; // 如果是String 直接返回 if ((c = x.getClass()) == String.class) return c; if ((ts = c.getGenericInterfaces()) != null) { for (Type t : ts) { if ((t instanceof ParameterizedType) && ((p = (ParameterizedType)t).getRawType() == Comparable.class) && (as = p.getActualTypeArguments()) != null && as.length == 1 && as[0] == c) // type arg is c return c; } } } return null; }

如果参数x是一个Comparable接口的实现类,那么返回它的类型。
4.4 compareComparables方法
/** * Returns k.compareTo(x) if x matches kc (k's screened comparable * class), else 0. */ @SuppressWarnings({"rawtypes","unchecked"}) // for cast to Comparable static int compareComparables(Class kc, Object k, Object x) { return (x == null || x.getClass() != kc ? 0 : ((Comparable)k).compareTo(x)); }

如果对象x匹配k的可比类kc,那么返回k.compareTo(x),否则返回0。
4.5 列表元素访问方法 4.5.1 tabAt方法
static final Node tabAt(Node[] tab, int i) { return (Node)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE); }

tabAt()方法可以获得在 i 位置上的 Node 节点。
4.5.2 casTabAt方法
static final boolean casTabAt(Node[] tab, int i, Node c, Node v) { return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v); }

casTabAt()方法可以以CAS的方式更新 i 位置上的 Node 节点
4.5.3 setTabAt方法
static final void setTabAt(Node[] tab, int i, Node v) { U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v); }

setTabAt方法可以设置在 i 位置上的 Node 节点。
注:像Unsafe.getReferenceAcquire()方法和Unsafe.putReferenceRelease()方法这样的方法实际上是Unsafevolatile方法的发行版本。如后者是putReferenceVolatile()的发行版本。
4.6 initTable方法
private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 如果sizeCtl属性小于0,说明正在初始化或resize,自旋 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {// 如果SIZECTL仍是sc,则置为-1.表示进入初始化 try { if ((tab = table) == null || tab.length == 0) { // 获取初始大小(sc为正时即为初始大小) int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 创建一个node数组 @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n]; // 为table属性赋值 table = tab = nt; sc = n - (n >>> 2); } } finally { // 最后记得更新sizeCtl sizeCtl = sc; } break; } } return tab; }

initTable()方法可以初始化一个空表。
4.7 hashCode方法
public int hashCode() { int h = 0; Node[] t; if ((t = table) != null) { Traverser it = new Traverser(t, t.length, 0, t.length); for (Node p; (p = it.advance()) != null; ) h += p.key.hashCode() ^ p.val.hashCode(); } return h; }

hashCode()方法就是遍历每个键值对,令他们的键和值的哈希码相异或,再全部叠加起来。
4.8 addCount方法 addCount()方法会在ConcurrentHashMap元素数量变化时被调用,两个参数中第一个为数量变化值,第二个为控制是否需要扩容检查的参数。
private final void addCount(long x, int check) { // 创建计数单元CounterCell CounterCell[] cs; long b, s; /** 1.如果counterCells为null: 那么说明之前没有发生过并发冲突,随后会执行U.compareAndSetLong(...,b+x),直接更新了计数值baseCount。而这个本地方法执行成功会返回true,取反后为false。那么整个这个if判断两个条件都为false,不执行if块内内容。 2.如果couterCells不为null: 那么说明之前发生过并发冲突,需要下面的if块处理。这里if的第一个条件为ture,第二个条件的更新方法就不会执行。 */ if ((cs = counterCells) != null || !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 进入if块,说明曾发生过并发冲突,那么要把值加到CounterCell中 CounterCell c; long v; int m; boolean uncontended = true; if (cs == null // cs在并发中又变成了null || (m = cs.length - 1) < 0 // cs长度小于1 || (c = cs[ThreadLocalRandom.getProbe() & m]) == null // 对应的CouterCell为null || !(uncontended = U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {// 尝试更新找到的计数单元c的值 // 如果更新失败。一般就是上面的最后一个条件中的方法返回了false,取反后为true // 说明CounterCells数组中出现了并发冲突,可能涉及该数组的扩容,调用fullAddCount方法 fullAddCount(x, uncontended); return; } if (check <= 1)// 如果无需检查,直接返回 return; // 计数,存到s中,下面用于做检查 s = sumCount(); } // 检查是否需要扩容 if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) // 元素数量大于扩容阈值:需要扩容 && (tab = table) != null // 表不为空 && (n = tab.length) < MAXIMUM_CAPACITY) {// 表长度未达上限 int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT; // 如果正在执行resize if (sc < 0) { // 一些放弃帮助扩容的条件 if (sc == rs + MAX_RESIZERS || sc == rs + 1 || (nt = nextTable) == null || transferIndex <= 0) break; // 将sc+1,表示一个新线程加入帮助扩容 if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 当前没在执行resize,尝试成为第一个进入扩容的线程。将sc设置为rs+2 else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2)) transfer(tab, null); // 重新计算元素数量 s = sumCount(); } } }

详细的逻辑看代码注释,这里单独讲几个点。
  • 第一个if的判断条件的使用很精彩,根据CounterCells数组是否为null来检查是该直接把值加到baseCount上还是加到对应的CounterCell中。
  • 注意在CounterCells数组中找到槽位置的方式:c = cs[ThreadLocalRandom.getProbe() & m]) == null
  • check参数小于等于1时,不用做检查就退出。大于1时,要在addCount的主逻辑结束之后在检查需不需要做扩容。在put方法调用addCount时,传入的check参数其实是put过程中遍历到的节点数量,这样逻辑就连通了:假如原先就只有一个节点或为空,就不需要考虑是否需要再检查扩容;否则就要在addCoumt中检查。
4.9 helpTransfer方法 helpTransfer方法可以在节点正在resize时协助数据迁移并返回新数组。在putremove等业务方法中都有调用这个方法。
/** * Helps transfer if a resize is in progress. */ final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; // 进入方法主逻辑需要同时满足三个条件 if (tab != null// 表不空 && (f instanceof ForwardingNode)// f 是一个Forwarding Node && (nextTab = ((ForwardingNode)f).nextTable) != null) // nextTable不空 { // 计算出本次resize时的标记“戳” int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT; while (nextTab == nextTable // nextTab不变 && table == tab // table不变 && (sc = sizeCtl) < 0) // sizeCtl保持小于0(正在resize) { if (sc == rs + MAX_RESIZERS // 工作线程数量已满 || sc == rs + 1 // 在addCount方法中,假如有第一个扩容线程,sc=rs+2。假如变成rs+1,说明扩容结束。 || transferIndex <= 0) // 如果transferIndex小于等于0,实际说明扩容工作已完成,已进入下标调整。 break; // 令sc++,进入扩容 if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } // 返回新表 return nextTab; } // 返回原表 return table; }

4.10 transfer方法 transfer将方法的功能是将每个 bin 中的节点移动和/或复制到新表。 在addCount()helpTransfer()中都有调用,是扩容工作的核心实现类。
下面例子中若出现具体数字,以传入tab的length为16为准。
private final void transfer(Node[] tab, Node[] nextTab) { // 定义n为表长。 int n = tab.length, stride; /** stride表示一次transfer中一个工作线程的任务量,即要处理的连续哈希桶的数目。 初始化stride:假如可用CPU核数大于1,初始化为(n >>> 3) / NCPU,否则初始化为n。 假如初始化后的stride小于MIN_TRANSFER_STRIDE,把他置为这个最小值。 */ if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // nextTab未初始化,就要先初始化这个数组 try { @SuppressWarnings("unchecked")‘ // 创建nextTab数组,长度为原数组长度*2 Node[] nt = (Node[])new Node[n << 1]; nextTab = nt; } catch (Throwable ex) { // 创建新数组失败,sizeCtl就设为int的最大值 sizeCtl = Integer.MAX_VALUE; return; } // 这个数组赋值给nextTable nextTable = nextTab; // 更新转移下标 transferIndex = n; } int nextn = nextTab.length; // 创建ForwardingNode fwd,传入nextTab作为参数 ForwardingNode fwd = new ForwardingNode(nextTab); // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进 boolean advance = true; // 标记是否已经扩容完成 boolean finishing = false; // to ensure sweep before committing nextTab /** 又是一个for循环自旋,处理每个槽位中的链表元素 */ for (int i = 0, bound = 0; ; ) { Node f; int fh; /** 这个while循环通过CAS不断尝试为当前线程分配任务,直到分配成功或任务队列已经被全部分配完毕。 如果线程已经被分配过bucket区域,那么会通过--i指向下一个待处理桶然后退出循环。 */ while (advance) { int nextIndex, nextBound; // --i表示进入下一个待处理的bucket。自减后大于等于bound,表明当前线程已经分配过bucket,advance=false if (--i >= bound || finishing) advance = false; // 所有bucket已经被分配完毕,为nextIndex赋值。 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // CAS修改TRANSFERINDEX,为线程分配任务。 // 处理的节点区间为(nextBound,nextINdex) else if (U.compareAndSetInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }// 处理过程 // CASE1:旧数组已遍历完成,当前线程已经处理完所有负责的bucket if (i < 0 || i >= n || i + n >= nextn) { int sc; // 扩容已完成 if (finishing) { // 删除这个成员变量nextTable nextTable = null; // 更新数组 table = nextTab; // 更新扩容阈值 sizeCtl = (n << 1) - (n >>> 1); return; } // 使用 CAS 操作对 sizeCtl 的低16位进行减 1,代表做完了属于自己的任务 if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 如果上面的if没有执行,即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT // 说明当前已经没有扩容线程了,扩容结束了 finishing = advance = true; i = n; // recheck before commit } } // CASE2:节点i处为空,那么放入刚刚初始化的ForwardingNode else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // CASE3:该位置现在的哈希值就是MOVED,是一个ForwardingNode,说明已经被其他线程处理过,那么要求继续推进 else if ((fh = f.hash) == MOVED) advance = true; // already processed // CASE4:执行转移 else { // 锁住头节点 synchronized (f) { // 再次检查 if (tabAt(tab, i) == f) { Node ln, hn; // 槽中头节点是一个链表头节点 if (fh >= 0) { // 先计算好当前的fh * n int runBit = fh & n; // 存储遍历最终位置的lastRun Node lastRun = f; // 遍历链表 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; // 如果遍历过程中出现hash&n出现了变化,需要更新runBit和lastRun if (b != runBit) { runBit = b; lastRun = p; } }//如果lastRun引用的是低位链表,令ln为lastRun if (runBit == 0) { ln = lastRun; hn = null; } // 如果lastRUn引用的是高位链表,令hn为lastRun else { hn = lastRun; ln = null; }// 遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中 // 循环跳出条件:当前循环结点!=lastRun for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } // 低位链表的位置不变 setTabAt(nextTab, i, ln); // 高位链表的位置是:原位置 + n setTabAt(nextTab, i + n, hn); // 标记当前桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } // 槽中头节点是一个树节点 else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } // 槽中头节点是一个保留占位节点 else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } } } }

transfer()方法是ConcurrentHashMap执行扩容操作的核心方法,他的扩容转移操作其实类似于HashMap,都是将原链表分裂为两个链表。
但也有很多实现细节上的不同,详见源码注释。
4.11 resizeStamp方法
/** * Returns the stamp bits for resizing a table of size n. * Must be negative when shifted left by RESIZE_STAMP_SHIFT. */ static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }

resizeStamp(int n)方法可以在扩容一个大小为n的table时,计算stamp bits
5. 业务方法
5.1 构造方法
// 默认构造方法 public ConcurrentHashMap() { }// 仅提供初始容量的构造方法 public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, LOAD_FACTOR, 1); }// 提供map的构造方法 public ConcurrentHashMap(Map m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }// 提供默认容量和加载因子的构造方法 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); }// 提供默认容量、加载因子和并发更新线程数的构造方法。 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 如果初始容量比并发更新线程数还要小,那为它赋新值 if (initialCapacity < concurrencyLevel)// Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); // cap赋值为最大容量或扩容阈值 int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }

5.2 size方法
// 计数单元数组 private transient volatile CounterCell[] counterCells; public int size() { // 调用sumCount() long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }final long sumCount() { // 获得计数单元数组 CounterCell[] cs = counterCells; long sum = baseCount; if (cs != null) { // 所有计数单元中的值加起来 for (CounterCell c : cs) if (c != null) sum += c.value; } return sum; }// 非常简单的一个计数单元,仅有一个volatile型的计数器value @jdk.internal.vm.annotation.Contended // 这个注解可以保证当前类的对象独享缓存行 static final class CounterCell { // 只提供了构造器,没有提供get/set方法,也就是value的值在初始化时确定,后续不再更改 volatile long value; CounterCell(long x) { value = https://www.it610.com/article/x; } }

size()方法的实现是首先获取baseCount,这是无争用时获取的计数器值。再将计数单元数组中的计数值累加在上面。他有以下几个保证线程安全的举措:
  • counterCells数组和CounterCell类中的value变量设为volatile
  • 没有为CounterCell类中的value变量设置get/set方法。
那么CounterCells数组是怎么被创建和初始化的呢,baseCount是怎么增加的呢。后续结合到那些改变了size的业务方法,如put()等方法的源码,我们再来做解释。
5.3 isEmpty方法
public boolean isEmpty() { return sumCount() <= 0L; // ignore transient negative values }

sumCount()方法见5.2
5.4 get方法
public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; // 计算哈希值 int h = spread(key.hashCode()); if ((tab = table) != null // 表不为空 && (n = tab.length) > 0 // 表长度不为0 && (e = tabAt(tab, (n - 1) & h)) != null) {// 指定位置不为null // 首位置即为要找的key if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; }else if (eh < 0)// 当前链表头哈希值小于0,说明是一个特殊节点 // 调用特殊节点e的find方法 return (p = e.find(h, key)) != null ? p.val : null; // 一个正常节点,正常链表,正常遍历 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

【JDK源码阅读(7)(ConcurrentHashMap类阅读笔记)】注意,首先我们计算出所要查找的key在哈希表中的散列位置。然后根据找到的节点的哈希值的不同,来做不同的处理。
  • 如果哈希值时所要找到的值,直接返回。
  • 如果哈希值小于0,代表当前节点是一个特殊节点。可参考1.13 特殊节点的hash状态参数。这样的话就会去调用特殊节点的find()方法,如ForwardingNode类和TreeNode类的find()方法。
  • 如果哈希值大于等于0,遍历当前链表查找。
5.5 containsKey方法
public boolean containsKey(Object key) { return get(key) != null; }

5.6 containsValue方法
public boolean containsValue(Object value) { if (value =https://www.it610.com/article/= null) throw new NullPointerException(); Node[] t; if ((t = table) != null) { Traverser it = new Traverser(t, t.length, 0, t.length); for (Node p; (p = it.advance()) != null; ) { V v; if ((v = p.val) == value || (v != null && value.equals(v))) return true; } } return false; }

Traverser类封装了containsValue方法的遍历逻辑,代码较为复杂,这里暂且按下不表。
5.7 put方法
public V put(K key, V value) { return putVal(key, value, false); }final V putVal(K key, V value, boolean onlyIfAbsent) { // 判空 if (key == null || value =https://www.it610.com/article/= null) throw new NullPointerException(); // 计算哈希值 int hash = spread(key.hashCode()); // 当前桶的计数器 int binCount = 0; // 自旋插入节点,直到成功 for (Node[] tab = table; ; ) { Node f; int n, i, fh; K fk; V fv; // CASE1:表为空则先调用初始化方法 if (tab == null || (n = tab.length) == 0) tab = initTable(); // CASE2:如果散列位置节点为空,向空位置插入时是无锁的 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 尝试把要put的键值对直接put到这里 if (casTabAt(tab, i, null, new Node(hash, key, value))) break; // 退出 } // CASE3:如果散列位置节点哈希值为-1,即为一个Forwarding Node,调用helperTransfer() else if ((fh = f.hash) == MOVED) // 协助转移数据并获取新数组 tab = helpTransfer(tab, f); // CASE4:如果onlyIfAbsent为true,且头节点即为所需节点,直接返回它 else if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) && (fv = f.val) != null) return fv; // CASE5:找到了指定位置,并且不为空(出现了哈希冲突)。 else { V oldVal = null; synchronized (f) {// 锁住当前节点(链表头) if (tabAt(tab, i) == f) {// 再判断一下f是不是头节点,防止它被别的线程已经修改了 // if-不是一个特殊节点 if (fh >= 0) { binCount = 1; for (Node e = f; ; ++binCount) {// 注意遍历过程中令计数器自增 K ek; // 在遍历的过程中找到了想要插入的值,看情况返回 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 如果到了尾部,就插入由当前key-value构建的新节点 Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value); break; } } } // elseIf-是一个树节点 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } // else-如果是一个保留节点 else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } // 插入完成后,检查一下需不需要把当前链表树化 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 计数器加一 addCount(1L, binCount); // 返回null return null; }

具体逻辑见注释,解释得很详细。
putVal方法会用一个for循环保持自旋,不断尝试插入要求插入的键值对。循环内有以下几个CASE,体现为if-else块中的五个分支。
  • 表为空。调用初始化方法。
  • 散列位置为空。无锁地直接put。
  • 散列位置是一个ForwardingNode,调用helpTransfer
  • 散列位置头即为当前键,且onlyIfAbsenttrue,直接返回。
  • 散列位置不为空,说明出现了哈希冲突。
注意在遍历过程中对binCount的更新,最终用addCount()令对象加一,并用binCount作为check参数。
5.8 remove方法
public V remove(Object key) { return replaceNode(key, null, null); }final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); // 自旋 for (Node[] tab = table; ; ) { Node f; int n, i, fh; // CASE1:可以直接退出的情况:数组为空或散列结果位置为null。 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; // CASE2:节点正在移动,协助移动 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // CASE3:出现哈希冲突,要在链表中查找 else { V oldVal = null; boolean validated = false; // 锁住头节点 synchronized (f) {// 内部具体逻辑不再赘述,类似于上面的put方法 if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; // e 表示当前循环处理结点,pred 表示当前循环节点的上一个节点 for (Node e = f, pred = null; ; ) { K ek; // 找到 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { validated = true; TreeBin t = (TreeBin)f; TreeNode r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update"); } } if (validated) { if (oldVal != null) { // 如果是一次删除,元素个数减一 if (value =https://www.it610.com/article/= null) addCount(-1L, -1); return oldVal; } break; } } } return null; }

关键还是在于锁住链表头来实现线程安全。逻辑直接看源码即可。

    推荐阅读