七:JUC并发工具

7.1 CountdownLatch

7.1.1 介绍

假设有一个订单业务,有以下步骤:

  • 1.扣减库存—业务处理时间a
  • 2.下订单—业务处理时间b
  • 3.通知商家—业务处理时间c
  • 4.给用户追加积分—业务处理时间d
  • 5.给用户分发优惠券—业务处理时间e
  • 6.给用户响应—业务处理时间f

其中1,2两个步骤为串行进行,3,4,5三个步骤可以同时进行。等到3,4,5步骤完成后,给用户响应。

如果步骤1-6全部都是串行处理的话,用的时间就是a+b+c+d+e+f。

如果1,2串行,3,4,5并行,最后执行步骤6,那么时间就是a+b+MAX(c,d,e)+f,时间就会大大节省。

CountdownLatch就可以配合线程池完成这样的操作。

介绍:CountdownLatc就是JUC包下一个计数工具,核心功能就是计数器。并且是并发安全的。

用法:给CountdownLatch初始化一个数值,主线程执行await方法,主线程会等待任务执行完毕,等一个业务执行完,执行countdown方法,让CountdownLatch减1,直到CountdownLatch为0,主线程就会被唤醒,继续处理后续任务。

使用场景:当业务中出现多个任务可以并行处理,处理完之后才可以继续下一个任务,这时候就可以使用CountdownLatch。

7.1.2 应用

countDown()方法是一个任务结束后,总数减1。

await()方法是只有等到总数为0,才会进行下面的任务。

await(time,util)是等到时间结束,总数还没有为0的话,会返回一个boolean类型的值,由开发者根据这个值来判断是否要直接执行下面的任务。

CountdownLatch用完之后是不能重新复用的。

package com.xqm.juc.jucUtils;

import java.util.concurrent.*;

public class Test01 {

    private static CountDownLatch countDownLatch = new CountDownLatch(3);
    private static ExecutorService executor=Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws InterruptedException {
        System.out.println("主业务开始执行");
        sleep(1000);
        executor.execute(Test01::a);
        executor.execute(Test01::b);
        executor.execute(Test01::c);
        // 没有参数,就死等任务结束
        // countDownLatch.await();

        // 参数中加上时间,代表如果超过这时间的话,会返回一个boolean结果的值
        // 可以通过判断返回的结果来确定要不要继续往下执行
        if (countDownLatch.await(500, TimeUnit.MILLISECONDS)){
            System.out.println("三个并行业务执行完毕");
        }else {
            System.out.println("三个并行业务没有执行完毕");
        }
        System.out.println("主业务执行完毕");
        executor.shutdown();
    }

    private static void a(){
        System.out.println("开始执行A任务");
        sleep(1000);
        System.out.println("A任务执行完毕");
        countDownLatch.countDown();
    }

    private static void b(){
        System.out.println("开始执行B任务");
        sleep(1500);
        System.out.println("B任务执行完毕");
        countDownLatch.countDown();
    }
    private static void c(){
        System.out.println("开始执行C任务");
        sleep(2000);
        System.out.println("C任务执行完毕");
        countDownLatch.countDown();
    }


    private static void sleep(long timeout){
        try {
            // 休眠n毫秒
            TimeUnit.MILLISECONDS.sleep(timeout);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

7.1.3 源码-核心属性

CountdownLatch就是一个计数器,没有其他特殊的功能,查看源码也只是查看计数器的实现方式。

发现CountdownLatch内部类Sync继承了AQS,CountdownLLatch就是基于AQS实现的计数器。
AQS就是一个state属性,以及双向链表。主线程的阻塞的方式,也是阻塞在双向链表中。

 private static final class Sync extends AbstractQueuedSynchronizer {}

7.1.4 源码-有参构造

就是构建内部类Sync,并且给AQS中d的state赋值。


// 有参构造方法,只有一个有参构造
public CountDownLatch(int count) {
	// 健壮性校验
	if (count < 0) throw new IllegalArgumentException("count < 0");
	// 构建内部类
	this.sync = new Sync(count);
}

// AQS的子类Sync, 内部类就是传递count
Sync(int count) {
	// 就是给AQS中的setState方法
	setState(count);
}

// AQS中的setState方法
protected final void setState(int newState) {
	state = newState;
}

7.1.5 源码-await方法

await方法实际就是判断CountdownLatch中state是否为0,如果为0,正常执行后面任务,如果不为0,以共享锁的方式,插入到AQS的双向链表,并且挂起线程。

await(time,unit)实际和await()类似,只不过加上了过期时间,时间一到,就会执行下面的语句。

// await方法,使用await会阻塞主线程,直到state为0
public void await() throws InterruptedException {
	// 共享锁,允许中断,调用的是AQS中的方法
	// 传递的值是1,实际上根本就没用到这个值
	sync.acquireSharedInterruptibly(1);
}

// AQS提供的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
	// 查看线程是否中断,如果中断标记位是true,直接抛出异常
	if (Thread.interrupted())
		throw new InterruptedException();
	if (tryAcquireShared(arg) < 0)
		// 共享锁中断操作
		doAcquireSharedInterruptibly(arg);
}
// AQS中没有提供方法具体实现,而是需要子类去自己实现
// 这里是Sync类的具体实现
protected int tryAcquireShared(int acquires) {
	// 如果state状态是0,那就为1,否则为-1
	return (getState() == 0) ? 1 : -1;
}

// AQS中共享锁中断操作
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
	// 封装当前线程为Node,属性为共享锁
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head) {
				int r = tryAcquireShared(arg);
				if (r >= 0) {
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			// 在这就需要挂起当前线程
			if (shouldParkAfterFailedAcquire(p, node) &&
			        parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

7.1.5 源码-countDown方法

countDown方法实质上就是对state-1,如果state-1之后为0,需要去AQS的链表中唤醒挂起的线程。

// countDown对计数器-1
public void countDown() {
	sync.releaseShared(1);
}

// AQS提供的功能,args就是1
public final boolean releaseShared(int arg) {
	// 就是对state-1操作,返回true代表state为0
	if (tryReleaseShared(arg)) {
		// state-1之后为0
		doReleaseShared();
		return true;
	}
	return false;
}
// CountdownLatch的tryReleaseShared,对state进行-1的操作
protected boolean tryReleaseShared(int releases) {
	// 死循环
	for (;;) {
		// 获取state
		int c = getState();
		// 如果state为0,直接返回false
		if (c == 0)
			return false;
		// 否则进行-1的操作
		int nextc = c - 1;
		// CAS操作,死循环保证成功或者应对状态改变
		if (compareAndSetState(c, nextc))
			// 赋值完发现state为0,此时可能会有线程在await方法处挂起,
			// 需要这边进行唤醒
			return nextc == 0;
	}
}

// state为0,需要唤醒在await处挂起的线程
private void doReleaseShared() {
	// 死循环
	for (;;) {
		// 拿到head
		Node h = head;
		// 说明AQS队列中有节点
		if (h != null && h != tail) {
			int ws = h.waitStatus;
			// 如果head节点的状态为-1
			if (ws == Node.SIGNAL) {
				// 将head节点的状态从-1修改为0,避免重复唤醒的情况
				// 如果head状态为-1,代表后面有节点挂起
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					// 如果失败就重新来一遍
					continue;            // loop to recheck cases
				// 正常唤醒节点,先看head.next,能唤醒就唤醒
				// 如果head.next有问题,那就从后往前找有效节点
				unparkSuccessor(h);
				// 会在semaphore谈到这个位置
				// ws==0,代表后面的节点没挂起
			} else if (ws == 0 &&
			           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				continue;                // loop on failed CAS
		}
		if (h == head)                   // loop if head changed
			break;
	}
}

7.2 CyclicBarrier

7.2.1 介绍

CyclicBarrier从名字上看,是循环屏障的意思。

Barrier:让一个或多个线程到达一个屏障点,然后阻塞。屏障点有一个数值,当到达一个线程阻塞在屏障点时,屏障点的数值就会-1,当屏障点的数值减为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。在释放屏障之后,可以先执行一个任务,再让所有阻塞被唤醒的线程继续之后后续的任务。

Cyclic:所有线程被释放之后,屏障点的数值可以再次被重置。

CyclicBarrier一般被称为栅栏。

CyclicBarrier是一种同步机制,允许一组线程相互等待(而CountdownLatch是其他线程等待CountDownLatch为0就开始执行)。基于await方法在屏障点等待。

CyclicBarrier没有基于AQS实现,而是基于ReentrantLock锁的机制去实现屏障点,以及挂起线程的操作。(CountdownLatch本身是基于AQS,对state进行release操作后,可以-1)。

CyclicBarrier每来一个线程进行await操作后,就对数值进行-1的操作,每次-1之后,立即查看数值是否为0,如果为0,直接唤醒所有相互等待的线程。

CyclicBarrier和CountdownLatch的区别:

  • 底层实现:CyclicBarrier基于ReentrantLock,而CountdownLatch基于AQS
  • 应用场景:CountdownLatch只能使用一次。而CyclicBarrier到达0之后可以重置。其次CyclicBarrier可以实现更复杂的场景,执行业务出现错误后,可以重置计数器,再次执行。
  • CyclicBarrier可以实现更多功能
    • 可以获取阻塞的线程有多少
    • 如果有等待的线程中断,可以抛出异常,避免死锁等情况
  • CountdownLatch一般是让主线程等待,让子线程对计数器-1操作。CyclicBarrier更多的让子线程也一起计数等待,等待达到数值之后,再统一唤醒。

7.2.2 应用

在构建CyclicBarrier时,可以指定barrierAction,如果指定了,那么会在barrier归零后,优先执行barrierAction任务,然后再去唤醒所有阻塞挂起的任务,并行处理后续的任务。

如果在等待期间,有线程中断了,唤醒所有线程后,CyclicBarrier无法继续使用。


在CyclicBarrier的屏障数值到达0之后,会默认重置屏障数值,在没有线程中断时,会重复使用。

package com.xqm.juc.jucUtils;

import java.util.concurrent.CyclicBarrier;

public class Test02_CyclicBarrier {
  
    public static void main(String[] args) {
        // CyclicBarrier中如果有任务,那么等屏障数值为0,先执行这个任务,再执行其他的任务
        CyclicBarrier cyclicBarrier=new CyclicBarrier(3,()->{
            System.out.println("等到人员到位之后,分发护照");
        });

        new Thread(()->{
            System.out.println("Tom到达");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Tom开始出发");
        },"Tom").start();

        new Thread(()->{
            System.out.println("Jack到达");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                System.out.println("人没到齐");
                e.printStackTrace();
            }
            System.out.println("Jack开始出发");
        },"Jack").start();

        new Thread(()->{
            System.out.println("Rose到达");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Rose开始出发");
        },"Rose").start();
    }
}

结果:

Tom到达
Jack到达
Rose到达
等到人员到位之后,分发护照
Rose开始出发
Tom开始出发
Jack开始出发

线程中断后,需要调用reset方法,才能继续使用CyclicBarrier。

package com.xqm.juc.jucUtils;

import java.util.concurrent.CyclicBarrier;

public class Test03_CyclicBarrier {

    public static void main(String[] args) throws InterruptedException {
        // CyclicBarrier中如果有任务,那么等屏障数值为0,先执行这个任务,再执行其他的任务
        CyclicBarrier cyclicBarrier=new CyclicBarrier(3,()->{
            System.out.println("等到人员到位之后,分发护照");
        });

        new Thread(()->{
            System.out.println("Tom到达");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                System.out.println("tom人没到齐");
                return;
            }
            System.out.println("Tom开始出发");
        },"Tom").start();


        Thread thread=new Thread(()->{
            System.out.println("Rose到达");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                System.out.println("rose人没到齐");
                return;
            }
            System.out.println("Rose开始出发");
        },"Rose");
        thread.start();
        Thread.sleep(2000);
        thread.interrupt();
        Thread.sleep(2000);
        // 重置计数器,即便中断后,也可以使用
        cyclicBarrier.reset();
        new Thread(()->{
            System.out.println("Jack到达");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                System.out.println("jack人没到齐");
                return;
            }
            System.out.println("Jack开始出发");
        },"Jack").start();
    }
}

7.2.3 源码-核心属性

// cyclicBarrier核心属性
// 静态内部类,用来标记是否中断
private static class Generation {
	boolean broken = false;
}
// 使用ReentrantLock实现互斥操作
private final ReentrantLock lock = new ReentrantLock();
// 使用lock的condition实现线程的挂起和唤醒
private final Condition trip = lock.newCondition();
// 记录有参构造原始传入的屏障数值,不会对这个数值进行操作
private final int parties;
// 当屏障数值达到0时,优先执行当前任务
private final Runnable barrierCommand;
// 初始化默认的generation,用来标记线程中断情况
private Generation generation = new Generation();
// 每来一个线程await,就对count进行减1的操作
// 当count==0时,会触发任务,重新赋值为parties
private int count;

7.2.4 源码-有参构造

// 有参构造
public CyclicBarrier(int parties, Runnable barrierAction) {
	// 健壮性校验
	if (parties <= 0) throw new IllegalArgumentException();
	// 屏障值赋值,不会改变,用来循环使用
	this.parties = parties;
	// 真实计数的数值,每来一个await,就进行-1的操作,到达0之后,重新赋值为parties
	this.count = parties;
	// 当到达屏障时立刻执行的任务
	this.barrierCommand = barrierAction;
}

7.2.5 源码-await方法

CyclicBarrier提供了两种await方法:

  • 一种是无参构造方法,线程会死等下去,直到屏障值归0,或者有线程中断。
  • 一种是有参构造方法,传入等待的时间,要么等到时间归0,要么屏障值归0,要么有线程中断。
  • 无论哪种方法,实际调用的都是dowait方法,只不过传入的参数不同
// 实际调用的是dowait,传入的是false和0秒
public int await() throws InterruptedException, BrokenBarrierException {
	try {
		return dowait(false, 0L);
	} catch (TimeoutException toe) {
		throw new Error(toe); // cannot happen
	}
}

// 实际调用的是dowait,传入的是true和实际的纳秒
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
	       BrokenBarrierException,
	TimeoutException {
	return dowait(true, unit.toNanos(timeout));
}

7.2.6 源码-dowait方法

// await方法的核心
private int dowait(boolean timed, long nanos)
// 当前线程中断,当前线程抛出异常
throws InterruptedException,
//`	其他线程中断,当前线程抛出异常
	BrokenBarrierException,
	// 超时时间异常
	TimeoutException {
	// 获取锁
	final ReentrantLock lock = this.lock;
	// 加锁,保证多线程的安全
	lock.lock();
	try {
		// 拿到generation对象,用来标记线程中断情况
		final Generation g = generation;
		// 判断线程已经中断的话,直接抛出异常
		if (g.broken)
			throw new BrokenBarrierException();
		// 如果当前线程抛出异常
		if (Thread.interrupted()) {
			// 重置broken为true,重置屏障数值,唤醒其他等待的线程
			breakBarrier();
			// 抛出异常
			throw new InterruptedException();
		}
		// 说明没有线程中断,直接对count-1操作
		// index是减1之后的值
		int index = --count;
		// 如果减1之后为0,需要打开屏障
		if (index == 0) {
			// 一个标记
			boolean ranAction = false;
			try {
				// 拿到有参构造中传递的任务
				final Runnable command = barrierCommand;
				// 如果任务不为null
				if (command != null)
					// 优先执行当前任务
					command.run();
				// 如果任务执行的没问题,那就赋值为true
				ranAction = true;
				// 唤醒所有线程,重置count,重置中断标记位
				nextGeneration();
				return 0;
			} finally {
				// 如果执行有参构造中任务出现问题,进行重置操作
				if (!ranAction)
					breakBarrier();
			}
		}
		// 如果减1之后屏障值不为0,那就一直循环等待线程中断,或者超时
		// loop until tripped, broken, interrupted, or timed out
		for (;;) {
			try {
				// timed为false,代表没有时间限制,需要死等
				if (!timed)
					trip.await();
				// 否则就需要判断时间是否超时
				else if (nanos > 0L)
					nanos = trip.awaitNanos(nanos);
			} catch (InterruptedException ie) {
				// 到这代表线程被中断了
				// 查看generation有没有被重置,reset方法是被重置
				// 查看当前broken为false,需要做线程中断后的操作
				if (g == generation && ! g.broken) {
					// 重置一些属性
					breakBarrier();
					// 抛出异常
					throw ie;
					// 如果被重置了,或者被中断了
				} else {
					// 直接中断当前线程
					Thread.currentThread().interrupt();
				}
			}
			// 如果其他线程被中断
			if (g.broken)
				// 抛出异常
				throw new BrokenBarrierException();
			// 如果调用了reset方法重置了,或者代表任务执行完毕了自动重置
			if (g != generation)
				// 直接返回index,也就是减1之后的数值
				return index;
			// 如果指定等待的时间,但是时间到了
			if (timed && nanos <= 0L) {
				// 执行一些重置操作
				breakBarrier();
				// 抛出超时异常
				throw new TimeoutException();
			}
		}
	} finally {
		// 释放锁
		lock.unlock();
	}
}

breakBarrier:

private void breakBarrier() {
	// 重置中断标记为true
	generation.broken = true;
	// 重置屏障数值
	count = parties;
	// 唤醒所有线程
	trip.signalAll();
}

nextGeneration:重置eset或者任务执行完自动重置方法

private void nextGeneration() {
	// 唤醒所有的线程
	trip.signalAll();
	// 重置计数
	count = parties;
	// 重置generation中断标记位
	generation = new Generation();
}

7.2.7 源码-reset方法

// reset方法就是先进行中断,再进行重置操作
public void reset() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		breakBarrier();   // break the current generation
		nextGeneration(); // start a new generation
	} finally {
		lock.unlock();
	}
}

7.3 Semaphore

7.3.1 介绍

Semaphore:信号量。Synchronized和ReentrantLock是互斥锁,保证一个资源同一时间只能被一个线程访问。而Semaphore保证一个或多个资源可以被指定数量的线程访问。Semaphore的底层是基于AQS实现的。

Semaphore的底层也是基于AQS的state属性做一个计数器的维护。

  • state的值代表当前共享资源的个数;
  • 如果一个线程需要获取一个或多个资源,需要查看state的资源个数是否足够;
  • 如果足够,直接对state-1操作,然后获取资源;
  • 如果资源不够,当前线程就需要挂起等待;
  • 直到持有资源的线程释放资源后,会归还给semaphore的state属性,挂起的线程会重新被唤醒。

Semaphore也分为公平和非公平的概念。

使用场景:

  • 连接池对象可以借助信号量去实现管理。
  • 在一些流量控制上,也可以采用信号量去是实现。
  • 比如买票和人数,票就是资源,一个人买一张票,资源就减1,直到资源不足。

7.3.2 应用

Semaphore对资源的获取和释放操作:

获取资源:

  • acquire():获取一个资源,没有资源就挂起等待,如果中断,直接抛异常
  • acquire(int):获取指定个数资源,没有资源就挂起等待,如果中断,直接抛异常
  • tryAcquire():尝试获取一个资源,没有资源就返回false,否则返回true
  • tryAcquire(int):尝试获取指定个数资源,没有资源就返回false,否则返回true
  • tryAcquire(time,unit):尝试获取一个资源,没有资源就等待time时间,如果时间内获取不到资源,就返回false
  • tryAcquire(int,time,unit):尝试获取指定个数资源,没有资源就等待time时间,如果时间内获取不到资源,就返回false
  • acquireUninterruptibly():获取一个资源,没有资源就挂起等待,中断线程不结束,继续等待
  • acquireUninterruptibly(int):获取指定个数资源,没有资源就挂起等待,中断线程不结束,继续等待

释放资源:

  • release():释放一个资源
  • release(int):释放指定个数资源
package com.xqm.juc.jucUtils;

import java.util.concurrent.Semaphore;

public class Test04_Semaphore {
    public static void main(String[] args) throws InterruptedException {
        // 假设环球影城还有10个人流量
        // 默认为非公平
        Semaphore semaphore = new Semaphore(10);

        new Thread(() -> {
            try {
                System.out.println("一家三口要去玩,准备买票");
                //无参构造,每次拿一个资源
                //有参构造,每次拿指定资源
                semaphore.acquire(3);
                System.out.println("一家三口获取到门票,准备玩了");
                Thread.sleep(10000);
                System.out.println("一家三口观赏结束了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 用完要释放
                semaphore.release(3);
            }
        }).start();

        for (int i = 0; i < 7; i++) {
            int j = i;
            new Thread(() -> {
                System.out.println("人" + j + "来了");
                try {
                    semaphore.acquire();
                    System.out.println("人" + j + "获取到门票,准备玩了");
                    Thread.sleep(10000);
                    System.out.println("人" + j + "观赏结束了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }).start();
        }

        Thread.sleep(10);

        System.out.println("main来了");
        if (semaphore.tryAcquire()) {
            System.out.println("main获取到门票");
            semaphore.release();
        } else {
            System.out.println("main获取不到门票");
        }

        Thread.sleep(10000);

        System.out.println("main又来了");
        if (semaphore.tryAcquire()) {
            System.out.println("main获取到门票");
            semaphore.release();
        } else {
            System.out.println("main获取不到门票");
        }
    }
}

7.3.3 整体结构

Semaphore中有三个静态内部类:

  • 向上抽取的Sync
    abstract static class Sync extends AbstractQueuedSynchronizer{...}
    
  • Sync的子类NonfairSync
    static final class NonfairSync extends Sync{...}
    
  • Sync的子类FairSync
    static final class FairSync extends Sync{...}
    

Sync内部主要提供了一些公共的方法,并且有参构造传入的是资源个数,基于AQS的setState方法设置了state属性。

而NonfairSync和FairSync的区别就是tryAcquireShared()方法实现的不同。

7.3.4 源码-构造函数

构造函数有两个,一个是只有一个参数,就是只有资源个数的,默认调用的是非公平的获取资源方法。

public Semaphore(int permits) {
		// 非公平的获取资源
        sync = new NonfairSync(permits);
    }

另一种是两个参数,一个是资源个数,另一个是指定采用公平还是非公平的方式获取资源。

public Semaphore(int permits, boolean fair) {
		// 如果为true就是公平获取资源,否则就是非公平
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

7.3.5 源码-非公平获取资源

无参的acquire默认获取的资源个数是一个。并且acquire是允许中断线程,然后抛出异常。

获取资源的方式就是使用CAS直接将state-1,如果成功代表获取资源成功,如果失败则代表资源数不够,就基于共享锁的方式去将当前线程挂起在AQS双向链表中,如果基于doAcquireSharedInterruptibly拿锁成功,会执行setHeadAndPropagate方法。

acquire:

// 获取信号量的方法,默认获取一个资源
public void acquire() throws InterruptedException {
	// 跳转到AQS中获取共享锁的方法
	sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly:

// AQS提供的获取共享锁的方法,允许中断抛异常
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
	// 如果当前线程中断标记位已经是true
	if (Thread.interrupted())
		// 那么直接抛异常
		throw new InterruptedException();
	// tryAcquireShared()方法就是区分公平和非公平获取资源
	// tryAcquireShared(arg):如果返回小于0,代表获取资源失败,需要排队
	// tryAcquireShared(arg):如果返回大于等于0,代表获取资源成功,直接执行业务代码
	if (tryAcquireShared(arg) < 0)
		// 获取资源失败,进行排队
		doAcquireSharedInterruptibly(arg);
}

tryAcquireShared:

// 信号量的非公平获取资源
protected int tryAcquireShared(int acquires) {
	return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared:

// 信号量的内部类Sync中非公平获取资源
// acquires:就是想要拿取的资源个数
final int nonfairTryAcquireShared(int acquires) {
	// 死循环获取资源
	for (;;) {
		// 获取state的数值,也就是剩余的资源个数
		int available = getState();
		// 判断拿取过后剩余的资源个数
		int remaining = available - acquires;
		// 如果剩余资源个数小于0,直接返回资源个数
		if (remaining < 0 ||
		        // 如果资源个数大于等于0,也就是资源足够,
		        // 那么基于CAS将state原值改为remaining
		        compareAndSetState(available, remaining))
			// 所以最后只会有两种结果:
			// remaining<0:代表资源不足
			// remaining》=0:资源足够并且CAS获取资源成功
			return remaining;
	}
}

doAcquireSharedInterruptibly

// 获取资源失败,当前线程需要挂起等待
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
	// 构建Node节点,线程和共享锁标记,并且到AQS双向链表中
	final Node node = addWaiter(Node.SHARED);
	boolean failed = true;
	try {
		for (;;) {
			// 拿到上一个节点
			final Node p = node.predecessor();
			// 如果p是head节点,也就是新建的node是head.next,就可以抢一波资源
			if (p == head) {
				// 抢占资源
				int r = tryAcquireShared(arg);
				// 资源抢占成功
				if (r >= 0) {
					// 做一些操作
					// 设置head的一些操作
					setHeadAndPropagate(node, r);
					p.next = null; // help GC
					failed = false;
					return;
				}
			}
			// 如果不是head.next节点,将前继节点的状态改为-1,并挂起当前线程
			if (shouldParkAfterFailedAcquire(p, node) &&
			        // 如果线程中断,则抛出异常
			        parkAndCheckInterrupt())
				throw new InterruptedException();
		}
	} finally {
		// 如果没有获取到资源,取消当前节点
		if (failed)
			// 取消节点参考ReentrantLock
			cancelAcquire(node);
	}
}

7.3.6 其他获取资源的方法

  • acquire()和acquire(int)方法,都是执行acquireSharedInterruptibly()方法,区别就是传入的资源个数不同。
  • tryAcquire()和tryAcquire(int)两个方法,只使用非公平锁的实现方法。因为只有非公平才可能获取到锁。
  • 但是tryAcquire(int permits, long timeout, TimeUnit unit)这种方法是正常走的AQS给的acquire方法,因为这种方法可以排队一段时间,在排队时是可能获取到资源的。
  • acquireUninterruptibly()以及acquireUninterruptibly(int)只是在挂起线程后,不会因为线程的中断而抛异常。

7.3.7 源码-公平获取资源

公平和非公平方式实现的区别只是在于tryAcquireShared这个方法的不同,其他都是相同的。

公平实现,需要查看AQS中排队的情况。

protected int tryAcquireShared(int acquires) {
	// 死循环,直到没有资源获取,或者获取到资源
	for (;;) {
		// 公平锁走下述逻辑前,会先判断队列中排队的情况
		// 如果没有排队的节点,直接走下面逻辑,抢夺资源
		// 如果有排队的节点,发现当前节点在head.next位置,也不走if的逻辑
		if (hasQueuedPredecessors())
			return -1;
		// 下面这些和非公平实现一模一样
		int available = getState();
		int remaining = available - acquires;
		if (remaining < 0 ||
		        compareAndSetState(available, remaining))
			return remaining;
	}
}

7.3.8 源码-释放资源

因为信号量从头到尾都是共享锁实现,释放资源不分公平还是非公平,直接走AQS的releaseShared()方法。

// semaphore的释放资源方法
public void release() {
	sync.releaseShared(1);
}

// AQS的releaseShared方法,不分公平还是非公平
public final boolean releaseShared(int arg) {
	// 这个方法是信号量自行实现的
	if (tryReleaseShared(arg)) {
		// 只要释放资源成功,唤醒AQS中排队的线程
		doReleaseShared();
		return true;
	}
	return false;
}

// 信号量实现的释放资源方法
protected final boolean tryReleaseShared(int releases) {
	// 死循环
	for (;;) {
		// 获取到state
		int current = getState();
		// 将当前的state加上当前要释放的资源数
		int next = current + releases;
		// 归还完之后的数量如果小于没归还之前的,代表溢出了
		// next为负数,健壮性判断
		if (next < current) // overflow
			// 抛出异常
			throw new Error("Maximum permit count exceeded");
		// CAS操作,保证原子性,只会有一个线程成功将state修改成功
		if (compareAndSetState(current, next))
			return true;
	}
}

7.3.9 AQS中的PROPAGATE节点

PROPAGATE:传播

7.3.9.1 JDK1.5中Semaphore的执行情况

首先查看4个线程获取信号量资源的情况:

image.png

往下查看释放资源的过程会触发什么问题

首先t1释放资源,做了进一步处理

image.png

当线程3获取锁资源后,线程2再次释放资源,因为执行点问题,导致线程4无法被唤醒

image.png

image.png

JDK1.5实现代码

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0) 
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next; 
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}

7.3.9.2 JDK1.8中Semaphore处理

image.png

JDK1.8实现

private void doReleaseShared() {
	// 死循环
	for (;;) {
		// 获取到头节点
		Node h = head;
		// 判断AQS中有排队的Node节点
		if (h != null && h != tail) {
			// 拿到头节点的状态
			int ws = h.waitStatus;
			// 如果为-1,代表后续节点有挂起,要去唤醒
			if (ws == Node.SIGNAL) {
				// 将head状态由-1改为0
				if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
					continue;            // loop to recheck cases
				// 唤醒后继节点
				unparkSuccessor(h);
				// 如果head状态为0,将0改为-3
				// 目的是为了向后面传播
			} else if (ws == 0 &&
			           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
				// 继续循环
				continue;                // loop on failed CAS
		}
		// 当头节点没有变化,也就是没有并发情况
		if (h == head)                   // loop if head changed
			// 正常完成释放排队的线程
			break;
	}
}


private void setHeadAndPropagate(Node node, int propagate) {
	// 获取头节点
	Node h = head; // Record old head for check below
	// 将node设置为新的head
	setHead(node);
	// 如果propagate大于0,代表还有剩余资源,直接唤醒后续节点,如果不满足,也需要继续往下判断,看是否需要往后判断是否需要传播
	//  h == null 看成健壮性判断就行
	//  h.waitStatus < 0之前head节点为负数,说明并发情况下,可能还有资源,需要继续向下唤醒
	// 如果当前新head节点状态为负数,继续向下唤醒
	if (propagate > 0 || h == null || h.waitStatus < 0 ||
	        (h = head) == null || h.waitStatus < 0) {
		// 唤醒当前节点的后继节点
		Node s = node.next;
		if (s == null || s.isShared())
			doReleaseShared();
	}
}

1

7.4 Exchanger

交换两个线程之间的通信。

package com.xqm.thread_study.countdownLantch;

import java.util.concurrent.Exchanger;

/**
 * @ClassName T07_exchanger
 * @Description TODO
 * @Author xqm
 * @Date 2022/6/12 8:00
 * @Version 1.0
 *
 *
 * exchanger  交换器 两个线程之间通信 交换数据
 * 只能两个线程交换 否则会线程阻塞
 */
public class T07_exchanger {
    static Exchanger<String> exchanger=new Exchanger<>();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        }, "t1");

        Thread t2 = new Thread(() -> {
            String s = "T2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        }, "t2");

        Thread t3 = new Thread(() -> {
            String s = "T3";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        }, "t3");

        Thread t4 = new Thread(() -> {
            String s = "T4";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        }, "t4");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}