接上篇,持续剖析桶为链表状况下的数据迁徙。
扩容迁徙
首先tab的size始终是2的n次幂
,转换成二进制就是100..00
的模式
而落点桶的计算公式为:plot = (n-1)&hash
-> 11..11&hash
那么hash的第n个地位(runBit=hash&n)是0还是1,决定了迁徙后的落点桶:
- 如果runBit=0,那么新tab的落点为
plot
- 如果runBit=1,那么新tab的落点为
plot+n
基于上述剖析,迁徙时会对桶中的链条从新组装(当然会先对头节点对象做syn加锁)
- runBit=0的组成低位链条ln,runBit=1的组成高位链条hn;并最终放入新table中(上图红色局部)
- 旧tab的迁徙桶会指向Forwarding Node节点(fwd,上图紫色局部)
- 随着syn锁开释,旧链node1->node2->node3会被GC回收(上图绿色局部)
给出transfer()办法
的源码:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // ## 迁徙分片,最小MIN_TRANSFER_STRIDE(16) if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 创立新tab if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } // transferIndex-传输地位,初始化为旧数组大小 // stride-迁徙分片 // nextn - 新数组的大小 int nextn = nextTab.length; // A node inserted at head of bins during transfer operations. // 迁徙过程中的非凡标记,在迁徙桶的头节点;它的hash值固定为MOVED=-1 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 迁徙正在执行的标记 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab // i-正在迁徙桶的索引;boud-线程本次迁徙范畴的最小索引 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // ****** A、参数设置阶段 ****** while (advance) { int nextIndex, nextBound; // -- 2.迁徙后续桶时`--i`,证实迁徙方向:从右往左 if (--i >= bound || finishing) advance = false; // $$ 下一个迁徙地位nextIndex,i<bound时会执行(将i=-1,设定了一个区间完结标记) else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // -- 1.线程迁徙它的第一个桶时赋值: // 如果旧tab的length是32 // 第一个线程:transferIndex=nextBound=16=bound、nextIndex=32(迁徙范畴,16-31) // 第二个线程:transferIndex=nextBound=0=bound、nextIndex=16(迁徙范畴,0-15) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // ****** A、参数设置阶段 ****** // ****** B、完结断定阶段 ****** // $$ 满足i=-1区间完结标记 if (i < 0 || i >= n || i + n >= nextn) { int sc; // tab替换,新加载因子设置 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } // 完结条件 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // ****** B、完结断定阶段 ****** // ****** C、迁徙阶段 ****** // 迁徙桶为空,cas形式搁置fwd标记节点 // f-正在迁徙的节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 此桶的头节点是fw,示意正在迁徙 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // ### 对正在迁徙的节点f加锁 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // -- 链状况 if (fh >= 0) { // runBit取决于fh的第n地位,要么是0,要么是1 // 0放在低位链ln,1放在高位链hn int runBit = fh & n; // 高位或位置链的最初一个节点 Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { // f的下一个节点p的hash值的n地位。同样的,要么是0,要么是1 int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 高下位拆分双链 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 头插法组装链条 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } // -- 树状况 else if (f instanceof TreeBin) { // ... } } } } // ****** C、迁徙阶段 ****** } }
元素计数
执行put办法后,会对元素个数进行统计
统计形式如图:
- 先尝试简略模式,以cas形式对baseCount进行批改,批改胜利则完结
- 简略模式失败(比方竞争较多的状况),会尝试简单模式,创立counterCells数组,并以此进行统计
统计形式依然通过cas的形式批改随机一个数组元素的value值,最终以sum所有数组元素value的形式计算出以后map的元素个数。
实现层面,会通过cellsBusy变量来管制对counterCells数组的操作(如counterCells扩容、桶初始化等)。
给出fullAddCount()办法
的源码:
private final void fullAddCount(long x, boolean wasUncontended) { // ThreadLocalRandom产生的随机数(多线程条件下的随机工具) int h; // ThreadLocalRandom,须要初始化 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // == 一、counterCells不为空的 if ((as = counterCells) != null && (n = as.length) > 0) { // ## 1.随机抉择的桶没有元素,须要新建 if ((a = as[(n - 1) & h]) == null) { // -- cellsBusy-标识位,用于初始化或扩容 if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && // cas批改标识位胜利的,作桶位的初始化操作 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } // -- 创立后,重置标识位 finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // ## 2.随机抉择的桶有元素(a-CounterCell对象),间接对a的value值做cas累加操作 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale else if (!collide) collide = true; // ## 3.步骤2失败,做counterCells的2倍扩容 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } // == 二、counterCells为空,初始化操作 else if (cellsBusy == 0 && counterCells == as && <i style="color:transparent">来源gaodai$ma#com搞$$代**码网</i> U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // == 三、兜底计划,仍应用baseCount else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
get办法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // tab存在,且依据hash计算出落点桶有元素 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // -- 比拟后key相等,间接返回val if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // -- 树或迁徙中(fwd) else if (eh < 0){ return (p = e.find(h, key)) != null ? p.val : null; } // -- 链:遍历链,找到就返回;找不到返回null while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
次要关注一个问题:如果要获取的元素正在迁徙中,源码中是如何解决的呢?
间接查看fwd的find()
办法:
Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; // 无tab,或落点桶无元素,返回null if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; // -- 2.新table上找到元素,返回 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { // -- 1.迁徙中,去新表查问 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } // 树上查找 else return e.find(h, k); } if ((e = e.next) == null) return null; } } }
tableSizeFor办法
最初察看一个有意思的办法
// 保障失去的是一个大于入参c的最小2的n次幂数 // 参考:https://www.cnblogs.com/xiyixiaodao/p/14483876.html private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }