七:JUC并发工具
7.1 CountdownLatch
7.1.1 介绍
假设有一个订单业务,有以下步骤:
- 1.扣减库存—业务处理时间a
- 2.下订单—业务处理时间b
- 3.通知商家—业务处理时间c
- 4.给用户追加积分—业务处理时间d
- 5.给用户分发优惠券—业务处理时间e
- 6.给用户响应—业务处理时间f
其中1,2两个步骤为串行进行,3,4,5三个步骤可以同时进行。等到3,4,5步骤完成后,给用户响应。
如果步骤1-6全部都是串行处理的话,用的时间就是a+b+c+d+e+f。
如果1,2串行,3,4,5并行,最后执行步骤6,那么时间就是a+b+MAX(c,d,e)+f,时间就会大大节省。
CountdownLatch就可以配合线程池完成这样的操作。
介绍:CountdownLatc就是JUC包下一个计数工具,核心功能就是计数器。并且是并发安全的。
用法:给CountdownLatch初始化一个数值,主线程执行await方法,主线程会等待任务执行完毕,等一个业务执行完,执行countdown方法,让CountdownLatch减1,直到CountdownLatch为0,主线程就会被唤醒,继续处理后续任务。
使用场景:当业务中出现多个任务可以并行处理,处理完之后才可以继续下一个任务,这时候就可以使用CountdownLatch。
7.1.2 应用
countDown()方法是一个任务结束后,总数减1。
await()方法是只有等到总数为0,才会进行下面的任务。
await(time,util)是等到时间结束,总数还没有为0的话,会返回一个boolean类型的值,由开发者根据这个值来判断是否要直接执行下面的任务。
CountdownLatch用完之后是不能重新复用的。
package com.xqm.juc.jucUtils;
import java.util.concurrent.*;
public class Test01 {
private static CountDownLatch countDownLatch = new CountDownLatch(3);
private static ExecutorService executor=Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException {
System.out.println("主业务开始执行");
sleep(1000);
executor.execute(Test01::a);
executor.execute(Test01::b);
executor.execute(Test01::c);
// 没有参数,就死等任务结束
// countDownLatch.await();
// 参数中加上时间,代表如果超过这时间的话,会返回一个boolean结果的值
// 可以通过判断返回的结果来确定要不要继续往下执行
if (countDownLatch.await(500, TimeUnit.MILLISECONDS)){
System.out.println("三个并行业务执行完毕");
}else {
System.out.println("三个并行业务没有执行完毕");
}
System.out.println("主业务执行完毕");
executor.shutdown();
}
private static void a(){
System.out.println("开始执行A任务");
sleep(1000);
System.out.println("A任务执行完毕");
countDownLatch.countDown();
}
private static void b(){
System.out.println("开始执行B任务");
sleep(1500);
System.out.println("B任务执行完毕");
countDownLatch.countDown();
}
private static void c(){
System.out.println("开始执行C任务");
sleep(2000);
System.out.println("C任务执行完毕");
countDownLatch.countDown();
}
private static void sleep(long timeout){
try {
// 休眠n毫秒
TimeUnit.MILLISECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
7.1.3 源码-核心属性
CountdownLatch就是一个计数器,没有其他特殊的功能,查看源码也只是查看计数器的实现方式。
发现CountdownLatch内部类Sync继承了AQS,CountdownLLatch就是基于AQS实现的计数器。
AQS就是一个state属性,以及双向链表。主线程的阻塞的方式,也是阻塞在双向链表中。
private static final class Sync extends AbstractQueuedSynchronizer {}
7.1.4 源码-有参构造
就是构建内部类Sync,并且给AQS中d的state赋值。
// 有参构造方法,只有一个有参构造
public CountDownLatch(int count) {
// 健壮性校验
if (count < 0) throw new IllegalArgumentException("count < 0");
// 构建内部类
this.sync = new Sync(count);
}
// AQS的子类Sync, 内部类就是传递count
Sync(int count) {
// 就是给AQS中的setState方法
setState(count);
}
// AQS中的setState方法
protected final void setState(int newState) {
state = newState;
}
7.1.5 源码-await方法
await方法实际就是判断CountdownLatch中state是否为0,如果为0,正常执行后面任务,如果不为0,以共享锁的方式,插入到AQS的双向链表,并且挂起线程。
await(time,unit)实际和await()类似,只不过加上了过期时间,时间一到,就会执行下面的语句。
// await方法,使用await会阻塞主线程,直到state为0
public void await() throws InterruptedException {
// 共享锁,允许中断,调用的是AQS中的方法
// 传递的值是1,实际上根本就没用到这个值
sync.acquireSharedInterruptibly(1);
}
// AQS提供的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 查看线程是否中断,如果中断标记位是true,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 共享锁中断操作
doAcquireSharedInterruptibly(arg);
}
// AQS中没有提供方法具体实现,而是需要子类去自己实现
// 这里是Sync类的具体实现
protected int tryAcquireShared(int acquires) {
// 如果state状态是0,那就为1,否则为-1
return (getState() == 0) ? 1 : -1;
}
// AQS中共享锁中断操作
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 封装当前线程为Node,属性为共享锁
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 在这就需要挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
7.1.5 源码-countDown方法
countDown方法实质上就是对state-1,如果state-1之后为0,需要去AQS的链表中唤醒挂起的线程。
// countDown对计数器-1
public void countDown() {
sync.releaseShared(1);
}
// AQS提供的功能,args就是1
public final boolean releaseShared(int arg) {
// 就是对state-1操作,返回true代表state为0
if (tryReleaseShared(arg)) {
// state-1之后为0
doReleaseShared();
return true;
}
return false;
}
// CountdownLatch的tryReleaseShared,对state进行-1的操作
protected boolean tryReleaseShared(int releases) {
// 死循环
for (;;) {
// 获取state
int c = getState();
// 如果state为0,直接返回false
if (c == 0)
return false;
// 否则进行-1的操作
int nextc = c - 1;
// CAS操作,死循环保证成功或者应对状态改变
if (compareAndSetState(c, nextc))
// 赋值完发现state为0,此时可能会有线程在await方法处挂起,
// 需要这边进行唤醒
return nextc == 0;
}
}
// state为0,需要唤醒在await处挂起的线程
private void doReleaseShared() {
// 死循环
for (;;) {
// 拿到head
Node h = head;
// 说明AQS队列中有节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果head节点的状态为-1
if (ws == Node.SIGNAL) {
// 将head节点的状态从-1修改为0,避免重复唤醒的情况
// 如果head状态为-1,代表后面有节点挂起
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 如果失败就重新来一遍
continue; // loop to recheck cases
// 正常唤醒节点,先看head.next,能唤醒就唤醒
// 如果head.next有问题,那就从后往前找有效节点
unparkSuccessor(h);
// 会在semaphore谈到这个位置
// ws==0,代表后面的节点没挂起
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
7.2 CyclicBarrier
7.2.1 介绍
CyclicBarrier从名字上看,是循环屏障的意思。
Barrier:让一个或多个线程到达一个屏障点,然后阻塞。屏障点有一个数值,当到达一个线程阻塞在屏障点时,屏障点的数值就会-1,当屏障点的数值减为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。在释放屏障之后,可以先执行一个任务,再让所有阻塞被唤醒的线程继续之后后续的任务。
Cyclic:所有线程被释放之后,屏障点的数值可以再次被重置。
CyclicBarrier一般被称为栅栏。
CyclicBarrier是一种同步机制,允许一组线程相互等待(而CountdownLatch是其他线程等待CountDownLatch为0就开始执行)。基于await方法在屏障点等待。
CyclicBarrier没有基于AQS实现,而是基于ReentrantLock锁的机制去实现屏障点,以及挂起线程的操作。(CountdownLatch本身是基于AQS,对state进行release操作后,可以-1)。
CyclicBarrier每来一个线程进行await操作后,就对数值进行-1的操作,每次-1之后,立即查看数值是否为0,如果为0,直接唤醒所有相互等待的线程。
CyclicBarrier和CountdownLatch的区别:
- 底层实现:CyclicBarrier基于ReentrantLock,而CountdownLatch基于AQS
- 应用场景:CountdownLatch只能使用一次。而CyclicBarrier到达0之后可以重置。其次CyclicBarrier可以实现更复杂的场景,执行业务出现错误后,可以重置计数器,再次执行。
- CyclicBarrier可以实现更多功能
- 可以获取阻塞的线程有多少
- 如果有等待的线程中断,可以抛出异常,避免死锁等情况
- CountdownLatch一般是让主线程等待,让子线程对计数器-1操作。CyclicBarrier更多的让子线程也一起计数等待,等待达到数值之后,再统一唤醒。
7.2.2 应用
在构建CyclicBarrier时,可以指定barrierAction,如果指定了,那么会在barrier归零后,优先执行barrierAction任务,然后再去唤醒所有阻塞挂起的任务,并行处理后续的任务。
如果在等待期间,有线程中断了,唤醒所有线程后,CyclicBarrier无法继续使用。
在CyclicBarrier的屏障数值到达0之后,会默认重置屏障数值,在没有线程中断时,会重复使用。
package com.xqm.juc.jucUtils;
import java.util.concurrent.CyclicBarrier;
public class Test02_CyclicBarrier {
public static void main(String[] args) {
// CyclicBarrier中如果有任务,那么等屏障数值为0,先执行这个任务,再执行其他的任务
CyclicBarrier cyclicBarrier=new CyclicBarrier(3,()->{
System.out.println("等到人员到位之后,分发护照");
});
new Thread(()->{
System.out.println("Tom到达");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Tom开始出发");
},"Tom").start();
new Thread(()->{
System.out.println("Jack到达");
try {
cyclicBarrier.await();
} catch (Exception e) {
System.out.println("人没到齐");
e.printStackTrace();
}
System.out.println("Jack开始出发");
},"Jack").start();
new Thread(()->{
System.out.println("Rose到达");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Rose开始出发");
},"Rose").start();
}
}
结果:
Tom到达
Jack到达
Rose到达
等到人员到位之后,分发护照
Rose开始出发
Tom开始出发
Jack开始出发
线程中断后,需要调用reset方法,才能继续使用CyclicBarrier。
package com.xqm.juc.jucUtils;
import java.util.concurrent.CyclicBarrier;
public class Test03_CyclicBarrier {
public static void main(String[] args) throws InterruptedException {
// CyclicBarrier中如果有任务,那么等屏障数值为0,先执行这个任务,再执行其他的任务
CyclicBarrier cyclicBarrier=new CyclicBarrier(3,()->{
System.out.println("等到人员到位之后,分发护照");
});
new Thread(()->{
System.out.println("Tom到达");
try {
cyclicBarrier.await();
} catch (Exception e) {
System.out.println("tom人没到齐");
return;
}
System.out.println("Tom开始出发");
},"Tom").start();
Thread thread=new Thread(()->{
System.out.println("Rose到达");
try {
cyclicBarrier.await();
} catch (Exception e) {
System.out.println("rose人没到齐");
return;
}
System.out.println("Rose开始出发");
},"Rose");
thread.start();
Thread.sleep(2000);
thread.interrupt();
Thread.sleep(2000);
// 重置计数器,即便中断后,也可以使用
cyclicBarrier.reset();
new Thread(()->{
System.out.println("Jack到达");
try {
cyclicBarrier.await();
} catch (Exception e) {
System.out.println("jack人没到齐");
return;
}
System.out.println("Jack开始出发");
},"Jack").start();
}
}
7.2.3 源码-核心属性
// cyclicBarrier核心属性
// 静态内部类,用来标记是否中断
private static class Generation {
boolean broken = false;
}
// 使用ReentrantLock实现互斥操作
private final ReentrantLock lock = new ReentrantLock();
// 使用lock的condition实现线程的挂起和唤醒
private final Condition trip = lock.newCondition();
// 记录有参构造原始传入的屏障数值,不会对这个数值进行操作
private final int parties;
// 当屏障数值达到0时,优先执行当前任务
private final Runnable barrierCommand;
// 初始化默认的generation,用来标记线程中断情况
private Generation generation = new Generation();
// 每来一个线程await,就对count进行减1的操作
// 当count==0时,会触发任务,重新赋值为parties
private int count;
7.2.4 源码-有参构造
// 有参构造
public CyclicBarrier(int parties, Runnable barrierAction) {
// 健壮性校验
if (parties <= 0) throw new IllegalArgumentException();
// 屏障值赋值,不会改变,用来循环使用
this.parties = parties;
// 真实计数的数值,每来一个await,就进行-1的操作,到达0之后,重新赋值为parties
this.count = parties;
// 当到达屏障时立刻执行的任务
this.barrierCommand = barrierAction;
}
7.2.5 源码-await方法
CyclicBarrier提供了两种await方法:
- 一种是无参构造方法,线程会死等下去,直到屏障值归0,或者有线程中断。
- 一种是有参构造方法,传入等待的时间,要么等到时间归0,要么屏障值归0,要么有线程中断。
- 无论哪种方法,实际调用的都是dowait方法,只不过传入的参数不同
// 实际调用的是dowait,传入的是false和0秒
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 实际调用的是dowait,传入的是true和实际的纳秒
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
7.2.6 源码-dowait方法
// await方法的核心
private int dowait(boolean timed, long nanos)
// 当前线程中断,当前线程抛出异常
throws InterruptedException,
//` 其他线程中断,当前线程抛出异常
BrokenBarrierException,
// 超时时间异常
TimeoutException {
// 获取锁
final ReentrantLock lock = this.lock;
// 加锁,保证多线程的安全
lock.lock();
try {
// 拿到generation对象,用来标记线程中断情况
final Generation g = generation;
// 判断线程已经中断的话,直接抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程抛出异常
if (Thread.interrupted()) {
// 重置broken为true,重置屏障数值,唤醒其他等待的线程
breakBarrier();
// 抛出异常
throw new InterruptedException();
}
// 说明没有线程中断,直接对count-1操作
// index是减1之后的值
int index = --count;
// 如果减1之后为0,需要打开屏障
if (index == 0) {
// 一个标记
boolean ranAction = false;
try {
// 拿到有参构造中传递的任务
final Runnable command = barrierCommand;
// 如果任务不为null
if (command != null)
// 优先执行当前任务
command.run();
// 如果任务执行的没问题,那就赋值为true
ranAction = true;
// 唤醒所有线程,重置count,重置中断标记位
nextGeneration();
return 0;
} finally {
// 如果执行有参构造中任务出现问题,进行重置操作
if (!ranAction)
breakBarrier();
}
}
// 如果减1之后屏障值不为0,那就一直循环等待线程中断,或者超时
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// timed为false,代表没有时间限制,需要死等
if (!timed)
trip.await();
// 否则就需要判断时间是否超时
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 到这代表线程被中断了
// 查看generation有没有被重置,reset方法是被重置
// 查看当前broken为false,需要做线程中断后的操作
if (g == generation && ! g.broken) {
// 重置一些属性
breakBarrier();
// 抛出异常
throw ie;
// 如果被重置了,或者被中断了
} else {
// 直接中断当前线程
Thread.currentThread().interrupt();
}
}
// 如果其他线程被中断
if (g.broken)
// 抛出异常
throw new BrokenBarrierException();
// 如果调用了reset方法重置了,或者代表任务执行完毕了自动重置
if (g != generation)
// 直接返回index,也就是减1之后的数值
return index;
// 如果指定等待的时间,但是时间到了
if (timed && nanos <= 0L) {
// 执行一些重置操作
breakBarrier();
// 抛出超时异常
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
breakBarrier:
private void breakBarrier() {
// 重置中断标记为true
generation.broken = true;
// 重置屏障数值
count = parties;
// 唤醒所有线程
trip.signalAll();
}
nextGeneration:重置eset或者任务执行完自动重置方法
private void nextGeneration() {
// 唤醒所有的线程
trip.signalAll();
// 重置计数
count = parties;
// 重置generation中断标记位
generation = new Generation();
}
7.2.7 源码-reset方法
// reset方法就是先进行中断,再进行重置操作
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
7.3 Semaphore
7.3.1 介绍
Semaphore:信号量。Synchronized和ReentrantLock是互斥锁,保证一个资源同一时间只能被一个线程访问。而Semaphore保证一个或多个资源可以被指定数量的线程访问。Semaphore的底层是基于AQS实现的。
Semaphore的底层也是基于AQS的state属性做一个计数器的维护。
- state的值代表当前共享资源的个数;
- 如果一个线程需要获取一个或多个资源,需要查看state的资源个数是否足够;
- 如果足够,直接对state-1操作,然后获取资源;
- 如果资源不够,当前线程就需要挂起等待;
- 直到持有资源的线程释放资源后,会归还给semaphore的state属性,挂起的线程会重新被唤醒。
Semaphore也分为公平和非公平的概念。
使用场景:
- 连接池对象可以借助信号量去实现管理。
- 在一些流量控制上,也可以采用信号量去是实现。
- 比如买票和人数,票就是资源,一个人买一张票,资源就减1,直到资源不足。
7.3.2 应用
Semaphore对资源的获取和释放操作:
获取资源:
- acquire():获取一个资源,没有资源就挂起等待,如果中断,直接抛异常
- acquire(int):获取指定个数资源,没有资源就挂起等待,如果中断,直接抛异常
- tryAcquire():尝试获取一个资源,没有资源就返回false,否则返回true
- tryAcquire(int):尝试获取指定个数资源,没有资源就返回false,否则返回true
- tryAcquire(time,unit):尝试获取一个资源,没有资源就等待time时间,如果时间内获取不到资源,就返回false
- tryAcquire(int,time,unit):尝试获取指定个数资源,没有资源就等待time时间,如果时间内获取不到资源,就返回false
- acquireUninterruptibly():获取一个资源,没有资源就挂起等待,中断线程不结束,继续等待
- acquireUninterruptibly(int):获取指定个数资源,没有资源就挂起等待,中断线程不结束,继续等待
释放资源:
- release():释放一个资源
- release(int):释放指定个数资源
package com.xqm.juc.jucUtils;
import java.util.concurrent.Semaphore;
public class Test04_Semaphore {
public static void main(String[] args) throws InterruptedException {
// 假设环球影城还有10个人流量
// 默认为非公平
Semaphore semaphore = new Semaphore(10);
new Thread(() -> {
try {
System.out.println("一家三口要去玩,准备买票");
//无参构造,每次拿一个资源
//有参构造,每次拿指定资源
semaphore.acquire(3);
System.out.println("一家三口获取到门票,准备玩了");
Thread.sleep(10000);
System.out.println("一家三口观赏结束了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 用完要释放
semaphore.release(3);
}
}).start();
for (int i = 0; i < 7; i++) {
int j = i;
new Thread(() -> {
System.out.println("人" + j + "来了");
try {
semaphore.acquire();
System.out.println("人" + j + "获取到门票,准备玩了");
Thread.sleep(10000);
System.out.println("人" + j + "观赏结束了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
Thread.sleep(10);
System.out.println("main来了");
if (semaphore.tryAcquire()) {
System.out.println("main获取到门票");
semaphore.release();
} else {
System.out.println("main获取不到门票");
}
Thread.sleep(10000);
System.out.println("main又来了");
if (semaphore.tryAcquire()) {
System.out.println("main获取到门票");
semaphore.release();
} else {
System.out.println("main获取不到门票");
}
}
}
7.3.3 整体结构
Semaphore中有三个静态内部类:
- 向上抽取的Sync
abstract static class Sync extends AbstractQueuedSynchronizer{...}
- Sync的子类NonfairSync
static final class NonfairSync extends Sync{...}
- Sync的子类FairSync
static final class FairSync extends Sync{...}
Sync内部主要提供了一些公共的方法,并且有参构造传入的是资源个数,基于AQS的setState方法设置了state属性。
而NonfairSync和FairSync的区别就是tryAcquireShared()方法实现的不同。
7.3.4 源码-构造函数
构造函数有两个,一个是只有一个参数,就是只有资源个数的,默认调用的是非公平的获取资源方法。
public Semaphore(int permits) {
// 非公平的获取资源
sync = new NonfairSync(permits);
}
另一种是两个参数,一个是资源个数,另一个是指定采用公平还是非公平的方式获取资源。
public Semaphore(int permits, boolean fair) {
// 如果为true就是公平获取资源,否则就是非公平
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
7.3.5 源码-非公平获取资源
无参的acquire默认获取的资源个数是一个。并且acquire是允许中断线程,然后抛出异常。
获取资源的方式就是使用CAS直接将state-1,如果成功代表获取资源成功,如果失败则代表资源数不够,就基于共享锁的方式去将当前线程挂起在AQS双向链表中,如果基于doAcquireSharedInterruptibly拿锁成功,会执行setHeadAndPropagate方法。
acquire:
// 获取信号量的方法,默认获取一个资源
public void acquire() throws InterruptedException {
// 跳转到AQS中获取共享锁的方法
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly:
// AQS提供的获取共享锁的方法,允许中断抛异常
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程中断标记位已经是true
if (Thread.interrupted())
// 那么直接抛异常
throw new InterruptedException();
// tryAcquireShared()方法就是区分公平和非公平获取资源
// tryAcquireShared(arg):如果返回小于0,代表获取资源失败,需要排队
// tryAcquireShared(arg):如果返回大于等于0,代表获取资源成功,直接执行业务代码
if (tryAcquireShared(arg) < 0)
// 获取资源失败,进行排队
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared:
// 信号量的非公平获取资源
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
nonfairTryAcquireShared:
// 信号量的内部类Sync中非公平获取资源
// acquires:就是想要拿取的资源个数
final int nonfairTryAcquireShared(int acquires) {
// 死循环获取资源
for (;;) {
// 获取state的数值,也就是剩余的资源个数
int available = getState();
// 判断拿取过后剩余的资源个数
int remaining = available - acquires;
// 如果剩余资源个数小于0,直接返回资源个数
if (remaining < 0 ||
// 如果资源个数大于等于0,也就是资源足够,
// 那么基于CAS将state原值改为remaining
compareAndSetState(available, remaining))
// 所以最后只会有两种结果:
// remaining<0:代表资源不足
// remaining》=0:资源足够并且CAS获取资源成功
return remaining;
}
}
doAcquireSharedInterruptibly
// 获取资源失败,当前线程需要挂起等待
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 构建Node节点,线程和共享锁标记,并且到AQS双向链表中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
// 如果p是head节点,也就是新建的node是head.next,就可以抢一波资源
if (p == head) {
// 抢占资源
int r = tryAcquireShared(arg);
// 资源抢占成功
if (r >= 0) {
// 做一些操作
// 设置head的一些操作
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果不是head.next节点,将前继节点的状态改为-1,并挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果线程中断,则抛出异常
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 如果没有获取到资源,取消当前节点
if (failed)
// 取消节点参考ReentrantLock
cancelAcquire(node);
}
}
7.3.6 其他获取资源的方法
- acquire()和acquire(int)方法,都是执行acquireSharedInterruptibly()方法,区别就是传入的资源个数不同。
- tryAcquire()和tryAcquire(int)两个方法,只使用非公平锁的实现方法。因为只有非公平才可能获取到锁。
- 但是tryAcquire(int permits, long timeout, TimeUnit unit)这种方法是正常走的AQS给的acquire方法,因为这种方法可以排队一段时间,在排队时是可能获取到资源的。
- acquireUninterruptibly()以及acquireUninterruptibly(int)只是在挂起线程后,不会因为线程的中断而抛异常。
7.3.7 源码-公平获取资源
公平和非公平方式实现的区别只是在于tryAcquireShared这个方法的不同,其他都是相同的。
公平实现,需要查看AQS中排队的情况。
protected int tryAcquireShared(int acquires) {
// 死循环,直到没有资源获取,或者获取到资源
for (;;) {
// 公平锁走下述逻辑前,会先判断队列中排队的情况
// 如果没有排队的节点,直接走下面逻辑,抢夺资源
// 如果有排队的节点,发现当前节点在head.next位置,也不走if的逻辑
if (hasQueuedPredecessors())
return -1;
// 下面这些和非公平实现一模一样
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
7.3.8 源码-释放资源
因为信号量从头到尾都是共享锁实现,释放资源不分公平还是非公平,直接走AQS的releaseShared()方法。
// semaphore的释放资源方法
public void release() {
sync.releaseShared(1);
}
// AQS的releaseShared方法,不分公平还是非公平
public final boolean releaseShared(int arg) {
// 这个方法是信号量自行实现的
if (tryReleaseShared(arg)) {
// 只要释放资源成功,唤醒AQS中排队的线程
doReleaseShared();
return true;
}
return false;
}
// 信号量实现的释放资源方法
protected final boolean tryReleaseShared(int releases) {
// 死循环
for (;;) {
// 获取到state
int current = getState();
// 将当前的state加上当前要释放的资源数
int next = current + releases;
// 归还完之后的数量如果小于没归还之前的,代表溢出了
// next为负数,健壮性判断
if (next < current) // overflow
// 抛出异常
throw new Error("Maximum permit count exceeded");
// CAS操作,保证原子性,只会有一个线程成功将state修改成功
if (compareAndSetState(current, next))
return true;
}
}
7.3.9 AQS中的PROPAGATE节点
PROPAGATE:传播
7.3.9.1 JDK1.5中Semaphore的执行情况
首先查看4个线程获取信号量资源的情况:
往下查看释放资源的过程会触发什么问题
首先t1释放资源,做了进一步处理
当线程3获取锁资源后,线程2再次释放资源,因为执行点问题,导致线程4无法被唤醒
JDK1.5实现代码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
7.3.9.2 JDK1.8中Semaphore处理
JDK1.8实现
private void doReleaseShared() {
// 死循环
for (;;) {
// 获取到头节点
Node h = head;
// 判断AQS中有排队的Node节点
if (h != null && h != tail) {
// 拿到头节点的状态
int ws = h.waitStatus;
// 如果为-1,代表后续节点有挂起,要去唤醒
if (ws == Node.SIGNAL) {
// 将head状态由-1改为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
// 如果head状态为0,将0改为-3
// 目的是为了向后面传播
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
// 继续循环
continue; // loop on failed CAS
}
// 当头节点没有变化,也就是没有并发情况
if (h == head) // loop if head changed
// 正常完成释放排队的线程
break;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// 获取头节点
Node h = head; // Record old head for check below
// 将node设置为新的head
setHead(node);
// 如果propagate大于0,代表还有剩余资源,直接唤醒后续节点,如果不满足,也需要继续往下判断,看是否需要往后判断是否需要传播
// h == null 看成健壮性判断就行
// h.waitStatus < 0之前head节点为负数,说明并发情况下,可能还有资源,需要继续向下唤醒
// 如果当前新head节点状态为负数,继续向下唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 唤醒当前节点的后继节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
1
7.4 Exchanger
交换两个线程之间的通信。
package com.xqm.thread_study.countdownLantch;
import java.util.concurrent.Exchanger;
/**
* @ClassName T07_exchanger
* @Description TODO
* @Author xqm
* @Date 2022/6/12 8:00
* @Version 1.0
*
*
* exchanger 交换器 两个线程之间通信 交换数据
* 只能两个线程交换 否则会线程阻塞
*/
public class T07_exchanger {
static Exchanger<String> exchanger=new Exchanger<>();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1");
Thread t2 = new Thread(() -> {
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2");
Thread t3 = new Thread(() -> {
String s = "T3";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t3");
Thread t4 = new Thread(() -> {
String s = "T4";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t4");
t1.start();
t2.start();
t3.start();
t4.start();
}
}