掘金 后端 ( ) • 2024-07-01 15:25

theme: fancy

本文已收录于:https://github.com/danmuking/all-in-one(持续更新)

前言

哈喽,大家好,我是 DanMu。这篇文章想和大家聊聊 ConcurrentHashMap 相关的知识点。严格来说,ConcurrentHashMap 属于java.lang.current,并属于通常意义上的容器,但是考虑到面试官非常喜欢打出 HashMap 和 ConcurrentHashMap 这套丝滑小连招,因此我决定将它们放在一起,来加深大家的印象,发车!

1636366877800.jpg

数据结构

与 HashMap 一样,ConcurrentHashMap 的底层结构在JDK1.8的时候出现过一次比较大的变化,这篇文章将以JDK1.8的数据结构为主,对于1.8前的结构,大家只需要大致了解即可(JDK版本都已经更新到21了,让我看看哪个活化石面试官还问老版的数据结构) dtb-1718985477846.jpg

JDK1.8之前

ConcurrentHashMap.drawio.png

JDK1.8ConcurrentHashMap-第 2 页.drawio.png

在JDK1.8之前,ConcurrentHashMap 采用 **Segment 数组 + HashEntry 数组 + 链表 **的结构,因此其最大并发数会收到 Segment 数量的限制。在 JDK 1.8中,对 ConcurrentHashMap 主要进行了两点改进:

  1. 细化锁的粒度,从针对 Segment 加锁修改为针对 每个数组元素单独加锁,
  2. 与 HashMap 一样,将 链表 的结构修改为 链表/红黑树 的结构,提高了搜索链表的效率

对ConcurrentHashMap的数据结构有了一个基础的认识之后,接下来就准备深入底层,彻彻底底的搞懂 ConcurrentHashMap 的实现原理。

ConcurrentHashMap 的源码相对于 HashMap 复杂了很多,希望大家能静下心来慢慢看,我会尽力帮助大家简单的理解源码中的思想。

核心源码详解

重要的成员变量

老规矩,还是先来看看 ConcurrentHashMap 中维护了哪些重要的成员属性:

// ConcurrentHashMap 中存放元素的最大值
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认的初始化大小
private static final int DEFAULT_CAPACITY = 16;
// 从链表转换为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 从红黑树转换为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 开启红黑树转换的数组阈值
// 只有同时满足 TREEIFY_THRESHOLD 和 MIN_TREEIFY_CAPACITY 才会转换为红黑树
static final int MIN_TREEIFY_CAPACITY = 64;
// 接下来几个都是标志位,用来表示扩容过程中的状态,在后面会详细解释
static final int MOVED     = -1; // 表示正在转移
static final int TREEBIN   = -2; // 表示已经转换成树
static final int RESERVED  = -3; // hash for transient reservations
// 实际存放元素的数组
transient volatile Node<K,V>[] table;
// 在扩容时会用到的数组
private transient volatile Node<K,V>[] nextTable;
/**
 * 用来控制表初始化和扩容的
 * 默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,先有个印象
 *  -1 说明正在初始化
    -N 说明有 N-1 个线程正在进行扩容
     0 默认状态,表明table没有初始化
    >0 表示 table 扩容的阈值,如果 table 已经初始化。
 */
private transient volatile int sizeCtl;
// 扩容时使用,用来表示扩容进行到哪里
private transient volatile int transferIndex;

初始化

接下来,来看看 ConcurrentHashMap 在初始化的时候做了什么

// 默认的初始化函数
// 可以看到默认的初始化函数啥都没干,所以实际分配空间也是推迟到第一次添加元素的时候
// 这里也是采用的懒加载
public ConcurrentHashMap() {
}
// 指定初始化容量的初始化函数
public ConcurrentHashMap(int initialCapacity) {
    // 这边先处理了一下两个边界情况
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               // tableSizeFor 的作用是找到最接近 initialCapacity 的2的次幂
               // 在扩容的时候也是扩容为原容量的2倍
               // 因此 ConcurrentHashMap 的容量永远是2的幂次
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}


// 因为 ConcurrentHashMap 使用了懒加载,因此实际分配空间将会推迟到第一次添加函数时
// 这里为了内容的连贯性,将它们放在一起
// initTable 初始化空间
private final Node<K,V>[] initTable() {
    // 用来存放 table 成员变量
    Node<K,V>[] tab; 
    // 用来存放 sizeCtl 成员变量
    int sc;

    // 判断空间初始化了没有
    // 注意:这里可能有多个线程想进行初始化,因此这里用了一个循环
    // 只有 cas 成功的线程才能进行初始化
    // 其他 cas 失败的线程会进入自旋,找到 ConcurrentHashMap 初始化完成,然后退出。
    while ((tab = table) == null || tab.length == 0) {
        // cas 失败的线程会走到这个条件
        // 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。
        // 失败的线程释放 CPU,或者在 while 里自旋
        if ((sc = sizeCtl) < 0)
            // 让出 CPU 使用权
            Thread.yield(); // lost initialization race; just spin
        // cas 修改 sc 的值为 -1,只有一个线程能够成功,然后开始初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 上面说了,如果是带容量的初始化函数会将容量暂存在 sc 里
                    // 所以如果 sc > 0,就是走带容量的流程,否则就是默认初始化容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 给成员变量赋值
                    table = tab = nt;
                    // 给 sc 赋值,现在 sc 存放的是扩容阈值
                    // sc为0.75数组大小
                    sc = n - (n >>> 2);
                }
            } finally {
                // 赋值给对应的成员变量
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

添加元素

下面,来看看如何向 ConcurrentHashMap 中添加元素。 要向 ConcurrentHashMap 中添加元素可分为这几个步骤:

  1. 计算 key 对应的 hash 值,根据 hash 定位到对应的数组下标
  2. 先快速判断一下对应下标是否为空,如果为空,直接使用 cas 将元素写入
  3. 否则的话就需要遍历对应数组位置的链表/红黑树,为保证遍历过程中数据不发生变化,需要先给对应数组位置加锁。
  4. 如果在遍历过程中遇到相同的 key 就直接覆盖,否则就添加到尾部。

大致理解一下流程应该能帮助大家更好的理解源码,接下来直接上硬菜:

// 向 ConcurrentHashMap 中添加元素
public V put(K key, V value) {
    return putVal(key, value, false);
}
// 添加元素中实际调用的函数
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 边界条件判断,ConcurrentHashMap 是不允许添加 null 的
    if (key == null || value == null) throw new NullPointerException();
    // 计算 hash
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    // 先把table的值存到tab里
    // 这里加了一个循环,和上面初始化时候的循环作用类似
    for (Node<K,V>[] tab = table;;) {
        // f 用来存放 key 对应数组位置的第一个节点
        Node<K,V> f; int n, i, fh;
        // 如果数组"空",进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            // 上面已经说过了
            tab = initTable();

        // 找该 hash 值对应的数组下标,得到第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该位置为空,
            // 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
            // 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // hash 居然可以等于 MOVED,这个可以先放着,涉及到后面扩容的知识
        else if ((fh = f.hash) == MOVED)
            // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
            tab = helpTransfer(tab, f);

        // 到这里就是说,hash 对应的数组位置已经有值了,需要遍历链表/红黑树
        else { 

            V oldVal = null;
            // 对该数组位置加锁
            // 接下来就是遍历了
            // 如果有相同的 key 就覆盖,否则添加到尾部
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // 头节点的 hash 值大于 0,说明是链表
                        // 用于累加,记录链表的长度
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最后面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树的遍历,这里可以不用看了,很复杂,面试也不大可能会考
                    else if (f instanceof TreeBin) { // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值方法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
                    // 具体源码我们就不看了,扩容部分后面说
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

获取元素

获取元素的逻辑和添加元素差不多,这里就不在赘述了:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算 hash 值
    int h = spread(key.hashCode());
    // 根据 hash 值确定节点位置
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果搜索到的节点 key 与传入的 key 相同且不为 null,直接返回这个节点  
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 这边涉及到一些后面扩容的知识,可以先不看,等看完后面扩容的知识在来看就懂了
        // 如果 eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了
        // 所以这个 find 方法是树节点的 find 方法或者是 fwd 节点的 find 方法
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
         // 否则遍历链表 找到对应的值并返回
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
 // 这是 ForwardingNode 节点的 find 方法   
Node<K,V> find(int h, Object k) {
    // 注意这里的nextTable 是扩容时传进来的
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        // 没找到直接返回 null
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        // 自旋
        for (;;) {
            int eh; K ek;
            // 找到了就返回节点
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            // 同样要判断是树节点还是 ForwardingNode 节点
            if (eh < 0) {
                // ForwardingNode 节点就继续往里找
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    // 树节点 就调用数节点的 find 方法
                    return e.find(h, k);
            }
            // 没找到就返回n ull
            if ((e = e.next) == null)
                return null;
        }
    }
}

扩容

在 ConcurrentHashMap 中还有一个非常重要的知识点,就是 ConcurrentHashMap 的扩容,与 HashMap 的库容不同,由于 ConcurrentHashMap 需要再扩容的过程中不仅要有高性能,还要保证线程的安全,因此 ConcurrentHashMap 的扩容过程可以说是相当的复杂,希望大家能够耐下心来慢慢慢看,我也会尽力帮大家理解 ConcurrentHashMap 的扩容思想。 总体来说,ConcurrentHashMap 的**核心思想就是:分而治之。**在扩容过程中,ConcurrentHashMap 将数组划分成若干个不重合的任务包,并将任务包分配给多个线程,来达到提高性能的同时保证线程安全的目的。 但是体现在源码中,可不像说起来这么简单。来,上硬菜:

// tryPresize 会在将链表转换为红黑树的 treeifyBin 中调用
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
// 也就是说,如果当前 ConcurrentHashMap 的数组长度为8,那么 size 就是 16
private final void tryPresize(int size) {
    // c: size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    // 注意,c 不表示扩容后的数组大小
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;

        // 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 0.75 * n
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 这个分支负责处理一些边界情况
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            // 扩容戳,为一个负值
            // 高16为为扩容标记
            // 低16为为扩容线程数+1,从2开始是因为1已经作为初始化标记
            // 这个函数会在后面详细分析
            int rs = resizeStamp(n);

            // 这个分支表示当前已经开始扩容,用来帮助扩容
            if (sc < 0) {
                Node<K,V>[] nt;
                // 这个分支用来处理边界,只要满足一个条件就直接退出,不帮助扩容
                    // RESIZE_STAMP_SHIFT 表示左移16位,用来比较扩容标记是否相同
                    // 这个条件有问题,不会走到
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 
                    // 最后一个扩容进程在执行最后一个任务
                    sc == rs + 1 ||
                    // 帮助扩容的线程数达到上线
                    sc == rs + MAX_RESIZERS || 
                    // 新数组还没有初始化完成
                    (nt = nextTable) == null ||
                    // 所有的任务包都已经分发完
                    transferIndex <= 0)
                    break;
                // cas 将 sc+1 表示扩容进程数+1,然后开始帮助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }

            // 第一个开始扩容的线程,cas 将 sc 赋值为 rs+2 开始,开始扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

从上面可以看到,具体实现扩容的逻辑都通过调用 transfer 来实现,接下来就来看看 transfer 函数:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // n 是原数组的长度
    int n = tab.length, stride;

    // 前面说了,扩容的核心思想就分而治之,将数组划分为多个任务包,stride就是每个任务包的大小
    // stride 在单核下直接等于 n,就等于只有一个任务包
    // 多核模式下为 (n>>>3)/NCPU,最小值是 16
    // 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 如果 nextTab 为 null,先进行一次初始化
    // 前面我们说了,外围会保证第一个发起扩容的线程调用此方法时,参数 nextTab 为 null
    // 之后帮助扩容的线程调用此方法时,nextTab 不会为 null
    if (nextTab == null) {
        try {
            // 容量翻倍
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // nextTable 是 ConcurrentHashMap 中的属性
        nextTable = nextTab;
        // transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
        transferIndex = n;
    }

    int nextn = nextTab.length;

    // ForwardingNode 翻译过来就是正在被迁移的 Node
    // 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
    // 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
    // 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
    // 所以它其实相当于是一个标志。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);


    // advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
    // 初始化为 true
    boolean advance = true;
    // 迁移工作是否全部完成
    boolean finishing = false; // to ensure sweep before committing nextTab

    /*
     * 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
     * 
     */

    // i 是位置索引,bound 是边界,注意是从后往前
    // 这里可以理解为初始化,并不是要使用的值
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;

        // advance 为 true 表示可以进行下一个位置的迁移了
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;

            // 将 transferIndex 值赋给 nextIndex
            // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 第一次会走这个分支,将 transferIndex 赋值为 nextIndex-stride
            // 第一个进程就负责处理 nextIndex-stride 到 nextIndex 这一个任务包
            // 任务包会被从后往前处理
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
                bound = nextBound;
                i = nextIndex - 1;
                // 刚开始当前任务肯定没完成
                advance = false;
            }
        }
            // 所有任务完成
        if (i < 0 || 
            // 不应该发生
            i >= n || 
            // 不应该发生
            i + n >= nextn) {
            int sc;
            // 扩容完成
            if (finishing) {
                // 所有的迁移操作已经完成
                nextTable = null;
                // 将新的 nextTab 赋值给 table 属性,完成迁移
                table = nextTab;
                // 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }

            // 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
            // 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
            // 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 任务结束,方法退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;

                // 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
                // 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 下面几个分支就是进行迁移的代码了,比较简单
        // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 头节点的 hash 大于 0,说明是链表的 Node 节点
                    if (fh >= 0) {
                        // 需要将链表一分为二,
                        // 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
                        // lastRun 之前的节点需要进行克隆,然后分到两个链表中
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 其中的一个链表放在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 另一个链表放在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        // 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                    // 红黑树可以不看
                    else if (f instanceof TreeBin) {
                        // 红黑树的迁移
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果一分为二后,节点数小于等于6,那么将红黑树转换回链表
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;

                        // 将 ln 放置在新数组的位置 i
                        setTabAt(nextTab, i, ln);
                        // 将 hn 放置在新数组的位置 i+n
                        setTabAt(nextTab, i + n, hn);
                        // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
                        //    其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
                        setTabAt(tab, i, fwd);
                        // advance 设置为 true,代表该位置已经迁移完毕
                        advance = true;
                    }
                }
            }
        }
    }
}

恭喜你,看到这里你已经对于扩容有了相当深入的理解!扩容的代码确实相当的硬核,需要大家有一点耐心,慢慢理解。

答疑

在源码中还有一些用一句两句话很难解释清楚的问题,就在这部分进行更详细的解读

扩容戳有什么用

先来填个前面留下的坑 image.png resizeStamp 会生成一个扩容戳,要理解 要理解扩容戳,必须从二进制的角度来分析。resizeStamp 方法就一句话:

static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

其中 RESIZE_STAMP_BITS是一个默认值 16。这里面关键就是 Integer.numberOfLeadingZeros(n) 这个方法,这个方法源码就不贴出来了,实际上这个方法就是做一件事,那就是获取当前数据转成二进制后的最高位的 1 前的 0 的个数。这句话有点拗口,举个例子: 以 16 为准,16 转成二进制是10000,最高非 0 位是在第 5 位,因为 int 类型是 32 位,所以他前面还有 27 位,而且都是 0,那么这个方法得到的结果就是 27(1 的前面还有 27 个 0)。 然后1 << (RESIZE_STAMP_BITS - 1)在当前版本就是1<<15,也就是得到一个二进制数1000 0000 0000 0000,这里也是要做一件事,把这个 1 移动到第 16 位。最后这两个数通过|操作一定得到的结果就是第 16 位是 1,因为 int 是 32 位,最多也就是 32 个 0,而且因为 n 的默认大小是 16(ConcurrentHashMap 默认大小),所以实际上最多也就是 27(11011)个 0,也就是说这个数最高位的 1 也只是在第五位,执行|运算最多也就是影响低 5 位的结果。扩容戳在被使用的时候会先左移 16 位,然后赋值给 sc,所以这边计算出的扩容戳,其实是高 16 位。上面说的高 16 位代表当前扩容的标记,低 16 位代表了扩容的线程数的扩容戳实际上指的是 rs 赋值后的 sizeCtl。

注意:这里之所以要保证第 16 位为 1,是为了保证 sizeCtl 变量为负数,扩容戳在被使用时会先左移 16 位,这样最高位永远为 1 保证 sizeCtl 变量为负数。

首次扩容为什么计数要 +2 而不是 +1

我们上面已经说了扩容戳的作用:

  • 高 16 位代表当前扩容的标记
  • 低 16 位代表了扩容的线程数。

知道了这两个条件就好理解了,因为 rs 最终是要赋值给 sizeCtl 的,而 sizeCtl 负数才代表扩容,而将 rs 左移 16 位就刚好使得最高位为 1,此时低 16 位全部是 0,而因为低 16 位要记录扩容线程数,所以应该 +1,但是这里是 +2,原因是 sizeCtl 中 -1 这个数值已经被使用了,用来代替当前有线程准备扩容,所以如果直接 +1 是会和标志位发生冲突。为了避开标志位,所以初始化第一次记录扩容线程数的时候才需要 +2。

点关注,不迷路

好了,以上就是这篇文章的全部内容了,如果你能看到这里,非常感谢你的支持! 如果你觉得这篇文章写的还不错, 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!! 白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见! 如果本篇博客有任何错误,请批评指教,不胜感激 !

最后推荐我的IM项目DiTinghttps://github.com/danmuking/DiTing-Go),致力于成为一个初学者友好、易于上手的 IM 解决方案,希望能给你的学习、面试带来一点帮助,如果人才你喜欢,给个Star⭐叭!

参考资料

助力面试之ConcurrentHashMap面试灵魂拷问,你能扛多久 JUC集合: ConcurrentHashMap详解