在前一文 中,我们讲了HashMap的一些知识点,文末提到HashMap不是线程安全的,在并发的场景下,建议使用ConcurrentHashMap,它是java.util.concurrent并发包下面的一款线程安全的HashMap容器,本文就ConcurrentHashMap在JDK7和JDK8中的实现进行分析。
与HashMap想通的一些知识,本文就不再赘述了,可以看HashMap的文章。
JDK7 JDK7的ConcurrentHashMap中,加入了分段锁的逻辑,在分段锁的设计中,数据会被按照一定的规则分段(一般是对数据的hashcode,进行二次哈希计算),每个段使用一个锁,这样如果访问的数据不在一个分段中,则不会产生锁冲突,从而提高并发性能。
HashMap与ConcurrentHashMap的数据结构对比如下:
可以看出,主要区别在于ConcurrentHashMap多了一层Segment分段锁,每个Segment下面是一个数组+链表的HashMap结构。
数据结构 ConcurrentHashMap实现线程安全的关键就在于Segment分段锁,当需要修改元素时,要先获得Segment锁,从而保障在并发场景下的数据安全。Segment是继承自ReentrantLock ,一个基于AQS的重入锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class ConcurrentHashMap <K , V > extends AbstractMap <K , V > implements ConcurrentMap <K , V >, Serializable { final Segment<K,V>[] segments; static final class HashEntry <K ,V > { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; } static final class Segment <K ,V > extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this .loadFactor = lf; this .threshold = threshold; this .table = tab; } } }
segment初始化 segments成员变量的初始化逻辑为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 static final int DEFAULT_INITIAL_CAPACITY = 16 ;static final float DEFAULT_LOAD_FACTOR = 0.75f ;static final int DEFAULT_CONCURRENCY_LEVEL = 16 ;public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } this .segmentShift = 32 - sshift; this .segmentMask = ssize - 1 ; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1 ; Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int )(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); this .segments = ss; } public ConcurrentHashMap (int initialCapacity, float loadFactor) { this (initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap (int initialCapacity) { this (initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap () { this (DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
segments数组大小(ssize)为大于等于concurrencyLevel的最小2次幂,例如concurrencyLevel为14,则ssize为16,concurrencyLevel为18,ssize则为32,默认情况下concurrencyLevel为DEFAULT_CONCURRENCY_LEVEL,即是默认情况下,数据会被分成16个段,
segment延迟初始化 在ConcurrentHashMap的初始化函数中,只初始化了segments数组中的第一个元素,其他元素并没有着急初始化,而是放在使用过程中进行初始化的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private Segment<K,V> ensureSegment (int k) { final Segment<K,V>[] ss = this .segments; long u = (k << SSHIFT) + SBASE; Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { Segment<K,V> proto = ss[0 ]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int )(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null ) { if (UNSAFE.compareAndSwapObject(ss, u, null , seg = s)) break ; } } } return seg; }
因此segments数组中其他未分配内存的元素,也存在并发访问的情况,即是ensureSegment函数也需要考虑线程安全问题,JDK并没有采用常规的锁的方式,而是使用UNSAFE.getObjectVolatile()提供的原子操作,结合CAS函数UNSAFE.compareAndSwapObject()来保证函数的并发访问。
segment寻址 要定位一个数据的位置,首选需要确定属于哪个segment,然后再确定在HashEntry数组中的位置,segment的寻址算法为:(hash >>> segmentShift) & segmentMask
,其中:
segmentShift:2的sshif次方等于ssize,segmentShift=32-sshift。例如ssize=16,则sshif=4,segmentShift=28;ssize=32,则sshif=5,segmentShift=27;
hash >>> segmentShift:hash值为32位,这一步即是将hash右移segmentShift位,只保留高位;
segmentMask:段掩码,假如ssize=16,则segmentMask=15,segmentMask的所有位都为1;
最后将两部分做与运算,得到数据所在的segment。
put方法 先来看看ConcurrentHashMap的put方法:
1 2 3 4 5 6 7 8 9 10 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); }
这个方法就比较简单了,大致就3个步骤:计算数据的hash值;定位hash值对应的segment,并确保其已初始化;调用Segment#put方法。
记下来看看Segment的put方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; } private HashEntry<K,V> scanAndLockForPut (K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this , hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null ; int retries = -1 ; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0 ) { if (e == null ) { if (node == null ) node = new HashEntry<K,V>(hash, key, value, null ); retries = 0 ; } else if (key.equals(e.key)) retries = 0 ; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break ; } else if ((retries & 1 ) == 0 && (f = entryForHash(this , hash)) != first) { e = first = f; retries = -1 ; } } return node; }
get方法 get方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public V get (Object key) { Segment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key.hashCode()); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null ) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )(((tab.length - 1 ) & h)) << TSHIFT) + TBASE); e != null ; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null ; }
由于共享变量都被volatile修饰,而且扩容的时候,也是直接开辟一块新内存进行扩容,不涉及原链表中的操作,因此get方法无需加锁,直接使用原子读操作即可,因此ConcurrentHashMap的读操作性能是很高的。
size方法 在Java - 锁 一文中,我们也讲到计算分段锁的size问题,ConcurrentHashMap中的处理方法与文章中描述的方法大同小异,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public int size () { final Segment<K,V>[] segments = this .segments; int size; boolean overflow; long sum; long last = 0L ; int retries = -1 ; try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) ensureSegment(j).lock(); } sum = 0L ; size = 0 ; overflow = false ; for (int j = 0 ; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null ) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0 ) overflow = true ; } } if (sum == last) break ; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
扩容 put()函数中追加结点之后,如果元素个数超出了阈值,就会触发扩容,由于此时线程已经持有了当前分段的锁,因此扩容过程不会产生并发问题,直接重新计算结点hash并放到新的位置上,代码篇幅较长,逻辑较为简单,就不赘述了。
JDK8 JDK7中虽然是支持并发修改,但是其并发程度受制于分段锁的数量,默认情况下为16,并且分段锁创建之后就不能修改,因此锁的粒度还是有点粗。在JDK8中,将锁的粒度降低到了更细的级别。
数据结构 JDK8中的ConcurrentHashMap,从数据结构来说,跟JDK8中的HashMap数据结构类似:
put方法 在JDK8中,抛弃了JDK7中的分段锁的并发安全机制,转而使用CAS+synchronized的方式,我们来看看put方法的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
即是将JDK7中的分段锁,改成了直接利用synchronized锁表头或者根结点对象(结点不存在时,则使用CAS添加结点)。
get方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
get方法的代码逻辑就比较简单了:
先根据key计算hash值,并找到对应的Node数组中的元素;
如果元素为空,则直接返回null;
否则判断元素hash值,如果小于0,则表示是红黑树(TreeNode),或者正在扩容(ForwardingNode结点),调用对应的类的find方法进行查找。
size方法 size方法本身比较简单:
1 2 3 4 5 6 7 8 9 10 public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); } public long mappingCount () { long n = sumCount(); return (n < 0L ) ? 0L : n; }
size()函数返回值为int类型,因此最大只能返回Integer.MAX_VALUE,但是Map的容量有可能超过Integer的最大值,所以JDK8中增加了mappingCount()函数,返回一个long,JDK也推荐使用这个方法。不过由于并发场景的存在,size()方法返回的值也不不一定是准确的。
两个函数都用到了sumCount()函数:
1 2 3 4 5 6 7 8 9 10 11 final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
逻辑也比较简单,将成员变量baseCount和counterCells数组中的value累加,就得到了sum。但是背后的技术就有点意思了,我们先来看看相关联的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private transient volatile long baseCount;private transient volatile CounterCell[] counterCells;@sun .misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } }
解释一下上面这段代码:
在addCount()函数中,先使用CAS更新baseCount,如果更新成功,则直接返回;否则继续;
这即是baseCount注释中描述的,当没有出现争用时,使用baseCount变量。
如果CAS失败,则说明出现了线程竞争,则使用counterCells来更新;
关于CounterCells的定义,有两个小知识点可以花3-5分钟先了解一下:
CounterCells改编自LongAdder和Striped64,主要目的是用来提高总体的吞吐量;
@sun.misc.Contended注解主要是用来解决“伪共享”的问题,在之前的CPU缓存与MESI 一文中有涉及到,具体可以阅读一下引用的文章 。
使用ThreadLocalRandom.getProbe(),随机选择一个counterCell,然后利用CAS来改变其volatile修饰的value,如果成功,则返回;否则使用fullAddCount()函数来更新;
ThreadLocalRandom是一个线程私有的伪随机数生成器,每个线程的probe都是不同的,可以认为每个线程的probe就是它在CounterCell数组中的hash code。
fullAddCount()函数篇幅过长,而且需要一些LongAdder的背景知识,这里就不深入了,简单来说,函数中利用for (;;)
死循环,来更新counterCell中的value,直到更新完成。
了解上述逻辑之后,就能理解sumCount()函数的意思了,将未冲突情况下的count作为基数,与各个线程在CounterCell中增加的count累加,最终得到整个ConcurrentHashMap的size。
扩容 虽然JDK8中降低了锁的粒度,改成了表头或者根节点,但是也是由于取消了分段锁,因此扩容的时候会影响整个数组,导致一个线程触发扩容时,其他线程都不能修改元素。针对这个情况,JDK8中采用了并发扩容的机制,被阻塞的线程与其等着单线程扩容完成,还不如一起参与扩容操作。
并发扩容本身的核心逻辑比较简单,但是需要有各种标志位,以及CAS的配合,在详细介绍扩容逻辑之前,先看看几个扩容相关的关键变量以及数据结构。
nextTable
扩容时的新数组,容量是原数组的两倍;
sizeCtl:
sizeCtl=0:初始值;
sizeCtl>0:表示当前数组元素个数达到sizeCtl时,会触发扩容;
sizeCtl=-1:通过CAS将sizeCtl设置为-1,表示进入了扩容逻辑;
sizeCtl<-1:表示有-sizeCtl-1个线程正在参与扩容(每新增一个扩容线程,sizeCtl减一)。
transferIndex
扩容时,标记当前正在扩容中的所有线程正在负责的数据范围的下届,例如transferIndex=10,表示[0, 10)这些元素还没有线程来处理。扩容初始的时候,会设置为原数组大小。
举个例子,假设原数组容量为48,则扩容初始的时候,transferIndex被设置为48,有一个线程参与扩容,并被分配16个结点之后,transferIndex被设置为32;再来一个线程扩容,transferIndex被设置为16;最后再来一个线程,则transferIndex被设置为0。
ForwardingNode
一个占位结点的数据结构,当有线程完成了数组中某个结点(可能是单个结点,可能是一个链条,可能是一颗红黑树)的转移工作之后,会将当前节点设置为ForwardingNode类型的结点,告诉后续访问此结点的线程:这个节点已经被转移到新数组中了,请在新数组中进行操作。
1 2 3 4 5 6 7 8 9 10 11 12 static final int MOVED = -1 static final int TREEBIN = -2 static final class ForwardingNode <K ,V > extends Node <K ,V > { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super (MOVED, null , null , null ); this .nextTable = tab; } }
扩容逻辑大致分为以下几个步骤:
触发扩容
扩容准备工作
触发扩容的线程,会利用CAS设置扩容标志位,让后续线程知道已经进入了扩容逻辑;
根据CPU核心数,以及数组元素数量,计算得到每个线程需要处理的元素个数;
扩容中
当有put线程进来,如果发现正在扩容,则判断是否已经有足够的线程在进行扩容工作;
如果没有,则自己加入到扩容队伍中,并且完成第一次分配的结点区间的扩容工作之后,会判断是否需要继续处理其他区间(取决于是否有足够多的线程加入了扩容工作);
否则在扩容中的新数组中,进行数据插入操作;
扩容线程会根据结点的hash值,将结点从旧数组移到新数组中的对应位置;
数组中的某个结点(可能是单个结点,可能是一个链条,可能是一颗红黑树)处理完成之后,使用占位结点设置到当前位置,告知后续访问此结点的线程;
由于扩容过程中,结点之间的next关系不变,因此并不影响get操作。
逐行代码的注释,可以参考文末的文章。
参考
-=全文完=-