LongAdder 你为啥那么迷人?深入源码了解真相

本文章欢迎转载,但是转载请标明出处。程序锋子https://blog.csdn.net/l13591302862/article/details/113178599

阅读时间:20 分钟

一 前言

小伙伴们一般都清楚,并发地对 Long 进行累加操作,我们应该使用原子类型的 AtomicLong,或者使用 LongAdderAtomicLongjdk1.5 中的类,而 LongAdderjdk1.8 中的类,可以简单理解 LongAdderAtomicLong 的升级版本。这篇文章将围绕着 LongAdder,讲解出现原因,并且解析源码。
本文大纲

LongAdder 是什么?

既然想要学习人家,那么总得了解下身高、体重什么的,不扯了,言归正传。

我们来讲下什么是 LongAdder

LongAdder 是一个累加器,和 AtomicLong 相似的功能,用于多线程下,能够并发安全地进行累加操作。初始值为 0L,使用 1 个或者多个变量(指 Cell 数组)来共同维护结果。

看下 LongAdder 的 UML 类图

LongAdder 继承了 Striped64,该父类为其提供了三个变量,basecellscellsBusy,后面源码讲解会讲到。

LongAdder UML类图

LongAdder 的核心方法

public class LongAdder extends Striped64 implements Serializable {
	// 对结果累加 x
	public void add(long x) {...}
	
    // 获取结果,如果当前存在并发更新,结果不准确
    // 如果没有并发更新,那么结果是准确的
	public long sum() {...}
    
    // 结果清零,同样是在非并发更新下,才能保证准确性
    public void reset() {...}
}

三 为何出现 LongAdder

在学习中凡事多问自己几个为什么,既促进自己的思考,又能学习得更加深入。那么我们现在可以思考这几个问题:

  • 之前有 AtomicLong,为什么还出现了 LongAdder?(是出于性能提升的考虑,还是出于安全的考虑,或者其他考虑?)
  • LongAdder 使用了何种设计理念来进行提升?这些设计理念我们如何学习和借鉴并且应用到项目中?
  • 提升的同时,有没有带来什么问题,或做出了什么牺牲?
  • 假设出现了问题,官方是否有提出解决方案?如果官方没有解决,那么我们应该如何解决?思考解决的方案,并且从中选择最优方案?
  • 具体的使用场景?分析其他同种工具类,相同场景下有没有 LongAdder 的替代品?
  • ……

以上是我平时在学习中的简单思考模型,只是简单举例,上述问题留待小伙伴们自行解答。简单归纳其实就是几句话,为什么用它?能学到啥?有没有坑?怎么回避这些坑

那么我们简单讲下 LongAdder 出现的根本原因性能原因

  • 多线程情况下,AtomicLong 是通过对其内部的 volatile 变量 value 进行不断地压榨(CAS 自旋),所有压力都落在一个小小的打工人身上,压力太大,干不过来(即只有一个线程可以 CAS 成功,其他线程需要重新自旋,浪费了 CPU 资源)。

  • 突然有一天,打工人觉得太累了,不干了,必须多招人,不然得干到猴年马月,因此 LongAdder 诞生了,在多线程同时压榨 value 变量(LongAdder 中叫 base)时,如果压榨失败那就招人(创建 Cell 数组,并且在 Cell 上进行 CAS 累加,后面源码分析中会讲到)。即将一个变量拆成多个变量进行存储,用空间换时间,提升了线程并发。人多力量大,所以效率就相对提升了,但是同时也带来了些许问题,老板的成本增加了(多建对象,空间复杂度提升),不容易管理(需要将多个 Cell 累加,最后和 base 相加)。

AtomicLong 和 LongAdder
总结一下就是,Long 非线程安全,所以 AtomicLong 出现了,AtomicLong 不够快,所以 LondAdder 出现了

LongAdderAtomicLong 性能对比

LongAdder 测试代码,1 千万次累加,1 亿次累加,10 亿次累加,20 亿次累加

package longadder;

import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author adden
 * @date 2021/1/25
 **/
public class LongAdderDemo {

    private static final ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(
            4,
            4,
            0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("long-worker-"));

    private static class NamedThreadFactory implements ThreadFactory {

        private final String name;
        public LongAdder accumulator = new LongAdder();

        public NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            accumulator.increment();
            return new Thread(r, name + accumulator.longValue());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 分别测试,1 千万,1 亿,10 亿,20 亿
        final long[] count = new long[]{1000_0000L, 1_0000_0000L, 10_0000_0000L, 20_0000_0000L};
        // 4 个线程,每个线程负责累加 1/4
        for (int n = 0; n < count.length; n++) {
            final int num = n;
            System.out.println(count[num]);
            int threads = 4;
            final LongAdder adder = new LongAdder();
            CountDownLatch cdl = new CountDownLatch(4);
            long start = System.currentTimeMillis();
            for (int i = 0; i < threads; i++) {
                POOL_EXECUTOR.execute(() -> {
                    for (int j = 0; j < count[num] / threads; j++) {
                        adder.increment();
                    }
                    cdl.countDown();
                });
            }
            cdl.await();
            long cost = System.currentTimeMillis() - start;
            System.out.println("打印结果: count = " + adder.longValue() + ", time = " + cost + " ms");
        }
        POOL_EXECUTOR.shutdown();
        // 以下为输出结果:
        // 1 千万累加 10000000
        // 打印结果: count = 10000000, time = 190 ms
        // 1 亿累加 100000000
        // 打印结果: count = 100000000, time = 955 ms
        // 10 亿累加 1000000000
        // 打印结果: count = 1000000000, time = 7741 ms
        // 20 亿累加 2000000000
        // 打印结果: count = 2000000000, time = 13482 ms
    }

}

AtomicLong 测试代码,1 千万次累加,1 亿次累加,10 亿次累加,20 亿次累加

package longadder;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author adden
 * @date 2021/1/25
 **/
public class AtomicLongDemo {

    private static final ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(
            4,
            4,
            0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("long-worker-"));

    private static class NamedThreadFactory implements ThreadFactory {

        private final String name;
        public LongAdder accumulator = new LongAdder();

        public NamedThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public Thread newThread(Runnable r) {
            accumulator.increment();
            return new Thread(r, name + accumulator.longValue());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 分别测试,1 千万,1 亿,10 亿,20 亿
        final long[] count = new long[]{1000_0000L, 1_0000_0000L, 10_0000_0000L, 20_0000_0000L};
        // 4 个线程,每个线程负责累加 1/4
        for (int n = 0; n < count.length; n++) {
            final int num = n;
            int threads = 4;
            System.out.println(count[num]);
            final AtomicInteger adder = new AtomicInteger();
            CountDownLatch cdl = new CountDownLatch(4);
            long start = System.currentTimeMillis();
            for (int i = 0; i < threads; i++) {
                POOL_EXECUTOR.execute(() -> {
                    for (int j = 0; j < count[num] / threads; j++) {
                        adder.incrementAndGet();
                    }
                    cdl.countDown();
                });
            }
            cdl.await();
            long cost = System.currentTimeMillis() - start;
            System.out.println("打印结果: count = " + adder.longValue() + ", time = " + cost + " ms");
        }
        POOL_EXECUTOR.shutdown();
        // 以下为输出结果:
        // 1 千万累加 10000000
        // 打印结果: count = 10000000, time = 289 ms
        // 1 亿累加 100000000
        // 打印结果: count = 100000000, time = 2046 ms
        // 10 亿累加 1000000000
        // 打印结果: count = 1000000000, time = 23280 ms
        // 20 亿累加 2000000000
        // 打印结果: count = 2000000000, time = 43994 ms
    }

}

测试结果对比(单位:毫秒)

累加次数1 千万1 亿10 亿20 亿
LongAdder1909557,74113,482
AtomicLong2892,04623,28043,994

以上结果使用的是 4 核 8 G 的笔记本电脑进行测试的,可以看出,随着累加次数的提高,两者的性能差异明显加大。

五 源码分析

在进行源码解析前,我们先了解下一些前置知识。

1 伪共享

伪共享,本质是因为缓存的更新和读取方式造成的,缓存一般是以缓存行为单位,一个缓存行的大小为 64 字节,CPU 每次读取某个数据,都会将该数据所在的整个缓存行都进行读取,这里和 MySQL 读取数据类似,MySQL 是每次读取一页数据(默认16 KB)。这样做有利于提高缓存的利用率,但是这也带来了问题。

如果当该缓存行的某处数据发生变化,那么整个缓存行失效,需要重新读取。伪共享,即修改同一缓存行的一处数据导致其他数据的缓存也失效的一种现象,即无法做到真正的共享。那么这带来的问题就是,我们会经常读取缓存,导致效率变低。那么我们思考下,应该如何解决呢?

其实可以通过数据填充的方式,让该数据独占一个缓存行,其他填充数据都不使用。而 jdk1.8 则用 @Contended 注解优雅解决伪共享的问题,也是采取数据填充的方式。小伙伴可以先猜测下,我们为什么讲伪共享?和我们之后要讲的有什么联系?

// 可以参考 https://blog.csdn.net/xiaolong7713/article/details/106087258/ 进行理解
public class ValuePadding {
    // 7 个 long,56 个字节
    protected long p1, p2, p3, p4, p5, p6, p7;
    // 1 个 long,7 个字节
    protected volatile long value = 0L;
    // 8 个 long,64 个字节
    protected long p9, p10, p11, p12, p13, p14,p15;
}

2 核心理念

用空间换时间

LongAdder 使用 Cell 数组来和 base 一起共同维护结果值, 即将一个值拆分成多个值来维护,这样就可以进行线程并发操作,提升效率。而且为了防止伪共享,还为 Cell 类加上了 @Contended 注解,但是同时内存占用就变大了。

3 父类提供的核心变量和方法

核心变量

base 表示基础值,cellsCell 对象数组,Cell 对象里面封装了一个值 valuecellsBusy 表示自旋锁,有 0 和 1 两个值,1 表示加锁,0 表示无锁。LongAdder 的结果值是通过累加 cells 数组的 value,然后再加上 base 得到的。

// LongAdder 的父类,用来为 LongAdder 提供基础变量和方法
abstract class Striped64 extends Number {
     /**
     * Table of cells. When non-null, size is a power of 2.
     */
    // 保存 Cell 对象,其大小必须为 2 的 n 次幂,方便扩容
    // 默认是 null,出现并发修改时才会尝试创建,即延迟加载机制
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    // 基础值,如果不是多线程并发,一般只更新该值,默认 0L
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    // 自旋锁(基于 CAS),用来创建 Cell 数组或进行 Cell 数组的扩容时加锁
    // 0 表示无锁,1 表示加锁
    transient volatile int cellsBusy;
}

// 核心内部类,通过 @Contended 注解来防止伪共享
// 关于伪共享可以参考美团的 https://tech.meituan.com/2016/11/18/disruptor.html
@sun.misc.Contended static final class Cell {
    // 用于存储结果
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

核心方法

abstract class Striped64 extends Number {
    
    // cas 方式修改 base
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    // cas 方式修改 cellsBusy,尝试获取自旋锁
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

	// 用来获取当前线程的 threadLocalRandomProbe 变量值
	// 因为该字段不是公共字段,但是又需要进行获取和修改操作
	// 所以使用反射获取字段,并使用 Unsafe 直接操作内存地址进行修改,比使用反射赋值更高效
	// 每个线程的 threadLocalRandomProbe 值都不同,后面用这个值方便获取 cells 的下标,冲突的概率也较小
    // 可以看下面的 Thread 类的讲解,进行理解
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
    
    static{
        // ...省略其他...
        PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
        // ...省略其他...
    }

    // 对当前线程的 threadLocalRandomProbe 进行异或 hash,重新获取 probe 值
    // 目的是为了让当前线程重新定位 cells 数组的下标,区别于上次 cas 失败时的下标,减少冲突
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

    // 用于在多线程并发运行的情况下,对 Cell 数组进行创建和扩容,或者对 Cell 对象进行 cas 操作
    // 参数 x 表示需要累加的值,参数 fn 提供自定义累加操作的函数(函数式编程),LongAdder 中不使用
    // 一般是在 LongAccumulator 中使用,用来自定义累加的公式
    // 参数 wasUncontended,表示是否存在并发竞争
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {...}
}

class Thread{
    // Probe hash value; nonzero if threadLocalRandomSeed initialized
	// ThreadLocalRandom 对象的 hash 值;如果不为 0,表明 threadLocalRandomSeed 被初始化了
    // 下面的 ThreadLocalRandom.current() 方法会进行初始化
    // 不同线程的 threadLocalRandomProbe 肯定不同
    // 如果相同,那么会进行随机调整,返回一个不相同的 threadLocalRandomProbe
    @sun.misc.Contended("tlr")
	int threadLocalRandomProbe;
    // ...省略其他...
}

// 本质是一个 Random 类,使用 ThreadLocal 来进行线程隔离,保证线程安全
// 相当于为每一个线程分配一个 Random 类,就是用来生成随机数的
class ThreadLocalRandom{
    // ThreadLocalRandom 初始化方法,longAccumulate 方法中会调用
    public static ThreadLocalRandom current() {
        if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
            // 进行初始化
            localInit();
        return instance;
    }
    // ...省略其他...
}

4 add 方法解析

java.util.concurrent.atomic.LongAdder#add

该方法用来对结果进行累加,在无明显竞争的情况下(即 Cell 数组为空),先尝试对 base 进行 CAS 修改。具体的解析都在下面的注解中了。

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 分支 1,主要是先尝试对 base 进行 cas,如果失败,进入分支 2
    // 该分支包含 3 种情况,以下是每种情况的讲解
    // 情况(1) cells 为空 && caseBase 成功,直接返回
    // 情况(2) cells 为空 && caseBase 失败,表明存在并发修改,进入分支 2
    // 情况(3) cells 不为空,表示已经存在竞争,再对 base 操作很可能失败,因此直接进入分支 2
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // 默认是表示非竞争状态
        boolean uncontended = true;
        // 分支 2,若 as 不为空,尝试根据 getProbe 方法获取当前线程的随机值,
        // 	并且把 getProbe & (as.length - 1) 作为下标(注 1),
        // 	获取 Cell 对象,调用其 cas 方法,如果元素为空,cas 失败都会进入分支 3
        // 该分支包含 5 种情况,以下是每种情况的讲解
        // 情况(1) as 为空,需要创建数组,直接进入分支 3
        // 情况(2) as 不为空 && as.length == 0,此时数组还未初始化完成,进入分支 3
        // 情况(3) as 不为空 && as.length > 0 && as[getProbe() & m] == null,
        // 		   表明定位到的元素为空,需要重新定位,进入分支 3
        // 情况(4) as 不为空 && as.length > 0 && as[getProbe() & m] != null && a.cas 失败,
        // 		   表明 cas 失败,需要重新定位,进入分支 3
        // 情况(5) as 不为空 && as.length > 0 && as[getProbe() & m] != null && a.cas 成功,
        //		   表明 4 个条件全部满足,最后 cas 成功,直接返回
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            // 此处将 cas 方法调用结果赋值给 uncontended,若调用 cas 失败,就返回 false
            // 	正好进入下个分支,然后表示处于竞争状态
            // 如果此处调用成功,不会进入下个分支
            !(uncontended = a.cas(v = a.value, v + x)))
            // 分支 3,如果 as 为空,那么该 longAccumulate 方法会先创建 cells 数组
            // 	如果是定位元素为空,或者 cas 失败,则会先尝试重新定位,不行再创建 Cell 对象
            // 	而且如果多次发生并发修改,会涉及到 cells 数组扩容操作
            longAccumulate(x, null, uncontended);
    }
}

// 以下的内容都是在阐述(注 1)
// 为何是用 probe & as.length - 1,获取下标?
// 因为相当于 probe % as.length,而且位运算效率更高
// 而且 probe 的值是当前线程随机生成的,相当于在数组的下标范围内进行随机选择
// 例如 length 为 16,length - 1 为 15,probe 为 20,那么
// 0001 0100 		(20)
// 0000 1111  & 	(15)
// -----------
// 0000 0100		(4)
// 和 20 % 16 = 4 结果一致

5 longAccumulate 方法解析

java.util.concurrent.atomic.Striped64#longAccumulate

该方法用在较为激烈的并发修改情况,会涉及 Cell 数组创建和扩容操作,通过 cellsBusy 自旋锁来保证原子性。该方法通过自旋 + cas 实现自旋锁,由于方法太长,我们拆开分析,外层有 3 个分支,分支 1 数组非空,分支 2 数组为空,抢锁创建数组,分支 3 抢锁失败尝试对 base 进行 case 操作。(被大神 Doug Lea 缜密的逻辑深深折服)

// 用于在多线程并发运行的情况下,对 Cell 数组进行创建和扩容,或者对 Cell 对象进行 cas 操作
// 参数 x 表示需要累加的值,参数 fn 提供自定义累加操作的函数(函数式编程),LongAdder 中不使用
// 一般是在 LongAccumulator 中使用,用来自定义累加的公式
// 参数 wasUncontended,表示是否存在并发竞争
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    // 给每个线程的 ThreadLocalRandom 进行初始化,下面的逻辑每个线程只会运行一次
    // 当前线程在未调用 ThreadLocalRandom.current() 进行初始化时,getProbe() 会返回 0
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        // 重新获取 probe 值,此时不会返回 0
        h = getProbe();
        // 初次调用,默认认为是无竞争
        wasUncontended = true;
    }
    // 表明并发
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // 分支 1:判断数组是否被创建,若否,进入分支 2 创建数组
        if ((as = cells) != null && (n = as.length) > 0) {
            // ...省略代码...
            // 该分支主要逻辑:
            // 首先尝试定位元素,为 null,尝试创建 Cell 元素
            // 不为 null,尝试进行 cas 操作,若 cas 操作失败,且并发修改频繁,会考虑扩容
            // 但是最大容量必须小于 CPU 核心数,双倍扩容
        }
        // 分支 2:此时数组是空的,以下需要尝试获取自旋锁,然后创建数组
        // 	在当前无锁,同时 cells 未被修改的情况下,才能尝试进行抢锁
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // ...省略代码...
            // 该分支主要逻辑:
            // 进行数组创建,默认容量为 2,因为是 2 的倍数
        }
        // 分支 3:表明获取自旋锁失败,此处尝试对 base 进行 cas 操作
        //  如果 cas 成功,那么直接返回,失败则重新进行自旋
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    // 函数接口,提供自定义计算公式,
                                    // LongAdder 为 null,一般用在 LongAccumulator 中
                                    fn.applyAsLong(v, x))))
            // 成功,退出循环
            break;                          // Fall back on using base
    }
}

分支 1 分析,数组非空时的情况分析

// 分支 1:判断数组是否被创建,若否,进入分支 2 创建数组
if ((as = cells) != null && (n = as.length) > 0) {
    // 分支 1.1:重新获取元素,为空则尝试抢锁并创建
    if ((a = as[(n - 1) & h]) == null) {
        // 判断是否无锁
        if (cellsBusy == 0) {       // Try to attach new Cell
            // 提前创建 Cell 对象,空间换时间
            Cell r = new Cell(x);   // Optimistically create
            // 再次判断是否无锁,并尝试获取锁
            if (cellsBusy == 0 && casCellsBusy()) {
                boolean created = false;
                try {               // Recheck under lock
                    Cell[] rs; int m, j;
                    // 需要满足多个条件,才能将之前创建的元素赋值给数组
                    // 条件1:数组非空
                    // 条件2:数组容量 > 0
                    // 条件3:定位到的元素为空
                    if ((rs = cells) != null &&
                        (m = rs.length) > 0 &&
                        rs[j = (m - 1) & h] == null) {
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    // 进行解锁
                    cellsBusy = 0;
                }
                // 创建成功,退出循环
                if (created)
                    break;
                // 创建失败,已被其他线程创建
                // 进行自旋,此次元素非空,可以获取
                continue;           // Slot is now non-empty
            }
        }
        collide = false;
    }
    // 分支 1.2:在之前的 add 方法中,调用 Cell 的 cas 方法失败,而且当前线程并非首次进入该方法,
    //  之后会进入分支 1.7,通过 xorshift 异或方式重新计算 probe,继续自旋,重新定位元素
    // 	这里和 collide 不同,collide 表明出现多次严重竞争,是扩容的预备标志
    else if (!wasUncontended)       // CAS already known to fail
        // 重设为非竞争,相当于重新给了一次机会,继续自旋
        wasUncontended = true;      // Continue after rehash
    // 分支 1.3
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x))))
        break;
    // 分支 1.4:如果数组长度大于等于 CPU 核心数,那么即使有并发修改也不会再扩容
    //	或者当前的数组被修改,也不会进行扩容,
    // 	即在并发严重时,当前线程可能会在分支 1.4 和分支 1.5 之间来回切换
    else if (n >= NCPU || cells != as)
        collide = false;            // At max size or stale
    // 分支 1.5:表明并发修改了同一数组的同一元素,且满足数组长度小于 CPU 核心数,会到该分支
    //	该分支是进行扩容的预备和缓冲,如果下次自旋,当前线程还是 cas 失败,那么就会进行扩容
    else if (!collide)
        collide = true;
    // 分支 1.6:该分支主要进行数组的扩容,
    // 	collide 为 true 时,表明并发修改数组同一元素,且满足扩容要求时,会来到该分支进行扩容
    else if (cellsBusy == 0 && casCellsBusy()) {
        try {
            // 再次判断数组是否发生变动
            if (cells == as) {      // Expand table unless stale
                // 双倍扩容
                Cell[] rs = new Cell[n << 1];
                // 进行数据迁移
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {
            // 进行解锁
            cellsBusy = 0;
        }
        collide = false;
        // 继续自旋,然后在扩容后的数组上重新获取元素
        continue;                   // Retry with expanded table
    }
    // 分支 1.7:以上的抢锁均失败,因为只要抢锁成功都会 continue 或者 break,
    // 	为了能够重新定位到一个新的下标,重新计算 probe 值,重新自旋
    h = advanceProbe(h);
}

分支 2 分析,进行数组创建,初始容量为 2

// 分支 2:此时数组是空的,以下需要尝试获取自旋锁,然后创建数组
// 	在当前无锁,同时 cells 未被修改的情况下,才能尝试进行抢锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
    try {                           // Initialize table
        // 再次判断数组是否被创建
        if (cells == as) {
            Cell[] rs = new Cell[2];
            rs[h & 1] = new Cell(x);
            cells = rs;
            init = true;
        }
    } finally {
        // 进行解锁
        cellsBusy = 0;
    }
    // 创建数组完成,跳出循环
    if (init)
        break;
}

为了让大家能够理解整个流程,我特意画了流程图,花了很多时间,里面省略了部分细节,否则太乱
核心流程图

注意事项

  • LongAdderequals 方法没有重写,比较的是引用,不是数值,因为数值在并发情况下易变。
  • LongAdder 在高并发下,性能优于 AtomicLong,但是内存消耗变多,因此在并发量不大的情况下,建议使用 AtomicLong
  • LongAdder 的初始值为 0L,而且无法设置,且只能对结果进行加减,无法进行自定义运算。如果希望能设置初始值,并且可以按照自定义公式进行计算,可以使用 LongAccumulator,和 LongAdder 设计理念一致,但是可以自定义计算和设置初始值。

参考文章

如果有兴趣可以微信搜一搜程序锋子,关注本人的微信公众号
微信公众号


版权声明:本文为l13591302862原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。