ConcurrentHashMap(1.8)

前言 1.8后的ConcurrentHashMap与之前有截然不同的设计,之前是分段锁的思想,通过采用分段锁Segment减少热点域来提高并发效率。1.8利用CAS+Synchronized来保证并发更新的安全,底层采用数组+链表+红黑树的存储结构。
在此再一次膜拜Doug Lea大神,高山仰止。1.8的ConcurrentHashMap有6313行代码,之前大概是1000多行。
这篇文章也只是概括了部分功能。
相关概念 table 所有数据都被存放在table数组中,大小是2的整数次幂,存储的元素分为三种类型

  • TreeBin 用于包装红黑树结构的结点类型 ,它继承了Node,代表它也是个节点,hash为-2,内部有个root变量指向红黑树的头节点,封装了众多红黑树方法
  • ForwardingNode 扩容时存放的结点类型,并发扩容的实现关键之一 ,是一个标记,代表此处不需要扩容。
  • Node 普通结点类型
    ConcurrentHashMap(1.8)
    文章图片

    Node
static class Node implements Map.Entry { final int hash; final K key; volatile V val; volatile Node next; //辅助map.get()方法 Node find(int h, Object k) { Node e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } //不支持set方法改变value值 public final V setValue(V value) { throw new UnsupportedOperationException(); }

【ConcurrentHashMap(1.8)】value和next都用volatile修饰,保证并发的可见性
ForwardingNode
static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }

ForwardingNode作用在扩容期间,它是一个标记
nextTable 扩容时新生成的数组,其大小为原数组的两倍
sizeCtl
/** * Table initialization and resizing control.When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads).Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;

用于table数组初始化及扩容控制,下面来看看它是如何控制的:
接下来的思路是沿着sizeCtl来跟踪源码,但是很多方法具有多种功能,比如addCount...会牵扯很多其它的概念,这里就把他们先剔除出去,先沿着一条简单的线来解读
sizeCtl在初始化与扩容中的作用 1,初始化:ConcurrentHashMap有五个构造器,不考虑构造时指定集合的,其他四个都没有在初始化期间创建table数组对象,而是将这一操作下放到第一次调用put插入键值对时。sizeCtl决定了table数组的大小,无参构造器则sizeCtl为默认值0,他会直接影响到table数组的大小,为16;传入了初始值大小,经过tableSizeFor将初始值改为2的n次幂
final V putVal(K key, V value, boolean onlyIfAbsent) { .......省略 for (Node[] tab = table; ; ) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); ......省略

initTable会被调用
private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }

首先sc = sizeCtl,将sizeCtl值保存起来,之后当一个线程cas设置sizeCtl值为-1成功,之后的线程都将被拒绝,通过执行Thread.yield()。也就是说只允许一个线程执行初始化table 数组操作。sc == 0则table大小为16,否则就为sizeCtl大小的值。数组创建完成后sizeCtl = n - (n >>> 2),相当于原先值的0.75,这之后sizeCtl代表阀值。
2,接下来看看它在扩容上的控制: 扩容-是transfer方法,在addCount里被调用,而addCount在putVal最后被调用,所以想要理解sizeCtl的变换得先从put操作开始。(这里只是按照这条线来说,这些方法在很多地方都有调用)
在分析put前先来看看三个方法:tabAt,casTabAt,setTabAt
static final Node tabAt(Node[] tab, int i) { return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }static final boolean casTabAt(Node[] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }static final void setTabAt(Node[] tab, int i, Node v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }

ABASE表示table中首个元素的内存偏移地址,所以(long)i << ASHIFT) + ABASE得到table[i]的内存偏移地址:
  1. tabAt利用Unsafe来从内存直接获取tab数组中i位置的节点。为什么不用table[i]来直接获取?虽然table是volatile修饰的,但并不能保证数组中的元素是最新的,所以直接从内存上拿到最新的值,
  2. casTabAt利用cas原子操作来设置i位置的头节点,所以旧值一共是null才能设置成功。
  3. setTabAt利用Unsafe直接更改内存上tab[i]处节点。
为什么有两个写操作?下面是注释
* Note that calls to setTabAt always occur within locked regions, * and so in principle require only release ordering, not * full volatile semantics, but are currently coded as volatile * writes to be conservative.

意思是:setTabAt 调用是在拥有锁的状态下,也就是被synchronized保护起来的状态下,所以没有原子性的考虑。
put
public V put(K key, V value) { return putVal(key, value, false); }/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value =https://www.it610.com/article/= null) throw new NullPointerException(); int hash = spread(key.hashCode()); //计算出hash int binCount = 0; for (Node[] tab = table; ; ) { Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f; ; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }

(直接在代码里注释显示不清,所以下面就是在分析代码逻辑)
  1. key与value都不能为null
  2. spread计算key的hash值。(h ^ (h >>> 16)) & HASH_BITS; //0x7fffffff;保证了hash >= 0.
  3. 循环+cas,循环cas是Java相较于synchronized的另一种锁实现,之前文章介绍过。
  • 如果tab == null执行initTable操作,上面介绍过。
  • 利用tabAt取出i处头节点赋给f,若为null则利用casTabAt设置头节点。
  • 若f的hash == MOVED,说明有线程在i处正在执行扩容操作,执行helpTransfer,该线程帮助执行扩容任务,之后再新数组中再添加值。
  • 以上情况都不是,利用synchronized 锁住头节点f确保线程安全,区分是链表还是树,执行插入操作;如果是链表,那么在遍历过程中++binCount,最后如果binCount > 8,调用treeifyBin树化
  1. 在成功插入了一个新的元素后,addCount会被调用,这个方法一共做了两件事,增加个数,扩容。
addCount
两个功能:增加个数,检测是否进行扩容。这里主要分析扩容
private final void addCount(long x, int check) { CounterCell[] as; long b, s; CounterCell,baseCount与size等计算map元素个数的方法有关,之后介绍; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { .....省略......................................... if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

一开始说过这段源码的解析是沿着sizeCtl这条线,来看看它的作用的,所以这里省略了addCount的其他功能。
前面sizeCtl在经过initTable后,代表的是阀值,当元素个数>=sizeCtl后进行扩容,但是不要忘了多线程环境,假设一个场景:sizeCtl此时代表阀值,多线程下其中一个线程进入了while循环里,调用resizeStamp方法(先不管该方法的意义),之后由于sizeCtl > 0,执行CAS将sizeCtl设置为(rs << RESIZE_STAMP_SHIFT) + 2,之后调用transfer(tab, null)扩容,这时nextTable一定为null,所以transfer方法传null。那么现在就已经有了一个线程在进行扩容操作,那么对于其他进入循环里的线程该如何处理?它们在条件允许下会帮助进行扩容操作,每增加一个线程就cas将sizeCtl +1 。
这里的条件允许指的是扩容未完成即nextTable!=null;数组未分配完即transferIndex>0(看下面扩容部分);sizeCtl未发生变化因为扩容结束或有其他线程抢先该值都会发生变化;扩容线程未超过允许值即MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
先来看看resizeStamp方法
private static int RESIZE_STAMP_BITS = 16; static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }

numberOfLeadingZeros返回数的二进制中左侧0的个数,比如传入2左侧有30各0,就返回30;传入32,左侧有26个0就返回26;
resizeStamp:比如传入16,返回数的二进制是1000000000011011;
回到上面假设场景:第一个线程调用resizeStamp后得到数rs,之后cas将rs左移16位再加2。以n =16为例,sizeCtl变为1000000000011011 0000000000000010。此时sizeCtl < 0,后16位的大小假设为N,则代表目前有N-1个线程在执行扩容操作。
在第一个线程将sizeCtl改变后,sizeCtl<0,其他线程会进入if (sc < 0) {}代码块中,判断能否执行扩容操作,就是上面说的五个条件判断。这里可以看到sizeCtl对扩容操作的影响。
上面的分析可以看到sizeCtl在并发扩容期间起到的重要作用
总结一下sizeCtl的变化
table 初始化: 1,根据你调用的构造函数的不同,比如无参则sizeCtl = 0,initTable中将数组初始化为16; 若传了大小,则先经tableSizeFor改变大小确保为2的n次幂,之后赋给sizeCtl, initTable中将数组初始化为sizeCtl大小 2,=-1 在初始化数组期间,即initTable里为了保证只有一个线程能够初始化table数组, 线程会利用cas将sizeCtl改为-1,之后的线程检测到sizeCtl< 0会退回到就绪状态 3,数组初始化完成后sizeCtl变为为阀值,大小为0.75倍数组大小 扩容: 第一个执行扩容的线程会将sizeCtl变为< 0,扩容期间sizeCtl低16位数大小假设为N, 则代表有N-1个线程在执行扩容操作。 下面的源码分析可以看出,很多方法会判断sizeCtl的正负,<0则代表正在扩容,>0则代表阀值

在上面以sizeCtl为线的分析中,出现很多方法,还有一些方法的功能没有分析全,下面来分析分析它们
扩容
transfer 扩容涉及到两个操作:1,新建新数组nextTable。2,将原数组元素移到新数组。
private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) {// initiating try { @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n << 1]; nextTab = nt; } catch (Throwable ex) {// try to cope with OOME 创建nextTab数组失败,sizeCtl 此时为负值,重新赋值,这样其他线程便会来初始化新数组, 可去查看addCount帮助理解 sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } .........

如果nextTab == null,新建一个新数组大小为原数组的2倍。多线程下应该只允许一个线程创建nextTab数组,那么如何实现?在前面提到的addCount方法里实现,通过cas改变sizeCtl值,其他线程在条件允许下会调用transfer(tab, nt),nt为创建后的nextTab数组,这样就实现了多线程并行扩容
int nextn = nextTab.length; 之前介绍过,它就是个标记节点,标识 ForwardingNode fwd = new ForwardingNode(nextTab); 下面for循环里面的while循环是为了配合CAS操作,即自旋CAS乐观锁形式, 意思是当该线程设置transferIndex失败,就在尝试一次,但是问题随之而来 如何控制什么时候结束?这就是advance的作用 boolean advance = true; boolean finishing = false; 代表旧数组元素是否已经全部移到新数组i就是该线程在属于自己的stride区里移动的指针;bound就是该区域的边界(包含边界) for (int i = 0, bound = 0; ; ) { Node f; int fh; while (advance) { int nextIndex, nextBound; 首先将i减一,再与bound比较,意思是如果大于bound则代表线程还没有处理完自己的stride区域 finishing为true,代表当前线程是唯一的扩容线程了,其他的扩容线程已经结束, 该最后留下的线程会执行table重赋值操作,逻辑在第二块代码中 if (--i >= bound || finishing) advance = false; transferIndex就是前一个stride区域的边界值(被包含在前一块区域里), 它为0代表数组中没有剩下的空间需要你操作了 else if ((nextIndex = transferIndex) <= 0) { i = -1; i设为-1,为了进入第二块代码,在哪里检测是否完成扩容 advance = false; } 运行到这里则代表数组中还有未分配的位置,那么就执行CAS将transferIndex重新赋值, 得到该区域的边界bound与指针i else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }

这里我的理解是:
首先这里的for循环意味着一个线程它将一个位置上的链表或树移到新数组后,又会再次循环移动另一位置直到整个数组全部完成。for循环整体代码分为5块,上面贴的是第一块。
这第一块的作用是什么?ConcurrentHashMap允许多线程并发扩容,那么位置如何选择?多线程下如何保证全部位置都转移完?这就是第一块代码的功能。在方法代码一开始计算了一个stride值,它根据你的cpu数与数组大小计算得来,最小值为16,它的作用是?每个线程扩容都会从数组中得到一块区域,这块区域的转移工作归该线程负责,完成就去重新申请下一块区域,这块区域大小就是stride。
比如第一个线程他的区域是[n-1,n-stride],第二个线程的区域[n-stride-1, n-2*stride]。
上述功能的具体实现在代码中已详细说明。
接下来看看第二块:能进入该块代码说明数组已经被线程们分配完了,等他们全部执行完自己的stride区域,扩容就完成了。
if (i < 0 || i >= n || i + n >= nextn) { int sc; finishing为true,将nextTable 归零,table重新赋值, sizeCtl重新变为阀值,为新数组大小的0.75 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); 相当于0.75 return; } cas将sizeCtl减一,因为已经不需要该线程帮助扩容了,该线程可以结束了 此时的sizeCtl后16位大小等于当前扩容线程数+1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 这里如果相等代表此线程是最后一个扩容线程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // 再次循环一次,会执行if (finishing){}里代码,之后退出,扩容结束 } }

接下来第三块:
else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd);

如果该位置为空,则用cas添加一个ForwardingNode节点,之前说过它是个标记,hash值为-1,它标志着该位置为不需要扩容操作,可能该位置原本就为null,或者已经执行了扩容操作;
ConcurrentHashMap运行中有些线程在插入,有些线程在执行扩容,如何避免相互影响?
  1. 在put中发现当前位置节点hash为-1,也就是ForwardingNode,那么put线程转而执行helpTransfer操作,帮助执行扩容,扩容完成后再插入;
  2. 具体插入与扩容代码是被synchronized保护的,而它们的锁都是头节点对象。获取不到锁的线程会等待,那么之后重新获取到锁还能够继续执行吗?我的意思是例如线程A要在该位置插入,由于该位置正在执行扩容,锁被占用,于是A阻塞,之后扩容执行完,A获取到了锁应该继续执行插入操作吗?不应该,这就是在synchronized 代码中再次校队头节点的必要性,即if (tabAt(tab, i) == f)判断。对于A线程他会在putval的for代码块内再循环一次,会检测到此时头节点的hash值已为-1,执行helpTransfer操作,帮助进行扩容,整体扩容完成后再插入节点。
来看看第四块:
else if ((fh = f.hash) == MOVED) advance = true; // already processed

表明该位置已经扩容过了,重赋值advance为true,确认下一位置i
最后第五块:
else { synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; if (fh >= 0) { 该位置的链表按runBit的值分为两类, int runBit = fh & n; Node lastRun = f; 按runBit节点分成两类,每当遇到不同于之前节点值就标记为lastRun for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { ........省略树的操作,逻辑与上面类似 } } }

这一块是真正执行扩容操作的一块,被synchronized保护,锁是头节点,获取锁后会再次对头节点进行校队,确保没有改变。
画了个图来说明上面代码的操作,假设有七个节点,1,2,6,7节点runBit为0.
在上面代码经历第一个for循环后,lastRun为6,其runBit为0,其后lastRun被赋值给ln,那么hn为null。

ConcurrentHashMap(1.8)
文章图片

第二个for循环后,节点分类情况,2为ln,4为hn,在该过程中节点会被 倒序
ConcurrentHashMap(1.8)
文章图片

ConcurrentHashMap(1.8)
文章图片

最后利用setTabAt将以ln为头的链表放到nextTab的i位,即仍放在原为不动,将以hn为头节点的链表放到nextTab的i+n位置,接着将tab的i位改为fwd,即标记节点ForwardingNode,告诉其他线程该位置已经扩容完成。最后将advance = true,继续循环去找下一位置扩容。
上面提过在putVal时如果发现该位置头节点hash为-1,即ForwardingNode节点,调用helpTransfer
helpTransfer
final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode)f).nextTable) != null) { int rs = resizeStamp(tab.length); 前面解析过 循环的这些判断条件为tue的话表明扩容未结束 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {扩容时sizeCtl一定小于0 resizeStamp的参数大小不变则值相等,sizeCtl的高16位就是resizeStamp的返回值左移16位 sc == rs + 1这个判断对应的情况没想明白 MAX_RESIZERS表示并发扩容所允许的最大线程数 transferIndex <= 0在上面分析过,扩容中transferIndex表示最近一个被分配的stride区域的边界, <=0代表数组被分配完了 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; cas配合while循环构成自旋CAS,保证操作原子性。将sizeCtl+1, sizeCtl此时的低16位为N=扩容线程数+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }

回到putVal逻辑,在扩容操作整体完成后,线程回到putVal中,再次循环将节点插入。
Size 之前在解析addCount时有部分代码被省略,省略的那部分代码与ConcurrentHashMap的size操作有关。对于ConcurrentHashMap来说table中的节点数量是个不确定的值,你没法停下所有正在执行各种操作的线程们来统计准确数字,所以最终得到的只是个估计值。下面来看看如何统计出来的。
首先来看看一些相关内部类与变量:
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = https://www.it610.com/article/x; } } 同步状态,利用CAS从0改为1代表获取锁 private transient volatile int cellsBusy; 初始大小为2,每次扩容翻倍,存储CounterCell对象,该对象有个value变量,用来存储个数 private transient volatile CounterCell[] counterCells; 它也被用来存储个数,在下面的源码分析中发现,在counterCells为空时会将个数累加在bseCount里; 若counterCells非空,在将个数存储进counterCells失败后,会将其累加进baseCount private transient volatile long baseCount;

mappingCount与size 这两个方法都是统计个数的,不同在于size返回int,mappingCount返回long,文档注释建议使用mappingCount
public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values }public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }

可以看出sumCount是关键。统计的方法就是遍历counterCells将每个位置存储的值相加再加上baseCount的值,和就是此时的个数估计值。
为了搞清一开始说的三个变量的用途,回到addCount里被我省略的部分:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); }

counterCells
private final void fullAddCount(long x, boolean wasUncontended) { int h; h大小初始化为0x9e3779b9; 用来(n - 1) & h得到相应位置的CounterCell if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (; ; ) { CounterCell[] as; CounterCell a; int n; long v; if ((as = counterCells) != null && (n = as.length) > 0) { 先初始化CounterCell,value就是参数x即个数;cellsBusy相当于AQS中的同步状态, 即可以看成是一个锁,获取锁操作就是cas将其从0改为1;被锁保护的代码中, 将CounterCell对象放进(n - 1) & h位置,再将cellsBusy = 0;成功就结束循环 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) {// Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try {// Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; 说明该位置已被其他线程放入了CounterCell } } collide = false; } 在addCount中如果CAS设置CounterCell中的value失败,uncontended为false, 即参数wasUncontended为false,所以它表示CAS失败。回到addCount在发现能够执行到cas设置 value这步说明前面三个判断都为false,即表明CounterCell非空,且数组该位置CounterCell不为null, 说明已经记录了值。 else if (!wasUncontended) wasUncontended = true; 设为true,重新cas设置CounterCell的value else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; else if (counterCells != as || n >= NCPU) collide = false; counterCells 已被更改或counterCells数组达到最大值; else if (!collide) collide = true; else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try {// Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }

来看看无限for循环中的处理逻辑:
  1. 如果counterCells数组不为空
    • 利用(n - 1) & h取出该位置CounterCell,若为空,初始化CounterCell对象,CAS更改同步状态cellsBusy的值,可以看成是一种获取锁的操作,将对象放入该位置。break退出循环,方法结束。
    • 若该位置为null,检测wasUncontended值,false说明addCount利用CAS更改该位置CounterCell的value值失败,那么对于失败该如何处理?再次CAS尝试更改该为值的value值?并不,看源码发现除了将wasUncontended 改为true后,最后会调用h = ThreadLocalRandom.advanceProbe(h)更改h的值,也就是将值x存储在其他位置,该位置可能为null或已存储值
    • 该位置不为null,CAS更改value值,成功则退出循环,方法结束。
    • 这里的判断成立则代表counterCells 已被更改或counterCells数组达到最大值,更改h值继续尝试。
    • !collide判断,collide若为true代表上个位置非空;以我的理解collide的作用便是在对CounterCels数组扩容前(即下个else if判判断中的操作),再次更改h值再循环尝试一次,增加一次尝试的机会,可能扩容数组再次尝试操作对性能有影响吧。
    • 前面的尝试都失败,那么就扩容counterCells 数组,再次尝试
  2. counterCells 数组未初始化,首先获取锁,初始化数组大小为2,将值x存储在h & 1位置;来看看它的判断条件(cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)),先确认同步状态cellsBusy的值,再确认数组counterCells并未被其他线程更改,最后CAS来获取锁。看到问题了吗?如果在你确认前两个后在CAS之前它们被更改了,怎么办?在代码里看到在操作之前仍会再次判断counterCells == as,若被更改,则init为false,再次for循环处理。
  3. 上面都没成功,那就把x加到baseCount里,结束方法。
总结一下就是:cellsBusy是同步状态,将以下四个操作隔离开。1.给数组中某为空的位置存储CounterCell对象操作。2. 给数组中某非空的位置,累加其CounterCell的value值。3. CounterCell数组扩容。4. 初始化CounterCell数组。
注:从上面一路分析下来,一个问题ConcurrentHashMap如何保证线程安全性?
在put中,真正添加节点的操作是被synchronized保护的,还有扩容时移动节点的操作也是被synchronized保护起来的,它们时安全的。那对于共享变量的操作呢?虽然它们都用了volatile修饰,但很多操作会根据它们原先的值来决定新值,这就需要原子性的保证,所以采用自旋CAS确保这些共享变量值更改的安全性。
但是并不是自旋CAS原子改变同步状态就再配合synchronized就万事大吉了。比如再addCount里,一个线程CAS将sizeCtl更改,之后调用transfer,先初始化nextTable数组,但是这是多线程环境,情况远比想象的要复杂,比如,在你未初始化完nextTable之前,其他线程transfer由于检测到了sizeCtl<0,要来帮助扩容,若直接调用transfer(tab, nt); 而此时nextTable还未初始化完成,那么那些线程就会去执行初始化nextTable操作,这是不允许的,所以在源码中会先对线程进行扩容条件判断,在判断合格后,仍会使用CAS尝试更改sizeCtl,为什么?因为很多状态的改变都会更改sizeCtl的值,比如扩容完成了,那么sizeCtl会被改变;比如有其他线程抢先,那么transferIndex与sizeCtl一定改变了,这些情况的发生可能在你条件判断合格之后,那么就不能让改线程轻易去调用transfer,所以才会有CAS对sizeCtl的再次判断。从这里也看出了sizeCtl的重要性,它与太多情况相关,不禁再次感慨大神的高山仰止。
所以我认为ConcurrentHashMap是以synchronized与CAS为基,每种操作都充分考虑到不同的情况下实现的线程安全
在ConcurrentHashMap中还有一部分是与红黑树的变化有关,如一开始提到的TreeBin,后面的文章再说
参考
深入浅出ConcurrentHashMap1.8
ConcurrentHashMap源码分析(JDK8版本)
更好地理解jdk1.8中ConcurrentHashMap实现机制

    推荐阅读