LongAdder源码分析(longadder原理)
基本介绍
LongAdder
跟AtomicLong
都是用于计数器统计的,AtomicLong
底层通过CAS
操作进行计数,但是在高并发条件下性能比较低。
阿里的开发手册上明确说明:
LongAdder
的继承结构如下:
//LongAdder是Striped64的子类 public class LongAdder extends Striped64 implements Serializable { } 复制代码
Striped64
类中重要的属性如下:
abstract class Striped64 extends Number { /** * Padded variant of AtomicLong supporting only raw accesses plus CAS. * * JVM intrinsics note: It would be possible to use a release-only * form of CAS here, if it were provided. */ @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } /** Number of CPUS, to place bound on table size */ //表示当前计算机CPU数量,什么用? 控制cells数组长度的一个关键条件 static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * Table of cells. When non-null, size is a power of 2. * cells数组 */ transient volatile Cell[] cells; /** * Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. * 没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中 */ transient volatile long base; /** * Spinlock (locked via CAS) used when resizing and/or creating Cells. * 初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了 */ transient volatile int cellsBusy; /** * Package-private default constructor */ Striped64() { } /** * CASes the base field. * 通过修改base中的值 */ final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } /** * CASes the cellsBusy field from 0 to 1 to acquire lock. * 通过CAS方式获取锁,即将CELLSBUSY改成1,表示获取到了锁 */ final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } /** * Returns the probe value for the current thread. * Duplicated from ThreadLocalRandom because of packaging restrictions. * * 获取当前线程的Hash值 */ static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } /** * Pseudo-randomly advances and records the given probe value for the * given thread. * Duplicated from ThreadLocalRandom because of packaging restrictions. * * 重置当前线程的Hash值 */ static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } } 复制代码
Cell 是 java.util.concurrent.atomic 下 Striped64 的一个内部类, LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。
即内部有一个base变量,一个Cell[]数组。
base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
当计算总值调用sum()方法,sum源码如下:
/** * 将base的值加上cells数组中所有槽位中的值得到总值 */ public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } 复制代码
LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。
源码分析
入口:longAdder.increment()
public void increment() { add(1L); } 复制代码
接着查看add方法如下:
public void add(long x) { //as 表示cells引用 //b 表示获取的base值 //v 表示 期望值 //m 表示 cells 数组的长度 //a 表示当前线程命中的cell单元格 Cell[] as; long b, v; int m; Cell a; //条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中 // false->表示cells未初始化,当前所有线程应该将数据写到base中 //条件二:false->表示当前线程cas替换数据成功, // true->表示发生竞争了,可能需要重试 或者 扩容 if ((as = cells) != null || !casBase(b = base, b + x)) { //什么时候会进来? //1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中 //2.true->表示发生竞争了,可能需要重试 或者 扩容 //true -> 未竞争 false->发生竞争 boolean uncontended = true; //条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了 // false->说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值 //条件二:getProbe() 获取当前线程的hash值 m表示cells长度-1 cells长度 一定是2的次方数 15= b1111 // true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持 // false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。 //条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争 // false->表示cas成功 if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) //都有哪些情况会调用? //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells] //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] longAccumulate(x, null, uncontended); } } 复制代码
条件递增,逐步解析,如下:
1.最初无竞争时只更新base;
2.如果更新base失败后,首次新建一个Cell[]数组
3.当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容
longAccumulate
入参说明如下:
只有三种情况会调用longAccumulate
方法
1 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]
2 当前线程对应下标的cell为空,需要创建 longAccumulate 支持
3 cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]
longAccumulate
方法总纲如下:
上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:
CASE1:Cell[]数组已经初始化
CASE2:Cell[]数组未初始化(首次新建)
CASE3:Cell[]数组正在初始化中
一开始刚刚要初始化Cell[]数组(首次新建),即未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组。
//CASE2:前置条件cells还未初始化 as 为null //条件一:true 表示当前未加锁 //条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells //条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table //cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据 if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } 复制代码
如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思
兜底的else模块,即多个线程尝试CAS修改失败的线程会走到这个分支,如下:
该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
当cell已经初始化了时,流程代码如下:
//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中 if ((as = cells) != null && (n = as.length) > 0) { //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] //CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell if ((a = as[(n - 1) & h]) == null) { //true->表示当前锁 未被占用 false->表示锁被占用 if (cellsBusy == 0) { // Try to attach new Cell //拿当前的x创建Cell Cell r = new Cell(x); // Optimistically create //条件一:true->表示当前锁 未被占用 false->表示锁被占用 //条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败.. if (cellsBusy == 0 && casCellsBusy()) { //是否创建成功 标记 boolean created = false; try { // Recheck under lock //rs 表示当前cells 引用 //m 表示cells长度 //j 表示当前线程命中的下标 Cell[] rs; int m, j; //条件一 条件二 恒成立 //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置 //导致丢失数据 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //扩容意向 强制改为了false collide = false; } // CASE1.2: // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空 //true -> 写成功,退出循环 //false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //CASE 1.4: //条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容 //条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可 else if (n >= NCPU || cells != as) //扩容意向 改为false,表示不扩容了 collide = false; // At max size or stale //CASE 1.5: //!collide = true 设置扩容意向 为true 但是不一定真的发生扩容 else if (!collide) collide = true; //CASE 1.6:真正扩容的逻辑 //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁 //条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑 // false->表示当前时刻有其它线程在做扩容相关的操作。 else if (cellsBusy == 0 && casCellsBusy()) { try { //cells == as if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //释放锁 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //重置当前线程Hash值,即rehash h = advanceProbe(h); } 复制代码
当cell已经初始化了
1.如果当前线程对应的hash槽位为null时,通过cas创建cell,并将cell赋值,将cell存入到cells数组中
//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell if ((a = as[(n - 1) & h]) == null) { //true->表示当前锁 未被占用 false->表示锁被占用 if (cellsBusy == 0) { // Try to attach new Cell //拿当前的x创建Cell Cell r = new Cell(x); // Optimistically create //条件一:true->表示当前锁 未被占用 false->表示锁被占用 //条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败.. if (cellsBusy == 0 && casCellsBusy()) { //是否创建成功 标记 boolean created = false; try { // Recheck under lock //rs 表示当前cells 引用 //m 表示cells长度 //j 表示当前线程命中的下标 Cell[] rs; int m, j; //条件一 条件二 恒成立 //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置 //导致丢失数据 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //扩容意向 强制改为了false collide = false; } 复制代码
2.如果当前线程对应的cells数组中的槽位不为空null,并且已经尝试过cas操作修改值失败,即存在竞争。将
wasUncontended
改为true,接着调用最下面的h = advanceProbe(h);
重置当前线程Hash值,
// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //重置当前线程Hash值,即rehash h = advanceProbe(h); 复制代码
3.重置当前线程Hash值后,接着再次判断对应的cells数组中的槽位是否为空,如果为空,则将值存入到对应的槽位,如果不为空,则通过cas操作尝试能不能修改槽位的值。如果修改成功,则执行结束
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; 复制代码
4.如果步骤3修改失败的话,就会将扩容意向
collide
的值置为true
else if (!collide) collide = true; 复制代码
5.接着下次还是修改槽位的值不成功的话,最后会执行扩容操作。
else if (cellsBusy == 0 && casCellsBusy()) { try { //cells == as if (cells == as) { // Expand table unless stale //扩容为2倍 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) //将之前cells数组中的值复制到扩容之后的数组中 rs[i] = as[i]; cells = rs; } } finally { //释放锁 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } 复制代码
完整代码如下:
//都有哪些情况会调用? //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells] //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //h 表示线程hash值 int h; //条件成立:说明当前线程 还未分配hash值 if ((h = getProbe()) == 0) { //给当前线程分配hash值 ThreadLocalRandom.current(); // force initialization //取出当前线程的hash值 赋值给h h = getProbe(); //为什么? 因为默认情况下 当前线程 肯定是写入到了 cells[0] 位置。 不把它当做一次真正的竞争 wasUncontended = true; } //表示扩容意向 false 一定不会扩容,true 可能会扩容。 boolean collide = false; // True if last slot nonempty //自旋 for (;;) { //as 表示cells引用 //a 表示当前线程命中的cell //n 表示cells数组长度 //v 表示 期望值 Cell[] as; Cell a; int n; long v; //CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中 if ((as = cells) != null && (n = as.length) > 0) { //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] //CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell if ((a = as[(n - 1) & h]) == null) { //true->表示当前锁 未被占用 false->表示锁被占用 if (cellsBusy == 0) { // Try to attach new Cell //拿当前的x创建Cell Cell r = new Cell(x); // Optimistically create //条件一:true->表示当前锁 未被占用 false->表示锁被占用 //条件二:true->表示当前线程获取锁成功 false->当前线程获取锁失败.. if (cellsBusy == 0 && casCellsBusy()) { //是否创建成功 标记 boolean created = false; try { // Recheck under lock //rs 表示当前cells 引用 //m 表示cells长度 //j 表示当前线程命中的下标 Cell[] rs; int m, j; //条件一 条件二 恒成立 //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置 //导致丢失数据 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //扩容意向 强制改为了false collide = false; } // CASE1.2: // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空 //true -> 写成功,退出循环 //false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //CASE 1.4: //条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了 false-> 说明cells数组还可以扩容 //条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可 else if (n >= NCPU || cells != as) //扩容意向 改为false,表示不扩容了 collide = false; // At max size or stale //CASE 1.5: //!collide = true 设置扩容意向 为true 但是不一定真的发生扩容 else if (!collide) collide = true; //CASE 1.6:真正扩容的逻辑 //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁 //条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑 // false->表示当前时刻有其它线程在做扩容相关的操作。 else if (cellsBusy == 0 && casCellsBusy()) { try { //cells == as if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //释放锁 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //重置当前线程Hash值,即rehash h = advanceProbe(h); } //CASE2:前置条件cells还未初始化 as 为null //条件一:true 表示当前未加锁 //条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells //条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table //cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据 if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } //CASE3: //1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base //2.cells被其它线程初始化后,当前线程需要将数据累加到base else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } } 复制代码
总体步骤如下:
小总结
AtomicLong 原理:CAS+自旋
场景:
低并发下的全局计算
AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
缺陷: 高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈,N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。
LongAdder 原理:
CAS+Base+Cell数组分散
空间换时间并分散了热点数据
场景:高并发下的全局计算
缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确
伪原创工具 SEO网站优化 https://www.237it.com/
作者:努力更文的小白
链接:https://juejin.cn/post/7035493501911433229