无锁
💒

无锁

问题提出

有如下需求,保证 account.withdraw 取款方法的线程安全
interface Account { // 获取余额 Integer getBalance(); // 取款 void withdraw(Integer amount); /** * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作 * 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(Account account) { List<Thread> ts = new ArrayList<>(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(10); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms"); } }
线程不安全的实现
class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public Integer getBalance() { return this.balance; } @Override public void withdraw(Integer amount) { this.balance -= amount; } }
测试
public static void main(String[] args) { Account account = new AccountUnsafe(10000); Account.demo(account); //330 cost: 306 ms 余额不为0 }

为什么不安全

 
withdraw 方法
public void withdraw(Integer amount) { balance -= amount; }
对应的字节码
ALOAD 0 // <- this ALOAD 0 GETFIELD cn/google/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱 ALOAD 1 // <- amount INVOKEVIRTUAL java/lang/Integer.intValue ()I // 拆箱 ISUB // 减法 INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // 结果装箱 PUTFIELD cn/google/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance
 

解决思路-锁

class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public Integer getBalance() { synchronized (this) { return this.balance; } } @Override public void withdraw(Integer amount) { synchronized (this) { this.balance -= amount; } } }
加锁后结果:0 cost: 399 ms

解决思路-无锁

class AccountCas implements Account { private AtomicInteger balance; public AccountCas(int balance) { this.balance = new AtomicInteger(balance); } @Override public Integer getBalance() { return balance.get(); } @Override public void withdraw(Integer amount) { while (true) { // 获取余额的最新值 int prev = balance.get(); // 要修改的余额 int next = prev - amount; // 真正修改 if (balance.compareAndSet(prev, next)) { break; } } balance.getAndAdd(-1 * amount); } }
public static void main(String[] args) { Account account = new AccountCas(10000); Account.demo(account); //0 cost: 75 ms }

CAS

前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢
@Override public void withdraw(Integer amount) { while (true) { // 获取余额的最新值 int prev = balance.get(); // 要修改的余额 int next = prev - amount; // 真正修改 if (balance.compareAndSet(prev, next)) { /* compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值 - 不一致了,next 作废,返回 false 表示失败 比如,别的线程已经做了减法,当前值已经被减成了 990 那么本线程的这次 990 就作废了,进入 while 下次循环重试 - 一致,以 next 设置为新值,返回 true 表示成功 */ break; } } }
其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作
CAS算法的过程是这样:它包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。
仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。
当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即时没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
notion image

CPU指令对CAS的支持

其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交 换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再 开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子 的。
CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
notion image
@Slf4j public class SlowMotion { public static void main(String[] args) { AtomicInteger balance = new AtomicInteger(10000); int mainPrev = balance.get(); log.info("try get {}", mainPrev); new Thread(() -> { sleep(1000); int prev = balance.get(); balance.compareAndSet(prev, 9000); log.debug(balance.toString()); }, "t1").start(); sleep(2000); log.info("try set 8000..."); boolean isSuccess = balance.compareAndSet(mainPrev, 8000); log.debug("is success ? {}", isSuccess); if (!isSuccess) { mainPrev = balance.get(); log.info("try set 8000..."); isSuccess = balance.compareAndSet(mainPrev, 8000); log.info("is success ? {}", isSuccess); } } private static void sleep(int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
 
输出结果
14:43:44.828 cn.itcast.test.SlowMotion [main] - try get 10000 14:43:45.878 cn.itcast.test.SlowMotion [t1] - 9000 14:43:46.878 cn.itcast.test.SlowMotion [main] - try set 8000... 14:43:46.878 cn.itcast.test.SlowMotion [main] - is success ? false 14:43:46.878 cn.itcast.test.SlowMotion [main] - try set 8000... 14:43:46.878 cn.itcast.test.SlowMotion [main] - is success ? true
volatile 获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。 它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
 
注意 volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原 子性) CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
 

无锁效率高的原因

无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时 候,发生上下文切换,进入阻塞。
打个比喻线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火, 等被唤醒又得重新打火、启动、加速... 恢复到高速运行,代价比较大 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑 道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还 是会导致上下文切换

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
  1. CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再 重试呗。
  1. synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。 CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

CAS的ABA问题

假设这样一种场景,当第一个线程执行CAS(V,E,U)操作,在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图
notion image
这就是典型的CAS的ABA问题,一般情况这种情况发现的概率比较小,可能发生了也不会造成什么问题,比如说我们对某个做加减法,不关心数字的过程,那么发生ABA问题也没啥关系。但是在某些情况下还是需要防止的,那么该如何解决呢?在Java中解决ABA问题,我们可以使用以下两个原子类
案例1: ABA 不影响业务
@Slf4j public class TestAbA { static AtomicReference<String> ref = new AtomicReference<>("A"); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // 获取值 A // 这个共享变量被它线程修改过? String prev = ref.get(); other(); sleep(1); // 尝试改为 C log.debug("change A->C {}", ref.compareAndSet(prev, "C")); } private static void other() { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B")); }, "t1").start(); sleep(0.5); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A")); }, "t2").start(); } }
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程 希望: 只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
 
案例2: ABA 影响业务
 
public class AtomicStampedReferenceDemo { static AtomicReference<Integer> money = new AtomicReference<Integer>(19); public static void main(String[] args) { /*用户消费线程,模拟消费行为 如果很不幸的,用户正好正在进行消费,就在赠予金额到账的同时,他进行了一次消费,使得总金额又小于20元,并且正好累计消费了20元。使得消费、赠予后的金额等于消费前、赠予前的金额。这时,后台的赠予进程就会误以为这个账户还没有赠予,所以,存在被多次赠予的可能 */ new Thread() { public void run() { for(int i=0;i<1000;i++){//模拟1000次消费行为 while(true){ Integer m=money.get(); //判断用户余额并给予赠予金额。如果已经被其他用户处理,那么当前线程就会失败。因此,可以确保用户只会被充值一次。 if(m>10){ System.out.println("大于10元"); if(money.compareAndSet(m, m-10)){ System.out.println("成功消费10元,余额:"+money.get()); break; } }else{ //System.out.println("没有足够的金额"); break; } } } } }.start(); //模拟多个线程同时更新后台数据库,为用户充值 for (int i = 0; i < 30; i++) { new Thread() { public void run() { while (true) { Integer m = money.get(); if (m < 20) { if (money.compareAndSet(m, m + 20)) { System.out.println("余额小于20元,充值成功,余额:" + money.get() + "元"); break; } } else { System.out.println("余额大于20元,无需充值"); break; } } } }.start(); } } }

JDK并发包中的原子操作类(Atomic系列)

CAS在Java中的应用,即并发包中的原子操作类(Atomic系列),从JDK 1.5开始提供了java.util.concurrent.atomic包,在该包中提供了许多基于CAS实现的原子操作类,用法方便,性能高效,主要分以下4种类型。

原子更新基本类型

使用原子的方式更新基本类型,Atomic包提供了以下3个类:
  • AtomicBoolean
  • AtomicInteger
  • AtomicLong
这3个类的实现原理和使用方式几乎是一样的,这里我们以AtomicInteger为例进行分析,AtomicInteger主要是针对int类型的数据执行原子操作,它提供了原子自增方法、原子自减方法以及原子赋值方法等,鉴于AtomicInteger的源码不多,我们直接看源码:
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // 获取指针类Unsafe private static final Unsafe unsafe = Unsafe.getUnsafe(); //下述变量value在AtomicInteger实例对象内的内存偏移量 private static final long valueOffset; static { try { //通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移 //通过该偏移量valueOffset,unsafe类的内部方法可以获取到变量value对其进行取值或赋值操作 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //当前AtomicInteger封装的int变量value private volatile int value; public AtomicInteger(int initialValue) { value = initialValue; } public AtomicInteger() { } //获取当前最新值, public final int get() { return value; } //设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。 public final void set(int newValue) { value = newValue; } //最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载 public final void lazySet(int newValue) { unsafe.putOrderedInt(this, valueOffset, newValue); } //设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法 public final int getAndSet(int newValue) { return unsafe.getAndSetInt(this, valueOffset, newValue); } //如果当前值为expect,则设置为update(当前值指的是value变量) public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } //当前值加1返回旧值,底层CAS操作 public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } //当前值减1,返回旧值,底层CAS操作 public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } //当前值增加delta,返回旧值,底层CAS操作 public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } //当前值加1,返回新值,底层CAS操作 public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } //当前值减1,返回新值,底层CAS操作 public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } //当前值增加delta,返回新值,底层CAS操作 public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } //省略一些不常用的方法.... }
通过上述的分析,可以发现AtomicInteger原子类的内部几乎是基于一个Unsafe类中的CAS相关操作的方法实的,这也同时证明AtomicInteger是基于无锁实现的,这里重点分析自增操作实现过程,其他方法自增实现原理一样。
 
我们发现AtomicInteger类中所有自增或自减的方法都间接调用Unsafe类中的getAndAddInt()方法实现了CAS操作,从而保证了线程安全,关于getAndAddInt其实前面已分析过,它是Unsafe类中1.8新增的方法,源码如下
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
可看出getAndAddInt通过一个while循环不断的重试更新要设置的值,直到成功为止,调用的是Unsafe类中的compareAndSwapInt方法,是一个CAS操作方法
 
AtomicInteger示例代码如下:
AtomicInteger i = new AtomicInteger(0); // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ System.out.println(i.incrementAndGet()); // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i System.out.println(i.getAndIncrement()); // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i System.out.println(i.decrementAndGet()); // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i-- System.out.println(i.getAndDecrement()); // 获取并加值(i = 0, 结果 i = 5, 返回 0) System.out.println(i.getAndAdd(5)); // 加值并获取(i = 5, 结果 i = 0, 返回 0) System.out.println(i.addAndGet(-5)); // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.getAndUpdate(p -> p - 2)); // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.updateAndGet(p -> p + 2)); // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的 // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final System.out.println(i.getAndAccumulate(10, (p, x) -> p + x)); // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

原子更新对象引用Reference

原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic包提供了以下3个类:
  • AtomicReference
  • AtomicStampedReference
  • AtomicMarkableReference

AtomicReference

封装的是一个对象的引用, 只要对对象的引用进行修改, 就可以用atomicreference 保证线程安全
示例代码如下:
interface DecimalAccount { // 获取余额 BigDecimal getBalance(); // 取款 void withdraw(BigDecimal amount); /** * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作 * 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(DecimalAccount account) { List<Thread> ts = new ArrayList<>(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(BigDecimal.TEN); })); } ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(account.getBalance()); } }
 
class DecimalAccountCas implements DecimalAccount { private AtomicReference<BigDecimal> balance; public DecimalAccountCas(BigDecimal balance) { // this.balance = balance; this.balance = new AtomicReference<>(balance); } @Override public BigDecimal getBalance() { return balance.get(); } @Override public void withdraw(BigDecimal amount) { while(true) { BigDecimal prev = balance.get(); BigDecimal next = prev.subtract(amount); if (balance.compareAndSet(prev, next)) { break; } } } }
 

AtomicStampedReference

AtomicStampedReference原子类是一个带有时间戳(版本号)的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。
当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境
@Slf4j(topic = "c.Test36") public class Test36 { static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // 获取值 A String prev = ref.getReference(); // 获取版本号 int stamp = ref.getStamp(); log.debug("版本 {}", stamp); // 如果中间有其它线程干扰,发生了 ABA 现象 other(); sleep(1); // 尝试改为 C log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1)); } private static void other() { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1)); log.debug("更新版本为 {}", ref.getStamp()); }, "t1").start(); sleep(0.5); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1)); log.debug("更新版本为 {}", ref.getStamp()); }, "t2").start(); } }
 
AtomicMarkableReference
AtomicMarkableReference与AtomicStampedReference不同的是,AtomicMarkableReference维护的是一个boolean值的标识,也就是说至于true和false两种切换状态,这种方式并不能完全防止ABA问题的发生,只能减少ABA问题发生的概率。
@Slf4j(topic = "c.Test38") public class Test38 { public static void main(String[] args) throws InterruptedException { GarbageBag bag = new GarbageBag("装满了垃圾"); // 参数2 mark 可以看作一个标记,表示垃圾袋满了 AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true); log.debug("start..."); GarbageBag prev = ref.getReference(); log.debug(prev.toString()); new Thread(() -> { log.debug("start..."); bag.setDesc("空垃圾袋"); ref.compareAndSet(bag, bag, true, false); log.debug(bag.toString()); },"保洁阿姨").start(); sleep(1); log.debug("想换一只新垃圾袋?"); boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false); log.debug("换了么?" + success); log.debug(ref.getReference().toString()); } } class GarbageBag { String desc; public GarbageBag(String desc) { this.desc = desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return super.toString() + " " + desc; } }
 

原子更新数组

通过原子的方式更新数组里的某个元素,Atomic包提供了以3类
  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
AtomicIntegerArray案例演示:
public class Test39 { public static void main(String[] args) { demo( ()->new int[10], (array)->array.length, (array, index) -> array[index]++, array-> System.out.println(Arrays.toString(array)) ); demo( ()-> new AtomicIntegerArray(10), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array) ); } /** 参数1,提供数组、可以是线程不安全数组或线程安全数组 参数2,获取数组长度的方法 参数3,自增方法,回传 array, index 参数4,打印数组的方法 */ // supplier 提供者 无中生有 ()->结果 // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果 // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)-> private static <T> void demo( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer ) { List<Thread> ts = new ArrayList<>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0; i < length; i++) { // 每个线程对数组作 10000 次操作 ts.add(new Thread(() -> { for (int j = 0; j < 10000; j++) { putConsumer.accept(array, j%length); } })); } ts.forEach(t -> t.start()); // 启动所有线程 ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); // 等所有线程结束 printConsumer.accept(array); } } //结果 //[8975, 9032, 8981, 8998, 8946, 9046, 9811, 9808, 9765, 9811] //[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

原子更新属性字段

如果需原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic包提供了以下3个类进行原子字段更新。
  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater // 域 字段。
要想原子地更新字段类需要两步。
第一步,因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
第二步,更新类的字段(属性)必须使用volatile修饰符。
AtomicIntegerFieldUpdater的实例代码如下:
@Slf4j public class Test1 { private volatile int field; public static void main(String[] args) { AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test1.class, "field"); Test1 test5 = new Test1(); fieldUpdater.compareAndSet(test5, 0, 10); // 修改成功 field = 10 System.out.println(test5.field); // 修改成功 field = 20 fieldUpdater.compareAndSet(test5, 10, 20); System.out.println(test5.field); // 修改失败 field = 20 fieldUpdater.compareAndSet(test5, 10, 30); System.out.println(test5.field); } }

原子累加器

LongAdder

累加器性能比较

public class Test41 { public static void main(String[] args) { for (int i = 0; i < 5; i++) { demo( () -> new AtomicLong(0), (adder) -> adder.getAndIncrement() ); } for (int i = 0; i < 5; i++) { demo( () -> new LongAdder(), adder -> adder.increment() ); } } /* () -> 结果 提供累加器对象 (参数) -> 执行累加操作 */ private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) { T adder = adderSupplier.get(); List<Thread> ts = new ArrayList<>(); // 4 个线程,每人累加 50 万 for (int i = 0; i < 4; i++) { ts.add(new Thread(() -> { for (int j = 0; j < 500000; j++) { action.accept(adder); } })); } long start = System.nanoTime(); ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start) / 1000_000); } } //输出 //2000000 cost:28 //2000000 cost:24 //2000000 cost:17 //2000000 cost:17 //2000000 cost:19 //2000000 cost:6 //2000000 cost:2 //2000000 cost:2 //2000000 cost:2 //2000000 cost:2
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

LongAdder源码分析

LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧
LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化 transient volatile Cell[] cells; // 基础值, 如果没有竞争, 则用 cas 累加这个域 transient volatile long base; // 在 cells 创建或扩容时, 置为 1, 表示加锁 transient volatile int cellsBusy;
 
CAS锁的缺陷: 自旋空转浪费CPU性能
@Slf4j(topic = "c.Test42") public class LockCas { // 0 没加锁 // 1 加锁 private AtomicInteger state = new AtomicInteger(0); public void lock() { while (true) { if (state.compareAndSet(0, 1)) { break; } } } public void unlock() { log.debug("unlock..."); state.set(0); } public static void main(String[] args) { LockCas lock = new LockCas(); new Thread(() -> { log.debug("begin..."); lock.lock(); try { log.debug("lock..."); sleep(1); } finally { lock.unlock(); } }).start(); new Thread(() -> { log.debug("begin..."); lock.lock(); try { log.debug("lock..."); } finally { lock.unlock(); } }).start(); } }
16:16:20.601 c.Test42 [Thread-0] - begin... 16:16:20.601 c.Test42 [Thread-1] - begin... 16:16:20.605 c.Test42 [Thread-0] - lock... 16:16:21.610 c.Test42 [Thread-0] - unlock... 16:16:21.610 c.Test42 [Thread-1] - lock... 16:16:21.610 c.Test42 [Thread-1] - unlock...

伪共享

其中 Cell 即为累加单元
// 防止缓存行伪共享 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值 final boolean cas(long prev, long next) { return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next); } // 省略不重要代码 }
notion image
得从缓存说起,缓存与内存的速度比较
notion image
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
notion image
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0] Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1
的缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
notion image

add流程

/** * Adds the given value. * * @param x the value to add */ public void add(long x) { // as 为累加单元数组 // b 为基础值 // x 为累加值 Cell[] as; long b, v; int m; Cell a; // 进入 if 的两个条件 // 1. as 有值, 表示已经发生过竞争, 进入 if // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if if ((as = cells) != null || !casBase(b = base, b + x)) { // uncontended 表示 cell 没有竞争 boolean uncontended = true; if (// as 还没有创建 as == null || (m = as.length - 1) < 0 || // 当前线程对应的 cell 还没有 (a = as[getProbe() & m]) == null || // cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell ) !(uncontended = a.cas(v = a.value, v + x))) // 进入 cell 数组创建、cell 创建的流程 longAccumulate(x, null, uncontended); } }
add 流程图
add 流程图

longAccumulate

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell if ((h = getProbe()) == 0) { // 初始化 probe ThreadLocalRandom.current(); // h 对应新的 probe 值, 用来对应 cell h = getProbe(); wasUncontended = true; } // collide 为 true 表示需要扩容 boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; // 已经有了 cells if ((as = cells) != null && (n = as.length) > 0) { // 还没有 cell if ((a = as[(n - 1) & h]) == null) { // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x // 成功则 break, 否则继续 continue 循环 collide = false; } // 有竞争, 改变线程对应的 cell 来重试 cas else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了 else if (!collide) collide = true; // 加锁 else if (cellsBusy == 0 && casCellsBusy()) { // 加锁成功, 扩容 collide = false; continue; // Retry with expanded table } // 改变线程对应的 cell h = advanceProbe(h); } // 还没有 cells, 尝试给 cellsBusy 加锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell // 成功则 break; } // 上两种情况失败, 尝试给 base 累加 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
longAccumulate流程1
longAccumulate流程1
longAccumulate流程2
longAccumulate流程2
longAccumulate流程3
longAccumulate流程3

sum方法

获取最终结果通过 sum 方法
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }