共享模型之无锁 John Doe 2022-09-23 2025-07-26 共享模型之无锁
CAS 与 volatile
原子整数
原子引用
原子累加器
ThreadLocal
Unsafe
1. 问题提出 有如下需求,保证 account.withdraw 取款方法的线程安全
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 interface Account { Integer getBalance () ; void withdraw (Integer amount) ; static void demo (Account account) { List<Thread> ts = new ArrayList <>(); for (int i = 0 ; i < 1000 ; i++) { ts.add(new Thread (() -> { account.withdraw(10 ); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms" ); } }
原有实现并不是线程安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe (Integer balance) { this .balance = balance; } @Override public Integer getBalance () { return balance; } @Override public void withdraw (Integer amount) { balance -= amount; } }
测试
1 2 3 public static void main (String[] args) { Account.demo(new AccountUnsafe (10000 )); }
1. 安全分析 withdraw 方法
1 2 3 public void withdraw (Integer amount) { balance -= amount; }
对应的字节码
1 2 3 4 5 6 7 8 9 ALOAD 0 ALOAD 0 GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; INVOKEVIRTUAL java/lang/Integer.intValue ()I ALOAD 1 INVOKEVIRTUAL java/lang/Integer.intValue ()I ISUB INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer;
多线程执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ALOAD 0 ALOAD 0 GETFIELD cn/itcast/AccountUnsafe.balance INVOKEVIRTUAL java/lang/Integer.intValue ALOAD 1 INVOKEVIRTUAL java/lang/Integer.intValue ISUB INVOKESTATIC java/lang/Integer.valueOf PUTFIELD cn/itcast/AccountUnsafe.balance ALOAD 0 ALOAD 0 GETFIELD cn/itcast/AccountUnsafe.balance INVOKEVIRTUAL java/lang/Integer.intValue ALOAD 1 INVOKEVIRTUAL java/lang/Integer.intValue ISUB INVOKESTATIC java/lang/Integer.valueOf PUTFIELD cn/itcast/AccountUnsafe.balance
2. 解决思路-锁 首先想到的是给 Account 对象加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe (Integer balance) { this .balance = balance; } @Override public synchronized Integer getBalance () { return balance; } @Override public synchronized void withdraw (Integer amount) { balance -= amount; } }
3. 解决思路-无锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class AccountSafe implements Account { private AtomicInteger balance; public AccountSafe (Integer balance) { this .balance = new AtomicInteger (balance); } @Override public Integer getBalance () { return balance.get(); } @Override public void withdraw (Integer amount) { while (true ) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) { break ; } } } }
执行测试代码
1 2 3 public static void main (String[] args) { Account.demo(new AccountSafe (10000 )); }
2. CAS 与 volatile 前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void withdraw (Integer amount) { while (true ) { while (true ) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) { break ; } } } }
其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
注意
其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交 换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
1. 慢动作分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class SlowMotion { public static void main (String[] args) { AtomicInteger balance = new AtomicInteger (10000 ); int mainPrev = balance.get(); log.debug("try get {}" , mainPrev); new Thread (() -> { sleep(1000 ); int prev = balance.get(); balance.compareAndSet(prev, 9000 ); log.debug(balance.toString()); }, "t1" ).start(); sleep(2000 ); log.debug("try set 8000..." ); boolean isSuccess = balance.compareAndSet(mainPrev, 8000 ); log.debug("is success ? {}" , isSuccess); if (!isSuccess){ mainPrev = balance.get(); log.debug("try set 8000..." ); isSuccess = balance.compareAndSet(mainPrev, 8000 ); log.debug("is success ? {}" , isSuccess); } } private static void sleep (int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
1 2 3 4 5 6 7 # 结果 2019-10-13 11:28:37.134 [main] try get 10000 2019-10-13 11:28:38.154 [t1] 9000 2019-10-13 11:28:39.154 [main] try set 8000... 2019-10-13 11:28:39.154 [main] is success ? false 2019-10-13 11:28:39.154 [main] try set 8000... 2019-10-13 11:28:39.154 [main] is success ? true
2. volatile 获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
注意
volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原 子性)
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
3. 无锁效率高
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时 候,发生上下文切换,进入阻塞。打个比喻
线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火, 等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火, 等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
4. CAS 的特点 结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁 的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再 重试呗。
synchronized 是基于悲观锁 的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想 改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
3. 原子整数 J.U.C 并发包提供了:
AtomicBoolean
AtomicInteger
AtomicLong
以 AtomicInteger 为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 AtomicInteger i = new AtomicInteger (0 );System.out.println(i.getAndIncrement()); System.out.println(i.incrementAndGet()); System.out.println(i.decrementAndGet()); System.out.println(i.getAndDecrement()); System.out.println(i.getAndAdd(5 )); System.out.println(i.addAndGet(-5 )); System.out.println(i.getAndUpdate(p -> p - 2 )); System.out.println(i.updateAndGet(p -> p + 2 )); System.out.println(i.getAndAccumulate(10 , (p, x) -> p + x)); System.out.println(i.accumulateAndGet(-10 , (p, x) -> p + x))
4. 原子引用 为什么需要原子引用类型?
AtomicReference
AtomicMarkableReference
AtomicStampedReference
有如下方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public interface DecimalAccount { BigDecimal getBalance () ; void withdraw (BigDecimal amount) ; static void demo (DecimalAccount account) { List<Thread> ts = new ArrayList <>(); for (int i = 0 ; i < 1000 ; i++) { ts.add(new Thread (() -> { account.withdraw(BigDecimal.TEN); })); } ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(account.getBalance()); } }
试着提供不同的 DecimalAccount 实现,实现安全的取款操作
1. 非安全 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe (BigDecimal balance) { this .balance = balance; } @Override public BigDecimal getBalance () { return balance; } @Override public void withdraw (BigDecimal amount) { BigDecimal balance = this .getBalance(); this .balance = balance.subtract(amount); } }
2. 安全-锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class DecimalAccountSafeLock implements DecimalAccount { private final Object lock = new Object (); BigDecimal balance; public DecimalAccountSafeLock (BigDecimal balance) { this .balance = balance; } @Override public BigDecimal getBalance () { return balance; } @Override public void withdraw (BigDecimal amount) { synchronized (lock) { BigDecimal balance = this .getBalance(); this .balance = balance.subtract(amount); } } }
3. 安全-CAS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class DecimalAccountSafeCas implements DecimalAccount { AtomicReference<BigDecimal> ref; public DecimalAccountSafeCas (BigDecimal balance) { ref = new AtomicReference <>(balance); } @Override public BigDecimal getBalance () { return ref.get(); } @Override public void withdraw (BigDecimal amount) { while (true ) { BigDecimal prev = ref.get(); BigDecimal next = prev.subtract(amount); if (ref.compareAndSet(prev, next)) { break ; } } } }
用法与 AtomicInteger 类似
4. ABA 🚀 1. ABA 问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static AtomicReference<String> ref = new AtomicReference <>("A" );public static void main (String[] args) throws InterruptedException { log.debug("main start..." ); String prev = ref.get(); other(); sleep(1 ); log.debug("change A->C {}" , ref.compareAndSet(prev, "C" )); } private static void other () { new Thread (() -> { log.debug("change A->B {}" , ref.compareAndSet(ref.get(), "B" )); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { log.debug("change B->A {}" , ref.compareAndSet(ref.get(), "A" )); }, "t2" ).start(); }
1 2 3 4 5 # 结果 11:29:52.325 c.Test36 [main] - main start... 11:29:52.379 c.Test36 [t1] - change A->B true 11:29:52.879 c.Test36 [t2] - change B->A true 11:29:53.880 c.Test36 [main] - change A->C true
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程 希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
2. AtomicStampedReference 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j(topic = "c.Test36") public class Test36 { static AtomicStampedReference<String> ref = new AtomicStampedReference <>("A" , 0 ); public static void main (String[] args) throws InterruptedException { log.debug("main start..." ); String prev = ref.getReference(); int stamp = ref.getStamp(); log.debug("版本 {}" , stamp); other(); sleep(1 ); log.debug("change A->C {}" , ref.compareAndSet(prev, "C" , stamp, stamp + 1 )); } private static void other () { new Thread (() -> { log.debug("change A->B {}" , ref.compareAndSet(ref.getReference(), "B" , ref.getStamp(), ref.getStamp() + 1 )); log.debug("更新版本为 {}" , ref.getStamp()); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { log.debug("change B->A {}" , ref.compareAndSet(ref.getReference(), "A" , ref.getStamp(), ref.getStamp() + 1 )); log.debug("更新版本为 {}" , ref.getStamp()); }, "t2" ).start(); } }
1 2 3 4 5 6 7 8 # 结果 10:45:22.059 c.Test36 [main] - main start... 10:45:22.071 c.Test36 [main] - 版本 0 10:45:22.195 c.Test36 [t1] - change A->B true 10:45:22.195 c.Test36 [t1] - 更新版本为 1 10:45:22.728 c.Test36 [t2] - change B->A true 10:45:22.728 c.Test36 [t2] - 更新版本为 2 10:45:23.725 c.Test36 [main] - change A->C false
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过 AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
3. AtomicMarkableReference 但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过 ,所以就有了 AtomicMarkableReference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Slf4j(topic = "c.Test38") public class Test38 { public static void main (String[] args) throws InterruptedException { GarbageBag bag = new GarbageBag ("装满了垃圾" ); AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference <>(bag, true ); log.debug("start..." ); GarbageBag prev = ref.getReference(); log.debug(prev.toString()); new Thread (() -> { log.debug("start..." ); bag.setDesc("空垃圾袋" ); ref.compareAndSet(bag, bag, true , false ); log.debug(bag.toString()); },"保洁阿姨" ).start(); sleep(1 ); log.debug("想换一只新垃圾袋?" ); boolean success = ref.compareAndSet(prev, new GarbageBag ("空垃圾袋" ), true , false ); log.debug("换了么?" + success); log.debug(ref.getReference().toString()); } } class GarbageBag { String desc; public GarbageBag (String desc) { this .desc = desc; } public void setDesc (String desc) { this .desc = desc; } @Override public String toString () { return super .toString() + " " + desc; } }
5. 原子数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private static <T> void demo ( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer ) { List<Thread> ts = new ArrayList <>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0 ; i < length; i++) { ts.add(new Thread (() -> { for (int j = 0 ; j < 10000 ; j++) { putConsumer.accept(array, j%length); } })); } ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); }
1. 不安全数组 1 2 3 4 5 6 7 8 public static void main (String[] args) { demo( ()->new int [10 ], array -> array.length, (array, index) -> array[index]++, array -> System.out.println(Arrays.toString(array)) ); }
1 2 # 结果 [9965, 9977, 9974, 9970, 9965, 9978, 9977, 9977, 9977, 9975]
2. 安全数组 1 2 3 4 5 6 7 8 public static void main (String[] args) { demo( ()-> new AtomicIntegerArray (10 ), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array) ); }
1 2 # 结果 [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
6. 字段更新器
AtomicReferenceFieldUpdater
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
1 Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class Test40 { public static void main (String[] args) { Student stu = new Student (); AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name" ); System.out.println(updater.compareAndSet(stu, null , "张三" )); System.out.println(stu); } } class Student { volatile String name; @Override public String toString () { return "Student{" + "name='" + name + '\'' + '}' ; } }
7. 原子累加器 1. 累加器性能比较 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private static <T> void demo (Supplier<T> adderSupplier, Consumer<T> action) { T adder = adderSupplier.get(); List<Thread> ts = new ArrayList <>(); for (int i = 0 ; i < 4 ; i++) { ts.add(new Thread (() -> { for (int j = 0 ; j < 500000 ; j++) { action.accept(adder); } })); } long start = System.nanoTime(); ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start) / 1000_000 ); }
比较 AtomicLong 与 LongAdder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) { for (int i = 0 ; i < 5 ; i++) { demo( () -> new LongAdder (), adder -> adder.increment() ); } for (int i = 0 ; i < 5 ; i++) { demo( () -> new AtomicLong (0 ), (adder) -> adder.getAndIncrement() ); } }
1 2 3 4 5 6 7 8 9 10 11 12 # 结果 2000000 cost:38 2000000 cost:29 2000000 cost:17 2000000 cost:15 2000000 cost:16 2000000 cost:37 2000000 cost:34 2000000 cost:25 2000000 cost:29 2000000 cost:33
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性 能。
2. LongAdder源码 LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧
LongAdder 类有几个关键域
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static final int NCPU = Runtime.getRuntime().availableProcessors();transient volatile Cell[] cells;transient volatile long base;transient volatile int cellsBusy;
cas 锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class LockCas { private AtomicInteger state = new AtomicInteger (0 ); public void lock () { while (true ) { if (state.compareAndSet(0 , 1 )) { break ; } } } public void unlock () { log.debug("unlock..." ); state.set(0 ); } public static void main (String[] args) { LockCas lock = new LockCas (); new Thread (() -> { log.debug("begin..." ); lock.lock(); try { log.debug("lock..." ); sleep(1 ); } finally { lock.unlock(); } }).start(); new Thread (() -> { log.debug("begin..." ); lock.lock(); try { log.debug("lock..." ); } finally { lock.unlock(); } }).start(); } }
1 2 3 4 5 6 7 # 结果 11:56:42.532 c.Test42 [Thread-0] - begin... 11:56:42.529 c.Test42 [Thread-1] - begin... 11:56:42.536 c.Test42 [Thread-0] - lock... 11:56:43.540 c.Test42 [Thread-0] - unlock... 11:56:43.540 c.Test42 [Thread-1] - lock... 11:56:43.540 c.Test42 [Thread-1] - unlock...
3. 伪共享原理 Cell 即为累加单元
1 2 3 4 5 6 7 8 9 10 11 12 @sun .misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long prev, long next) { return UNSAFE.compareAndSwapLong(this , valueOffset, prev, next); } }
1. 内存与缓存 缓存与内存的速度比较
从 cpu 到
大约需要的时钟周期
寄存器
1 cycle (4GHz 的 CPU 约为0.25ns)
L1
3~4 cycle
L2
10~20 cycle
L3
40~45 cycle
内存
120~240 cycle
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
Core-0 要修改 Cell[0]
Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
2. Striped64
base:类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
collide:表示扩容意向,false一定不会扩容,true可能会扩容。
cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态;1:表示其他线程已经持有了锁
casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
NCPU:当前计算机CPU数量,CelI数组扩容时会使用到
getProbe():获取当前线程的hash值
advanceProbe():重置当前线程的hash值
2. add 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if ( as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)) ) { longAccumulate(x, null , uncontended); } } }
add 流程图
3. longAccumulate 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } }
longAccumulate 流程图
每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
4. sum 获取最终结果通过 sum 方法
1 2 3 4 5 6 7 8 9 10 11 public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
8. ThreadLocal 1. 简介 ThreadLocal提供线程局部变量。这些变量与正常的变量不同,因为每一个线程在访问ThreadLocal实例的时候(通过其get或set方法)都有自己的、独立初始化的变量副本。ThreadLocal实例通常是类中的私有静态字段,使用它的目的是希望将状态(例如,用户ID或事务ID)与线程关联起来。
实现每一个线程都有自己专属的本地变量副本(自己用自己的变量不麻烦别人,不和其他人共享,人人有份,人各一份),主要解决了让每个线程绑定自己的值,通过使用get()和set()方法,获取默认值或将其值更改为当前线程所存的副本的值从而避免了线程安全问题。
1. API
变量和类型
方法
描述
T
get()
返回当前线程的此线程局部变量副本中的值
protected T
initialvalue()
返回此线程局部变量的当前线程的初始值
void
remove()
删除此线程局部变量的当前线程值
void
set(T value)
将此线程局部变量的当前线程副本设置为指定值
static <S> ThreadLocal<S>
withInitial(Supplier<? extends S> supplier)
创建一个线程局部变量
2. 应用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 /资源类 class House { int saleCount = 0 ; public synchronized void saleHouse () { ++saleCount; } ThreadLocal<Integer> saleVolume = ThreadLocal.withInitial(() -> 0 ); public void saleVolumeByThreadLocal () { saleVolume.set(1 + saleVolume.get()); } } public class ThreadLocalDemo { public static void main (String[] args) throws InterruptedException { House house = new House (); for (int i = 1 ; i <= 5 ; i++) { new Thread (() -> { int size = new Random ().nextInt(5 ) + 1 ; try { for (int j = 1 ; j <= size; j++) { house.saleHouse(); house.saleVolumeByThreadLocal(); } System.out.println(Thread.currentThread().getName() + "\t" + "号销售卖出:" + house.saleVolume.get()); } finally { house.saleVolume.remove(); } }, String.valueOf(i)).start(); } try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "共计卖出多少套: " + house.saleCount); } }
**【强制】**必须回收自定义的ThreadLocal变量,尤其在线程池场景下,线程经常会被复用,如果不清理自定义的ThreadLocal变量,可能会影响后续业务逻辑和造成内存泄露等问题。尽量在代理中使用try-finally块进行回收。
2. 源码分析 Thread,ThreadLocal,ThreadLocalMap 关系
threadLocalMap实际上就是一个以threadLocal实例为key,任意对象为value的Entry对象。 当我们为threadLocal变量赋值,实际上就是以当前threadLocal实例为key,值为value的Entry往这个threadLocalMap中存放。
近似的可以理解为:
ThreadLocalMap从字面上就可以看出这是一个保存ThreadLocal对象的map(其实是以ThreadLocal为Key),不过是经过了两层包装的ThreadLocal对象:
JVM内部维护了一个线程版的Map<Thread,T>(通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLoalMap中),每个线程要用到这个T的时候,用当前的线程去Map里面获取,通过这样让每个线程都拥有了自己独立的变量,人手一份,竞争条件被彻底消除,在并发模式下是绝对安全的变量。
ThreadLocal是一个壳子,真正的存储结构是TheadLocal里有ThreadLocalMap这么个内部类,每个Thread对象维护着一个ThreadLoclMap的引用,ThreadLocalMap是ThreadLocal的内部类,用Entry来进行存储。
调用ThreadLocali的set()方法时,实际上就是往ThreadLocalMap设置值,key是ThreadLocal对象,值Value是传递进来的对象
调用ThreadLocal的get()方法时,实际上就是往ThreadLocalMap获取值,key是ThreadLocal对象
ThreadLocal本身并不存储值(ThreadLocal是一个壳子),它只是自己作为一个key来让线程从ThreadLocalMap获取value。正因为这个原理,所以ThreadLocal能够实现“数据隔离”,获取当前线程的局部变量值,不受其他线程影响。
3. 引用类型
java 技术允许使用 finalize() 方法在垃圾收集器将对象从内存中清除出去之前做必要的清理工作。
1 2 3 4 5 6 7 8 class MyObject { @Override protected void finalize () throws Throwable { System.out.println("-------invoke finalize method~!!!" ); } }
1. 强引用 当内存不足,JVM开始垃圾回收,对于强引用的对象,就算是出现了OOM也不会对该对象进行回收,死都不收。
强引用是我们最常见的普通对象引用,只要还有强引用指向一个对象,就能表明对象还“活着”,垃圾收集器不会碰这种对象。在 Java 中最常见的就是强引用,把一个对象赋给一个引用变量,这个引用变量就是一个强引用。当一个对象被强引用变量引用时,它处于可达状态,它是不可能被垃圾回收机制回收的,即使该对象以后永远都不会被用到JVM也不会回收。因此强引用是造成Java内存泄漏的主要原因之一。
对于一个普通的对象,如果没有其他的引用关系,只要超过了引用的作用域或者显式地将相应(强)引用赋值为 null,一般认为就是可以被垃圾收集的了(当然具体回收时机还是要看垃圾收集策略)。
1 2 3 4 5 6 7 8 9 10 public static void strongReference () { MyObject myObject = new MyObject (); System.out.println("-----gc before: " +myObject); myObject = null ; System.gc(); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----gc after: " +myObject); }
2. 软引用 软引用是一种相对强引用弱化了一些的引用,需要用java.lang.ref.SoftReference类来实现,可以让对象豁免一些垃圾收集。
对于只有软引用的对象来说,
当系统内存充足时它 不会 被回收,
当系统内存不足时它 会 被回收。
软引用通常用在对内存敏感的程序中,比如高速缓存就有用到软引用,内存够用的时候就保留,不够用就回收!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private static void softReference () { SoftReference<MyObject> softReference = new SoftReference <>(new MyObject ()); System.gc(); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----gc after内存够用: " + softReference.get()); try { byte [] bytes = new byte [20 * 1024 * 1024 ]; } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("-----gc after内存不够: " + softReference.get()); } }
3. 弱引用 弱引用需要用java.lang.ref.WeakReference类来实现,它比软引用的生存期更短,对于只有弱引用的对象来说,只要垃圾回收机制一运行,不管JVM的内存空间是否足够,都会回收该对象占用的内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private static void weakReference () { WeakReference<MyObject> weakReference = new WeakReference <>(new MyObject ()); System.out.println("-----gc before 内存够用: " + weakReference.get()); System.gc(); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----gc after 内存够用: " + weakReference.get()); }
软引用和弱引用的适用场景
假如有一个应用需要读取大量的本地图片:
如果每次读取图片都从硬盘读取则会严重影响性能,
如果一次性全部加载到内存中又可能造成内存溢出。
此时使用软引用可以解决这个问题。
设计思路是:用一个HashMap来保存图片的路径和相应图片对象关联的软引用之间的映射关系,在内存不足时,JVM会自动回收这些缓存图片对象所占用的空间,从而有效地避免了OOM的问题。
Map<String, SoftReference> imageCache = new HashMap<String, SoftReference>();
4. 虚引用 虚引用需要java.lang.ref.PhantomReference类来实现。
顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收,它不能单独使用也不能通过它访问对象,虚引用必须和引用队列 (ReferenceQueue)联合使用。
虚引用的主要作用是跟踪对象被垃圾回收的状态。 仅仅是提供了一种确保对象被 finalize以后,做某些事情的机制。 PhantomReference的get方法总是返回null,因此无法访问对应的引用对象。
其意义在于:说明一个对象已经进入finalization阶段,可以被gc回收,用来实现比finalization机制更灵活的回收操作。
换句话说,设置虚引用关联的唯一目的,就是在这个对象被收集器回收的时候收到一个系统通知或者后续添加进一步的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private static void phantomReference () { MyObject myObject = new MyObject (); ReferenceQueue<MyObject> referenceQueue = new ReferenceQueue <>(); PhantomReference<MyObject> phantomReference = new PhantomReference <>(myObject, referenceQueue); System.out.println(phantomReference.get()); List<byte []> list = new ArrayList <>(); new Thread (() -> { while (true ) { list.add(new byte [1024 * 1024 ]); try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(phantomReference.get() + "\t" + "list add ok" ); } }, "t1" ).start(); new Thread (() -> { while (true ) { Reference<? extends MyObject > reference = referenceQueue.poll(); if (reference != null ) { System.out.println("-----有虚对象回收加入了队列" ); break ; } } }, "t2" ).start(); }
5. 小结
4. 内存泄漏 不再会被使用的对象或者变量占用的内存不能被回收,就是内存泄露。
ThreadLocalMap从字面上就可以看出这是一个保存ThreadLocal对象的map(其实是以它为Key),不过是经过了两层包装的ThreadLocal对象:
第一层包装是使用 WeakReference<ThreadLocal<?>> 将ThreadLocal对象变成一个弱引用的对象
第二层包装是定义了一个专门的类 Entry 来扩展 WeakReference<ThreadLocal<?>>
5. 为何用弱引用 1 2 3 4 5 6 7 public void function01 () { ThreadLocal tl = new ThreadLocal <Integer>(); tl.set(2021 ); tl.get(); }
当function01方法执行完毕后,栈帧销毁强引用 tl 也就没有了。但此时线程的ThreadLocalMap里某个entry的key引用还指向这个对象
若这个key引用是强引用,就会导致key指向的ThreadLocal对象及v指向的对象不能被gc回收,造成内存泄漏
若这个key引用是弱引用就大概率会减少内存泄漏的问题(还有一个key为null的雷)。使用弱引用,就可以使ThreadLocal对象在方法执行完毕后顺利被回收且Entry的key引用指向为null。
1. 仅用它就安全?
当我们为threadLocal变量赋值,实际上就是当前的Entry(threadLocal实例为key,值为value)往这个threadLocalMap中存放。Entry中的key是弱引用,当threadLocal外部强引用被置为null(tl=null),那么系统 GC 的时候,根据可达性分析,这个threadLocal实例就没有任何一条链路能够引用到它,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value永远无法回收,造成内存泄漏。
当然,如果当前thread运行结束,threadLocal,threadLocalMap,Entry没有引用链可达,在垃圾回收的时候都会被系统进行回收。
但在实际使用中我们有时候会用线程池去维护我们的线程,比如在Executors.newFixedThreadPool()时创建线程的时候,为了复用线程是不会结束的,所以threadLocal内存泄漏就值得我们小心
2. key 为null的Entry ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用引用他,那么系统gc的时候,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话(比如正好用在线程池),这些key为null的Entry的value就会一直存在一条强引用链。
虽然弱引用,保证了key指向的ThreadLocal对象能被及时回收,但是v指向的value对象是需要ThreadLocalMap调用get、set时发现key为null时才会去回收整个entry、value,因此弱引用不能100%保证内存不泄露。我们要在不使用某个ThreadLocal对象后,手动调用remoev方法来删除它,尤其是在线程池中,不仅仅是内存泄露的问题,因为线程池中的线程是重复使用的,意味着这个线程的ThreadLocalMap对象也是重复使用的,如果我们不手动调用remove方法,那么后面的线程就有可能获取到上个线程遗留下来的value值,造成bug。
3. 清除脏 Entry 源码 从ThreadLocal的set,get,remove方法源码看出,在threadLocal的生命周期里,针对threadLocal存在的内存泄漏的问题, 都会通过expungeStaleEntry,cleanSomeSlots,replaceStaleEntry这三个方法清理掉key为null的脏entry。
6. 总结
ThreadLocal 并不解决线程间共享数据的问题
ThreadLocal 适用于变量在线程间隔离且在方法间共享的场景
ThreadLocal 通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题
每个线程持有一个只属于自己的专属Map并维护了ThreadLocal对象与具体实例的映射,该Map由于只被持有它的线程访问,故不存在线程安全以及锁的问题
ThreadLocalMap的Entry对ThreadLocal的引用为弱引用,避免了ThreadLocal对象无法被回收的问题
都会通过expungeStaleEntry,cleanSomeSlots,replaceStaleEntry这三个方法回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏,属于安全加固的方法
群雄逐鹿起纷争,人各一份天下安
9. Unsafe 1. 概述 Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class UnsafeAccessor { static Unsafe unsafe; static { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafe.setAccessible(true ); unsafe = (Unsafe) theUnsafe.get(null ); } catch (NoSuchFieldException | IllegalAccessException e) { throw new Error (e); } } static Unsafe getUnsafe () { return unsafe; }
2. Unsafe CAS 操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class TestUnsafe { public static void main (String[] args) throws NoSuchFieldException, IllegalAccessException { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafe.setAccessible(true ); Unsafe unsafe = (Unsafe) theUnsafe.get(null ); System.out.println(unsafe); long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id" )); long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name" )); Teacher t = new Teacher (); unsafe.compareAndSwapInt(t, idOffset, 0 , 1 ); unsafe.compareAndSwapObject(t, nameOffset, null , "张三" ); System.out.println(t); } } @Data class Teacher { volatile int id; volatile String name; }
1 2 3 # 结果 sun.misc.Unsafe@5acf9800 Teacher(id=1, name=张三)
3. 自定义原子类 使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 class AtomicData implements Account { private volatile int value; private static final long valueOffset; private static final Unsafe UNSAFE; static { UNSAFE = UnsafeAccessor.getUnsafe(); try { valueOffset = UNSAFE.objectFieldOffset(AtomicData.class.getDeclaredField("value" )); } catch (NoSuchFieldException e) { e.printStackTrace(); throw new RuntimeException (e); } } public int getValue () { return value; } public void decrement (int amount) { while (true ) { int prev = this .value; int next = prev - amount; if (UNSAFE.compareAndSwapInt(this , valueOffset, prev, next)) { break ; } } } public AtomicData (int value) { this .value = value; } @Override public Integer getBalance () { return getValue(); } @Override public void withdraw (Integer amount) { decrement(amount); } } interface Account { Integer getBalance () ; void withdraw (Integer amount) ; static void demo (Account account) { List<Thread> ts = new ArrayList <>(); for (int i = 0 ; i < 1000 ; i++) { ts.add(new Thread (() -> { account.withdraw(10 ); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms" ); } }
测试
1 2 3 public static void main (String[] args) { Account.demo(new AtomicData (10000 )); }
10. 小结
CAS 与 volatile
API
原子整数
原子引用
原子数组
字段更新器
原子累加器
ThreadLocal
Unsafe
原理方面