阅读 160

LongAdder源码分析(longadder原理)

基本介绍

LongAdderAtomicLong都是用于计数器统计的,AtomicLong底层通过CAS操作进行计数,但是在高并发条件下性能比较低。

阿里的开发手册上明确说明:

image.png

LongAdder的继承结构如下:

image.png

//LongAdder是Striped64的子类 public class LongAdder extends Striped64 implements Serializable { } 复制代码

Striped64类中重要的属性如下:

abstract class Striped64 extends Number {     /**      * Padded variant of AtomicLong supporting only raw accesses plus CAS.      *      * JVM intrinsics note: It would be possible to use a release-only      * form of CAS here, if it were provided.      */     @sun.misc.Contended static final class Cell {         volatile long value;         Cell(long x) { value = x; }         final boolean cas(long cmp, long val) {             return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);         }         // Unsafe mechanics         private static final sun.misc.Unsafe UNSAFE;         private static final long valueOffset;         static {             try {                 UNSAFE = sun.misc.Unsafe.getUnsafe();                 Class<?> ak = Cell.class;                 valueOffset = UNSAFE.objectFieldOffset                         (ak.getDeclaredField("value"));             } catch (Exception e) {                 throw new Error(e);             }         }     }     /** Number of CPUS, to place bound on table size */     //表示当前计算机CPU数量,什么用? 控制cells数组长度的一个关键条件     static final int NCPU = Runtime.getRuntime().availableProcessors();     /**      * Table of cells. When non-null, size is a power of 2.      * cells数组      */     transient volatile Cell[] cells;     /**      * Base value, used mainly when there is no contention, but also as      * a fallback during table initialization races. Updated via CAS.      * 没有发生过竞争时,数据会累加到 base上 | 当cells扩容时,需要将数据写到base中      */     transient volatile long base;     /**      * Spinlock (locked via CAS) used when resizing and/or creating Cells.      * 初始化cells或者扩容cells都需要获取锁,0 表示无锁状态,1 表示其他线程已经持有锁了      */     transient volatile int cellsBusy;     /**      * Package-private default constructor      */     Striped64() {     }     /**      * CASes the base field.      * 通过修改base中的值      */     final boolean casBase(long cmp, long val) {         return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);     }     /**      * CASes the cellsBusy field from 0 to 1 to acquire lock.      * 通过CAS方式获取锁,即将CELLSBUSY改成1,表示获取到了锁      */     final boolean casCellsBusy() {         return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);     }     /**      * Returns the probe value for the current thread.      * Duplicated from ThreadLocalRandom because of packaging restrictions.      *      * 获取当前线程的Hash值      */     static final int getProbe() {         return UNSAFE.getInt(Thread.currentThread(), PROBE);     }     /**      * Pseudo-randomly advances and records the given probe value for the      * given thread.      * Duplicated from ThreadLocalRandom because of packaging restrictions.      *      * 重置当前线程的Hash值      */     static final int advanceProbe(int probe) {         probe ^= probe << 13;   // xorshift         probe ^= probe >>> 17;         probe ^= probe << 5;         UNSAFE.putInt(Thread.currentThread(), PROBE, probe);         return probe;     } } 复制代码

image.png

Cell 是 java.util.concurrent.atomic 下 Striped64 的一个内部类, LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

image.png

即内部有一个base变量,一个Cell[]数组。

  • base变量:非竞态条件下,直接累加到该变量上

  • Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

当计算总值调用sum()方法,sum源码如下:

/**  * 将base的值加上cells数组中所有槽位中的值得到总值  */ public long sum() {     Cell[] as = cells; Cell a;     long sum = base;     if (as != null) {         for (int i = 0; i < as.length; ++i) {             if ((a = as[i]) != null)                 sum += a.value;         }     }     return sum; } 复制代码

image.png

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

image.png

源码分析

入口:longAdder.increment()

public void increment() {     add(1L); } 复制代码

接着查看add方法如下:

public void add(long x) {     //as 表示cells引用     //b 表示获取的base值     //v 表示 期望值     //m 表示 cells 数组的长度     //a 表示当前线程命中的cell单元格     Cell[] as; long b, v; int m; Cell a;     //条件一:true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中     //       false->表示cells未初始化,当前所有线程应该将数据写到base中     //条件二:false->表示当前线程cas替换数据成功,     //       true->表示发生竞争了,可能需要重试 或者 扩容     if ((as = cells) != null || !casBase(b = base, b + x)) {         //什么时候会进来?         //1.true->表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中         //2.true->表示发生竞争了,可能需要重试 或者 扩容         //true -> 未竞争  false->发生竞争         boolean uncontended = true;         //条件一:true->说明 cells 未初始化,也就是多线程写base发生竞争了         //       false->说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值         //条件二:getProbe() 获取当前线程的hash值   m表示cells长度-1 cells长度 一定是2的次方数   15= b1111         //       true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持         //       false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值 添加到cell中。         //条件三:true->表示cas失败,意味着当前线程对应的cell 有竞争         //       false->表示cas成功         if (as == null || (m = as.length - 1) < 0 ||                 (a = as[getProbe() & m]) == null ||                 !(uncontended = a.cas(v = a.value, v + x)))             //都有哪些情况会调用?             //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]             //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持             //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]             longAccumulate(x, null, uncontended);     } } 复制代码

条件递增,逐步解析,如下:

  • 1.最初无竞争时只更新base;

  • 2.如果更新base失败后,首次新建一个Cell[]数组

  • 3.当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容

longAccumulate入参说明如下:

image.png

只有三种情况会调用longAccumulate方法

  • 1 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells]

  • 2 当前线程对应下标的cell为空,需要创建 longAccumulate 支持

  • 3 cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]

longAccumulate方法总纲如下:

image.png

上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:

  • CASE1:Cell[]数组已经初始化

  • CASE2:Cell[]数组未初始化(首次新建)

  • CASE3:Cell[]数组正在初始化中

一开始刚刚要初始化Cell[]数组(首次新建),即未初始化过Cell[]数组,尝试占有锁并首次初始化cells数组。

//CASE2:前置条件cells还未初始化 as 为null //条件一:true 表示当前未加锁 //条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells //条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) {     boolean init = false;     try {                           // Initialize table         //cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据         if (cells == as) {             Cell[] rs = new Cell[2];             rs[h & 1] = new Cell(x);             cells = rs;             init = true;         }     } finally {         cellsBusy = 0;     }     if (init)         break; } 复制代码

image.png

image.png

如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思

兜底的else模块,即多个线程尝试CAS修改失败的线程会走到这个分支,如下:

image.png 该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

当cell已经初始化了时,流程代码如下:

//CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中 if ((as = cells) != null && (n = as.length) > 0) {     //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持     //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]     //CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell     if ((a = as[(n - 1) & h]) == null) {         //true->表示当前锁 未被占用  false->表示锁被占用         if (cellsBusy == 0) {       // Try to attach new Cell             //拿当前的x创建Cell             Cell r = new Cell(x);   // Optimistically create             //条件一:true->表示当前锁 未被占用  false->表示锁被占用             //条件二:true->表示当前线程获取锁成功  false->当前线程获取锁失败..             if (cellsBusy == 0 && casCellsBusy()) {                 //是否创建成功 标记                 boolean created = false;                 try {               // Recheck under lock                     //rs 表示当前cells 引用                     //m 表示cells长度                     //j 表示当前线程命中的下标                     Cell[] rs; int m, j;                     //条件一 条件二 恒成立                     //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置                     //导致丢失数据                     if ((rs = cells) != null &&                             (m = rs.length) > 0 &&                             rs[j = (m - 1) & h] == null) {                         rs[j] = r;                         created = true;                     }                 } finally {                     cellsBusy = 0;                 }                 if (created)                     break;                 continue;           // Slot is now non-empty             }         }         //扩容意向 强制改为了false         collide = false;     }     // CASE1.2:     // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false     else if (!wasUncontended)       // CAS already known to fail         wasUncontended = true;      // Continue after rehash         //CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空         //true -> 写成功,退出循环         //false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次     else if (a.cas(v = a.value, ((fn == null) ? v + x :             fn.applyAsLong(v, x))))         break;         //CASE 1.4:         //条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了  false-> 说明cells数组还可以扩容         //条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可     else if (n >= NCPU || cells != as)         //扩容意向 改为false,表示不扩容了         collide = false;            // At max size or stale         //CASE 1.5:         //!collide = true 设置扩容意向 为true 但是不一定真的发生扩容     else if (!collide)         collide = true;         //CASE 1.6:真正扩容的逻辑         //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁         //条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑         // false->表示当前时刻有其它线程在做扩容相关的操作。     else if (cellsBusy == 0 && casCellsBusy()) {         try {             //cells == as             if (cells == as) {      // Expand table unless stale                 Cell[] rs = new Cell[n << 1];                 for (int i = 0; i < n; ++i)                     rs[i] = as[i];                 cells = rs;             }         } finally {             //释放锁             cellsBusy = 0;         }         collide = false;         continue;                   // Retry with expanded table     }     //重置当前线程Hash值,即rehash     h = advanceProbe(h); } 复制代码

当cell已经初始化了

  • 1.如果当前线程对应的hash槽位为null时,通过cas创建cell,并将cell赋值,将cell存入到cells数组中

//CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell if ((a = as[(n - 1) & h]) == null) {     //true->表示当前锁 未被占用  false->表示锁被占用     if (cellsBusy == 0) {       // Try to attach new Cell         //拿当前的x创建Cell         Cell r = new Cell(x);   // Optimistically create         //条件一:true->表示当前锁 未被占用  false->表示锁被占用         //条件二:true->表示当前线程获取锁成功  false->当前线程获取锁失败..         if (cellsBusy == 0 && casCellsBusy()) {             //是否创建成功 标记             boolean created = false;             try {               // Recheck under lock                 //rs 表示当前cells 引用                 //m 表示cells长度                 //j 表示当前线程命中的下标                 Cell[] rs; int m, j;                 //条件一 条件二 恒成立                 //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置                 //导致丢失数据                 if ((rs = cells) != null &&                         (m = rs.length) > 0 &&                         rs[j = (m - 1) & h] == null) {                     rs[j] = r;                     created = true;                 }             } finally {                 cellsBusy = 0;             }             if (created)                 break;             continue;           // Slot is now non-empty         }     }     //扩容意向 强制改为了false     collide = false; } 复制代码

  • 2.如果当前线程对应的cells数组中的槽位不为空null,并且已经尝试过cas操作修改值失败,即存在竞争。将wasUncontended改为true,接着调用最下面的h = advanceProbe(h);重置当前线程Hash值,

// wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false else if (!wasUncontended)       // CAS already known to fail     wasUncontended = true;      // Continue after rehash //重置当前线程Hash值,即rehash h = advanceProbe(h); 复制代码

  • 3.重置当前线程Hash值后,接着再次判断对应的cells数组中的槽位是否为空,如果为空,则将值存入到对应的槽位,如果不为空,则通过cas操作尝试能不能修改槽位的值。如果修改成功,则执行结束

else if (a.cas(v = a.value, ((fn == null) ? v + x :         fn.applyAsLong(v, x))))     break; 复制代码

  • 4.如果步骤3修改失败的话,就会将扩容意向collide的值置为true

else if (!collide)     collide = true; 复制代码

  • 5.接着下次还是修改槽位的值不成功的话,最后会执行扩容操作。

else if (cellsBusy == 0 && casCellsBusy()) {     try {         //cells == as         if (cells == as) {      // Expand table unless stale             //扩容为2倍             Cell[] rs = new Cell[n << 1];             for (int i = 0; i < n; ++i)                 //将之前cells数组中的值复制到扩容之后的数组中                 rs[i] = as[i];             cells = rs;         }     } finally {         //释放锁         cellsBusy = 0;     }     collide = false;     continue;                   // Retry with expanded table } 复制代码

完整代码如下:

//都有哪些情况会调用? //1.true->说明 cells 未初始化,也就是多线程写base发生竞争了[重试|初始化cells] //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持 //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容] // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false final void longAccumulate(long x, LongBinaryOperator fn,                           boolean wasUncontended) {     //h 表示线程hash值     int h;     //条件成立:说明当前线程 还未分配hash值     if ((h = getProbe()) == 0) {         //给当前线程分配hash值         ThreadLocalRandom.current(); // force initialization         //取出当前线程的hash值 赋值给h         h = getProbe();         //为什么? 因为默认情况下 当前线程 肯定是写入到了 cells[0] 位置。 不把它当做一次真正的竞争         wasUncontended = true;     }     //表示扩容意向 false 一定不会扩容,true 可能会扩容。     boolean collide = false;                // True if last slot nonempty     //自旋     for (;;) {         //as 表示cells引用         //a 表示当前线程命中的cell         //n 表示cells数组长度         //v 表示 期望值         Cell[] as; Cell a; int n; long v;         //CASE1: 表示cells已经初始化了,当前线程应该将数据写入到对应的cell中         if ((as = cells) != null && (n = as.length) > 0) {             //2.true-> 说明当前线程对应下标的cell为空,需要创建 longAccumulate 支持             //3.true->表示cas失败,意味着当前线程对应的cell 有竞争[重试|扩容]             //CASE1.1:true->表示当前线程对应的下标位置的cell为null,需要创建new Cell             if ((a = as[(n - 1) & h]) == null) {                 //true->表示当前锁 未被占用  false->表示锁被占用                 if (cellsBusy == 0) {       // Try to attach new Cell                     //拿当前的x创建Cell                     Cell r = new Cell(x);   // Optimistically create                     //条件一:true->表示当前锁 未被占用  false->表示锁被占用                     //条件二:true->表示当前线程获取锁成功  false->当前线程获取锁失败..                     if (cellsBusy == 0 && casCellsBusy()) {                         //是否创建成功 标记                         boolean created = false;                         try {               // Recheck under lock                             //rs 表示当前cells 引用                             //m 表示cells长度                             //j 表示当前线程命中的下标                             Cell[] rs; int m, j;                             //条件一 条件二 恒成立                             //rs[j = (m - 1) & h] == null 为了防止其它线程初始化过 该位置,然后当前线程再次初始化该位置                             //导致丢失数据                             if ((rs = cells) != null &&                                     (m = rs.length) > 0 &&                                     rs[j = (m - 1) & h] == null) {                                 rs[j] = r;                                 created = true;                             }                         } finally {                             cellsBusy = 0;                         }                         if (created)                             break;                         continue;           // Slot is now non-empty                     }                 }                 //扩容意向 强制改为了false                 collide = false;             }             // CASE1.2:             // wasUncontended:只有cells初始化之后,并且当前线程 竞争修改失败,才会是false             else if (!wasUncontended)       // CAS already known to fail                 wasUncontended = true;      // Continue after rehash                 //CASE 1.3:当前线程rehash过hash值,然后新命中的cell不为空                 //true -> 写成功,退出循环                 //false -> 表示rehash之后命中的新的cell 也有竞争 重试1次 再重试1次             else if (a.cas(v = a.value, ((fn == null) ? v + x :                     fn.applyAsLong(v, x))))                 break;                 //CASE 1.4:                 //条件一:n >= NCPU true->扩容意向 改为false,表示不扩容了  false-> 说明cells数组还可以扩容                 //条件二:cells != as true->其它线程已经扩容过了,当前线程rehash之后重试即可             else if (n >= NCPU || cells != as)                 //扩容意向 改为false,表示不扩容了                 collide = false;            // At max size or stale                 //CASE 1.5:                 //!collide = true 设置扩容意向 为true 但是不一定真的发生扩容             else if (!collide)                 collide = true;                 //CASE 1.6:真正扩容的逻辑                 //条件一:cellsBusy == 0 true->表示当前无锁状态,当前线程可以去竞争这把锁                 //条件二:casCellsBusy true->表示当前线程 获取锁 成功,可以执行扩容逻辑                 // false->表示当前时刻有其它线程在做扩容相关的操作。             else if (cellsBusy == 0 && casCellsBusy()) {                 try {                     //cells == as                     if (cells == as) {      // Expand table unless stale                         Cell[] rs = new Cell[n << 1];                         for (int i = 0; i < n; ++i)                             rs[i] = as[i];                         cells = rs;                     }                 } finally {                     //释放锁                     cellsBusy = 0;                 }                 collide = false;                 continue;                   // Retry with expanded table             }             //重置当前线程Hash值,即rehash             h = advanceProbe(h);         }         //CASE2:前置条件cells还未初始化 as 为null         //条件一:true 表示当前未加锁         //条件二:cells == as?因为其它线程可能会在你给as赋值之后修改了 cells         //条件三:true 表示获取锁成功 会把cellsBusy = 1,false 表示其它线程正在持有这把锁         else if (cellsBusy == 0 && cells == as && casCellsBusy()) {             boolean init = false;             try {                           // Initialize table                 //cells == as? 防止其它线程已经初始化了,当前线程再次初始化 导致丢失数据                 if (cells == as) {                     Cell[] rs = new Cell[2];                     rs[h & 1] = new Cell(x);                     cells = rs;                     init = true;                 }             } finally {                 cellsBusy = 0;             }             if (init)                 break;         }         //CASE3:         //1.当前cellsBusy加锁状态,表示其它线程正在初始化cells,所以当前线程将值累加到base         //2.cells被其它线程初始化后,当前线程需要将数据累加到base         else if (casBase(v = base, ((fn == null) ? v + x :                 fn.applyAsLong(v, x))))             break;                          // Fall back on using base     } } 复制代码

总体步骤如下:

image.png

小总结

AtomicLong 原理:CAS+自旋

场景:

  • 低并发下的全局计算

  • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。

缺陷: 高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈,N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

LongAdder 原理:

  • CAS+Base+Cell数组分散

  • 空间换时间并分散了热点数据

场景:高并发下的全局计算

缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确

 伪原创工具 SEO网站优化  https://www.237it.com/

作者:努力更文的小白
链接:https://juejin.cn/post/7035493501911433229

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐