四:阻塞队列
基础概念
1.生产者消费者概念
生产者消费者是设计模式的一种,让生产者消费者通过一个容器来解决强耦合的问题
生产者和消费者之间不会直接通讯,而是通过一个容器(队列)来进行通讯。生产者生产完扔进队列中,消费者直接从容器中获取数据消费。
2.JUC阻塞队列存取方法
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {}
存储数据
// 添加数据到队列,如果队列满了,那么直接抛出异常
boolean add(E e);
// 添加数据到队列,如果队列满了,返回false
boolean offer(E e);
// 添加数据到队列,如果队列满了,阻塞timeout时间,如果时间到了还在阻塞,那么返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 添加数据到队列,如果队列满了,直接挂起线程,等到队列中有位置,再扔数据进去
void put(E e) throws InterruptedException;
获取数据
// 先进先出,移出元素,如果队列为空,抛异常
boolean remove(Object o);
E remove();
// 从队列中移除数据,如果队列为空,返回null
poll()
// 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者生产数据,再移除,
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 从队列中移除数据,如果队列为空,挂起线程,一直等待有数据
E take() throws InterruptedException;
//
peek()
1.ArrayBlockingQueue
1.1 基本使用
ArrayBlockingQueue在初始化的时候,必须要指定队列的长度,因为底层是基于数组实现的。
package com.xqm.juc.lock.blockQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test01 {
public static void main(String[] args) throws InterruptedException {
// 底层是数组 final Object[] items;
// this.items = new Object[capacity];
ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue(4);
arrayBlockingQueue.add("1");
arrayBlockingQueue.add("2");
arrayBlockingQueue.add("3");
arrayBlockingQueue.add("4");
// 存第五个会抛出异常,因为队列已经满了
// arrayBlockingQueue.add("5");
// 会返回false,没存进去,队列已满
// arrayBlockingQueue.offer("6");
// 会在两秒后返回false
// arrayBlockingQueue.offer("7",2,TimeUnit.SECONDS);
// 会一直阻塞,直到数据存入为止
// arrayBlockingQueue.put("8");
// 消费者取数据 采用先进先出,取出的是1
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
// 取出不存在的元素,抛异常
// System.out.println(arrayBlockingQueue.remove());
// 返回null
// System.out.println(arrayBlockingQueue.poll());
// 等待2s后返回null
// System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS));
// 挂起等待,直到有数据才输出
// System.out.println(arrayBlockingQueue.take());
}
}
1.2 基本属性
ArrayBlockingQueue的成员变量
- lock:就是一个ReentrantLock
- count:就是当前数组元素的个数
- items:队列的底层是数组,item就是数组本身
- putIndex:下一个将要添加的数组的索引位置
- takeIndex:下一个要取数据的数组的下标
- notEmpty:消费者挂起线程和唤醒线程用到的Condition
- notFull:生产者挂起线程和唤醒线程用到的Condition
1.3 生产者实现原理
add(E e)
public boolean add(E e) {
return super.add(e);
}
// 走的是offer方法
public boolean add(E e) {
if (offer(e))
return true;
// 如果队列满了,则抛出异常
else
throw new IllegalStateException("Queue full");
}
offer(E e)
// offer方法实现
public boolean offer(E e) {
// 检查e不为null,为null则抛出空指针异常
checkNotNull(e);
// 默认是非公平锁,可以在队列初始化的时候指定为公平锁
final ReentrantLock lock = this.lock;
// 为了保证线程安全,直接加锁
lock.lock();
try {
// count是当前队列中元素的个数,items是队列底层数组
if (count == items.length)
// 如果元素满了,返回false
return false;
else {
// 执行入队操作
enqueue(e);
// 返回true
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}
private static void checkNotNull(Object v) {
if (v == null)
// 元素为null则抛出异常
throw new NullPointerException();
}
// 入队操作
private void enqueue(E x) {
// 拿到数组的引用
final Object[] items = this.items;
// putIndex是数组下一个添加位置的索引
// 这里就是直接添加到数组中
items[putIndex] = x;
// 如果putIndex进行++操作,如果++后的putIndex等于items.length的话,说明数组满了,将putIndex直接置为0
if (++putIndex == items.length)
putIndex = 0;
// count是数组元素的个数,因为新增一个元素,所以++
count++;
// 将一个Condition中阻塞的线程唤醒
notEmpty.signal();
}
offer(E e, long timeout, TimeUnit unit)
生产者添加数据时,如果发现队列已满,就会进行阻塞
- 阻塞到消费者消费数据,就会唤醒当前阻塞线程
- 阻塞到time时间到达,再次尝试是否可以添加,如果不能,就返回false
// 带有时间单位的offer
// 如果线程在挂起的时候,对当前阻塞的线程进行中断标记位进行设置,当前线程会抛出异常
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 检查输入的e是否为null
checkNotNull(e);
// 将单位转换成纳秒
long nanos = unit.toNanos(timeout);
// 获取当前的锁
final ReentrantLock lock = this.lock;
// 使用可中断锁来加锁,允许线程中断并抛出异常
lock.lockInterruptibly();
try {
// 检查队列是否已满。
// 使用while是为了解决虚假唤醒
while (count == items.length) {
// 等待时间是否小于0
if (nanos <= 0)
// 直接失败
return false;
// 调用生产者Condition,进行线程休眠,返回剩余时间
// 挂起等待,同时会释放锁资源(类似sync的wait)
// 被唤醒后,还得重新竞争锁资源,得到锁才能进行添加
nanos = notFull.awaitNanos(nanos);
}
// 和offer(E e)一样入队操作
enqueue(e);
return true;
} finally {
// 释放锁
lock.unlock();
}
}
put(E e)
// put方法,一直挂起,直到被唤醒或中断
public void put(E e) throws InterruptedException {
// 非空校验
checkNotNull(e);
// 获取当前的锁
final ReentrantLock lock = this.lock;
// 使用可中断锁来加锁,允许线程中断并抛出异常
lock.lockInterruptibly();
try {
// 检查队列是否已满。
while (count == items.length)
// 一直阻塞,直到被唤醒或者中断
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
1.4 消费者实现原理
remove()
使用的就是poll()方法
// remove()
public E remove() {
// remove()方法就是调用poll()方法
// poll()如果有数据,返回数据,否则返回null
E x = poll();
if (x != null)
return x;
else
// 如果poll()返回null,直接抛异常
throw new NoSuchElementException();
}
poll()
public E poll() {
// 获得锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 如果队列数据为0,返回null,否则出队操作
return (count == 0) ? null : dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
// 取出数据
private E dequeue() {
// 获取数组的引用
final Object[] items = this.items;
// takeIndex是数组取数据的下标,获取数据
E x = (E) items[takeIndex];
// 把取出的数据的位置置为null
items[takeIndex] = null;
// 设置下次取数据的索引位置
if (++takeIndex == items.length)
// 如果满了,就从0开始
takeIndex = 0;
// 元素总数--
count--;
// 迭代器内容
if (itrs != null)
itrs.elementDequeued();
// 唤醒当前Condition中排队的一个生产者线程
// signalAll()是唤醒所有
notFull.signal();
// 返回数据
return x;
}
poll(long timeout, TimeUnit unit)
// poll()带有时间单位
poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 转化时间单位
long nanos = unit.toNanos(timeout);
// 获取锁
final ReentrantLock lock = this.lock;
// 加锁(等待时间到或者被中断)
lock.lockInterruptibly();
try {
// 如果没有数据
while (count == 0) {
// 时间到,没数据,返回null
if (nanos <= 0)
return null;
// 线程休眠,等待被唤醒或者中断
nanos = notEmpty.awaitNanos(nanos);
}
// 取数据
return dequeue();
} finally {
lock.unlock();
}
}
take()
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// 如果没数据
while (count == 0)
// 一直阻塞
notEmpty.await();
// 返回取出的数据
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
虚假唤醒
阻塞队列中,如果需要线程挂起操作,判断有无数据的位置使用while循环,为什么不能换成if?
一开始有线程A、线程B、线程C。线程A和B是生产者,线程C是消费者
while (count == items.length)
// A挂起
// B挂起
notFull.await();
enqueue(e);
此时C消费了一个数据 ,唤醒了A,同时线程D(生产者)也来了
// 线程D来了,获取到锁资源,但是还没走while循环
while (count == items.length)
// A唤醒,但是只是从wait到AQS队列中
// B挂起
notFull.await();
enqueue(e);
此时D判断,有位置,可以添加数据到队列,走enqueue(e)。
如果判断是if的话,A在E释放锁资源后,拿到锁资源,不会再次判断队列是否已满,而是直接增添数据,这样就会覆盖数据,出现安全问题。
如果使用while,线程A被唤醒,然后继续走while循环,然后又进行挂起,这就叫虚假唤醒,实际什么都没做。
2.LinkedBlockingQueue
和ArrayBlockingQueue类似,只不过是用链表实现的。无界队列(其实是Integer.MAX_VALUE,只不过一般达不到)
2.1 底层实现
无参构造
// 无参构造
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
有参构造
// 有参构造
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化时,会构建一个没有意义的头和尾
last = head = new Node<E>(null);
}
存储数据的节点
// Node对象就是存储数据的单位
static class Node<E> {
// 存储的数据
E item;
// 指向下一个数据的指针
Node<E> next;
// 有参构造
Node(E x) { item = x; }
}
其他属性
-
// 因为是链表,没有数组的length属性,基于AtomicInteger来记录长度 private final AtomicInteger count = new AtomicInteger();
-
// 链表的头节点,用来取数据 transient Node<E> head;
-
// 链表的尾节点,用来存数据 private transient Node<E> last;
- 锁:ArrayBlockingQueue只有一把锁,因为在一个数组中有序的进行生产和消费,生产和消费不能同时,因为索引的下标会乱套。但是链表是连续的地址空间,消费前面的数据,和生产后面的数据完全不影响。
// 消费者的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 消费者的挂起和唤醒操作
private final Condition notEmpty = takeLock.newCondition();
// 生产者的锁
private final ReentrantLock putLock = new ReentrantLock();
// 生产者的挂起和唤醒操作
private final Condition notFull = putLock.newCondition();
2.2 生产者实现原理
add(E e)
还是走的offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer(E e)
public boolean offer(E e) {
// 非空校验
if (e == null) throw new NullPointerException();
// 把成员变量单独引用出来,拿到存储条数
final AtomicInteger count = this.count;
// 查看当前条数是否等于限制长度
if (count.get() == capacity)
// 到达最大长度
return false;
// 声明一个c变量
int c = -1;
// 将要存储的对象封装成一个node
Node<E> node = new Node<E>(e);
// 获取put锁
final ReentrantLock putLock = this.putLock;
// 竞争锁资源
putLock.lock();
try {
// 再次做判断,查看是否有空间,防止多线程
if (count.get() < capacity) {
// 入队操作
enqueue(node);
// 先把count赋值给c,然后数据总个数count++
c = count.getAndIncrement();
// 查看是否还有空间。c+1小于长度限制
if (c + 1 < capacity)
// 唤醒生产者
// 有生产者基于await挂起,这里添加完数据后,发现还有空间可以存储数据
// 唤醒前面可能挂起的生产者
// 因为这里的生产者和消费者不是互斥的,写操作同时,可能消费者也在消费
notFull.signal();
}
} finally {
// 释放锁资源
putLock.unlock();
}
// c==0,代表添加数据之前,队列元素的个数是0
// 如果有消费者在队列没有数据的时候,一定处于挂起的状态
if (c == 0)
// 唤醒消费者
signalNotEmpty();
// 返回c是不是大于等于0,添加成功返回true,添加失败c=-1返回false
return c >= 0;
}
// 将新增的数据往后放
private void enqueue(Node<E> node) {
// 将当前数据设置为last.next,并且设置为last
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 获取读锁,只有获取读锁才能进行唤醒读锁,和synchronized的wait、notify差不多
takeLock.lock();
try {
// 唤醒读锁
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 非空校验
if (e == null) throw new NullPointerException();
// 将时间转化为纳秒
long nanos = unit.toNanos(timeout);
// c作为返回的标记
int c = -1;
// 写锁
final ReentrantLock putLock = this.putLock;
// 总数
final AtomicInteger count = this.count;
// 允许中断的加锁方式
putLock.lockInterruptibly();
try {
// 如果元素个数到达最大
while (count.get() == capacity) {
// 时间不够,直接返回false,添加失败
if (nanos <= 0)
return false;
// 挂起线程,返回剩余的时间
nanos = notFull.awaitNanos(nanos);
}
// 加入队列
enqueue(new Node<E>(e));
// 元素个数加一
c = count.getAndIncrement();
// 唤醒生产者
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0)
// 唤醒消费者
signalNotEmpty();
return true;
}
put(E e)
和offer()很相似,只不过会一直阻塞
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
// 一直阻塞
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
2.3 消费者实现原理
remove()
使用的是poll()方法
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
poll()
public E poll() {
// 拿到队列长度的计数器
final AtomicInteger count = this.count;
// 如果队列数据为0
if (count.get() == 0)
// 返回null
return null;
// 队列中有数据.声明返回接口
E x = null;
// 返回标记
int c = -1;
// 获取消费锁
final ReentrantLock takeLock = this.takeLock;
// 抢夺锁资源
takeLock.lock();
try {
// 再次判断,如果队列长度大于0
if (count.get() > 0) {
// 从队列中移除数据
x = dequeue();
// 将count赋值给c,然后--
c = count.getAndDecrement();
// 如果之前元素个数大于1,减去1之后,说明还剩至少1个
if (c > 1)
// 如果依然有数据,继续唤醒一个消费者
notEmpty.signal();
}
} finally {
// 释放锁
takeLock.unlock();
}
// c是之前的元素个数,如果为当前队列的限制
// 说明有空余的位置,可以继续添加
if (c == capacity)
// 唤醒阻塞的生产者
signalNotFull();
// 如果没有数据,返回null
return x;
}
// 移除数据
private E dequeue() {
// head.next节点,就是需要移除的数据
// 这里先拿head
Node<E> h = head;
// head的next节点
Node<E> first = h.next;
h.next = h; // help GC
// head.next就是新的head
head = first;
// 返回的值,head.next.item
E x = first.item;
// 设置head.next.item为null,成为新的head
first.item = null;
return x;
}
// 先拿到生产者锁线程,然后唤醒生产者
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll(E,Unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 返回结果
E x = null;
// 表示
int c = -1;
// 转成纳秒
long nanos = unit.toNanos(timeout);
// 拿到计数器
final AtomicInteger count = this.count;
// 拿到消费者锁
final ReentrantLock takeLock = this.takeLock;
// 可以中断的锁
takeLock.lockInterruptibly();
try {
// 如果队列中没数据
while (count.get() == 0) {
if (nanos <= 0)
return null;
// 挂起当前线程,并获取剩余时间
nanos = notEmpty.awaitNanos(nanos);
}
// 数据出队
x = dequeue();
// count--
c = count.getAndDecrement();
if (c > 1)
// 唤醒消费者
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
if (c == capacity)
// 唤醒生产者
signalNotFull();
return x;
}
take
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 相比poll,这里的出口只有一个,中断标记为,抛出异常,否则一直等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.PriorityBlockingQueue
3.1介绍
优先级阻塞队列。不满足先进先出的概念。
会将插入的数据进行排序,排序的方式就是基于插入数据值的本身。
排序的方式是基于二叉堆的方式实现的。底层实现的是数据结构中的二叉堆。
案例
package com.xqm.juc.lock.priorityBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
public class Test01 {
public static void main(String[] args) {
PriorityBlockingQueue queue=new PriorityBlockingQueue();
queue.add(1);
queue.add(2);
queue.add(5);
queue.add(3);
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
// 输出的结果为: 1 2 3 5
}
}
3.2二叉堆
3.2.1结构介绍
排序的方式是基于二叉堆的方式实现的。
优先级队列表示为平衡二叉堆:queue[n] 的两个子节点是 queue[2*n+1] 和 queue[2*(n+1)]。
优先级队列按比较器排序,或者按元素的自然排序,
如果比较器为空:对于堆中的每个节点 n 和 n 的每个后代 d,n <= d。
最低值的元素在 queue[0] 中,假设队列是非空的。
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
PriorityBlockingQueue是基于数组实现的二叉堆。
3.2.2特点
1.二叉堆就是一个完整的二叉树。
2.任意一个节点大于其父节点,称为小顶堆,任意一个节点小于其父节点,称为大顶堆。
二叉堆:
3.3 PriorityBlockingQueue的核心属性
default_initial_capacity
记录数组的初始长度,初始长度默认为11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
max_array_size
数组的最大长度。
减8的目的是为了适配各个版本的虚拟机
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
默认当前使用的Hotspot虚拟机最大支持Integer.MAX_VALUE-2,但是其他版本的虚拟机不一定。
int[] a=new int[Integer.MAX_VALUE]
// 结果
Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at com.xqm.juc.lock.priorityBlockingQueue.Test01.main(Test01.java:9)
如果改为-2,那么只需要设置堆的大小,JVM是可以支持那么大的数组的。
int[] a=new int[Integer.MAX_VALUE-2];
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at com.xqm.juc.lock.priorityBlockingQueue.Test01.main(Test01.java:9)
queue
存储数据的数组,基于这个数组存储的二叉堆
private transient Object[] queue;
size
记录当前优先级阻塞队列中元素的个数
private transient int size;
comparator
要求使用的对象使用comparable比较器,基于comparator来确定对象的大小。
private transient Comparator<? super E> comparator;
lock
实现阻塞队列的锁。
private final ReentrantLock lock;
notEmpty
挂起线程的操作。队列为空时阻塞的条件。
private final Condition notEmpty;
allocationSpinLock
用于分配的自旋锁,通过 CAS 获取。如果为0,说明没有线程正在扩容操作。
因为数组的长度是固定的,因此数组扩容时,需要构建新数组,而且同时要迁移数据。因此PriorityBlockingQueue在扩容操作时,不会lock住扩容操作,而是释放lock锁,使用allocationSpinLock属性来进行标记,避免出现并发扩容的问题。为了提升效率。
private transient volatile int allocationSpinLock;
q
阻塞队列中,用到的原理,其实就是普通的优先级队列。
private PriorityQueue<E> q;
3.4 写操作源码分析
阻塞队列,添加的操作还是add,offer,offer(time,unit),put等。
因为优先级队列中,数组是可以扩容的,因此数组虽然也有长度的限制,但是还是属于无界队列,所以生产者不会阻塞,只有offer操作才能查看。
核心的内容还是如果保证二叉堆中小顶堆的结构,以及数组是如何扩容的。
add
add方法走的还是offer方法
public boolean add(E e) {
return offer(e);
}
offer
public boolean offer(E e) {
// 非空判断
if (e == null)
throw new NullPointerException();
// 拿到锁之后,直接上锁
final ReentrantLock lock = this.lock;
lock.lock();
// n:当前数组的元素个数
// cap:就是优先级队列这个数组的长度
int n, cap;
// array:优先级队列这个数组
Object[] array;
// 如果当前数组的元素个数大于数组的长度
// 使用while为了防止虚假唤醒
while ((n = size) >= (cap = (array = queue).length))
// 尝试扩容
tryGrow(array, cap);
// 到这说明不需要扩容,那就进行新增操作
try {
// cmp:设置为比较器
Comparator<? super E> cmp = comparator;
// 基于比较器是否为空进行判断
// 如果自己没设置比较器,那比较器是个属性,默认为null
if (cmp == null)
// 比较数据大小,存储数据,查看是否需要上移操作,保证平衡堆
siftUpComparable(n, e, array);
// 如果自己设置了比较器
else
siftUpUsingComparator(n, e, array, cmp);
// 证明数据已经存储成功
// 数组数据长度+1
size = n + 1;
// 如果有挂起的线程,需要唤醒挂起的消费者
// 因为会一直扩容,直到MAX长度,实际相当于无界队列
notEmpty.signal();
// 释放锁
} finally {
lock.unlock();
}
return true;
}
tryGrow
tryGrow是用来扩容的方法.
如果有两个线程同时执行tryGrow方法,那么只会有一个线程进行扩容操作。另一个线程可能多次走tryGrow方法,但是依然需要等待前面的线程扩容完毕。
// 扩容方法,在添加数据之前,会采用while的方法来判断是否需要扩容
private void tryGrow(Object[] array, int oldCap) {
// 释放锁。读和写操作都是同一把锁,扩容的时候防止读和写阻塞,必须先释放锁
lock.unlock(); // must release and then re-acquire main lock
// 声明新数组
Object[] newArray = null;
// 如果allocationSpinLock=0代表没有线程正在扩容
if (allocationSpinLock == 0 &&
// 基于CAS的方式,将allocationSpinLock设置为1,代表可以开始扩容
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 计算新数组的长度
// 如果老数组长度小于64,那么就扩容成2*oldCap+2(为了加快扩容的速度),否则就扩容成1.5*oldCap
int newCap = oldCap + ((oldCap < 64) ?
// 如果有人玩反射,将数组的长度设置为0,如果最后不加2,会出现问题
(oldCap + 2) : // grow faster if small
// 扩容到1.5倍
(oldCap >> 1));
// 如果新数组长度超过定义的最大数组长度
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
// 声明minCap数组为老数组+1
int minCap = oldCap + 1;
// 老数组长度已经大于最大长度,直接抛出异常
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
// 如果老数组没有达到最大长度,直接将新数组设置为最大长度
newCap = MAX_ARRAY_SIZE;
}
// 如果新数组长度大于老数组长度 并且队列没有改变
// 第二个判断确保没有并发扩容的出现
if (newCap > oldCap && queue == array)
// 设置新数组
newArray = new Object[newCap];
} finally {
// 新数组有了,直接标记位归零
allocationSpinLock = 0;
}
}
// 新数组没有构建成功
if (newArray == null) // back off if another thread is allocating
// 将当前线程时间片交出去
Thread.yield();
// 拿到锁资源
lock.lock();
// 将新数组赋值给queue
if (newArray != null && queue == array) {
// 数组赋值,实际上是地址指向改变
queue = newArray;
// 将老数组的数据导入到新数组
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
siftUpComparable
数据放到数组中,同时保持二叉堆的结构。
// k是当前数组的元素个数,也就是添加元素的索引位置
// x是存入的数据
// array就是原数组
private static <T> void siftUpComparable(int k, T x, Object[] array) {
// 将插入的元素强转为Comparable
// 强转可能会导致异常
Comparable<? super T> key = (Comparable<? super T>) x;
// 如果原来有数组
while (k > 0) {
// 获取父节点的索引位置
int parent = (k - 1) >>> 1;
// 拿到父节点的值
Object e = array[parent];
// 用子节点compareTo父节点,如果大于0,说明直接插入即可
if (key.compareTo((T) e) >= 0)
break;
// 到这说明新插入比父节点小
// 将父节点放到最新的位置
array[k] = e;
// k赋值为父节点的位置
k = parent;
}
// 走这里代表原来数组没有数据,直接放在第一个
array[k] = key;
}
实例:假如插入13
第一步:
第二步:
第三步:
第四步:
offer(time,unit)
offer加上阻塞时间实际上还是调用的offer函数,因为是无限队列,可以自动扩容,不需要阻塞。
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
put
put调用的也是offer方法
public void put(E e) {
offer(e); // never need to block
}
3.5 读操作源码分析
读操作是会出现线程挂起的情况,如果数组中元素个数为0,执行的是take方法,那么就会出现线程挂起的情况。
其次获取数据,由于是优先级队列,因此每次拿索引为0的位置的数据,每次拿完数据后,要保证二叉堆的结构,就会有下移操作。
主要有poll()、poll(time.unit)、take()等方法。
poll
// 读取数据-poll方法
public E poll() {
// 拿到锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 拿到返回值,没拿到返回null
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
poll(time,unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 转换为纳秒
long nanos = unit.toNanos(timeout);
// 拿到锁
final ReentrantLock lock = this.lock;
// 上锁
lock.lockInterruptibly();
// 定义结果变量
E result;
try {
// 如果没有结果并且时间还有
while ( (result = dequeue()) == null && nanos > 0)
// 进行阻塞
nanos = notEmpty.awaitNanos(nanos);
} finally {
// 释放锁
lock.unlock();
}
// 返回结果
return result;
}
take
public E take() throws InterruptedException {
// 拿到锁
final ReentrantLock lock = this.lock;
// 上锁
lock.lockInterruptibly();
// 结果变量
E result;
try {
// 如果一直拿不到锁,就进行阻塞
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 释放锁
lock.unlock();
}
// 返回结果
return result;
}
dequeue
dequeue是出队操作。
// 出队列操作,拿到数组中索引为0位置的数据
private E dequeue() {
// 定义一个变量n,表示数组中元素个数-1
int n = size - 1;
// 如果小于0,表示数组没数据
if (n < 0)
// 没有数据直接返回null
return null;
// 说明拿到数据
else {
// array定义临时变量获取queue中数据
Object[] array = queue;
// 拿到零索引位置的数据
E result = (E) array[0];
// 拿到最后一个位置数据
E x = (E) array[n];
// 最后一个位置置为null
array[n] = null;
// 拿到比较器
Comparator<? super E> cmp = comparator;
// 移除数据并保证小顶堆数据结构
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 元素个数减一
size = n;
// 返回result
return result;
}
}
siftDownComparable
siftDownComparable是出队后保证小顶堆的结构。
// k=0,x是数组/二叉堆最后一个元素,array是数组,n是移除数据后的数组元素个数
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
// 取完第一个数据后,数组还有数据,需要做平衡堆操作
if (n > 0) {
// 拿到最后一个数据的比较器
Comparable<? super T> key = (Comparable<? super T>)x;
// 因为二叉堆是一个满树结构,所以在保证二叉堆结构时,只需要循环一半就行
int half = n >>> 1; // loop while a non-leaf
// 做超过一半就不需要继续往下,已经到头了
while (k < half) {
// 找k的左子节点
int child = (k << 1) + 1; // assume left child is least
// c是左子节点的值
Object c = array[child];
// 右子节点
int right = child + 1;
// 确定有右节点,右子节点小于元素个数
if (right < n &&
// 左子节点的值大于右子节点,根据小顶堆,应该右子节点在上
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// 如果左子节点大于最后一个数据,直接退出循环,让最后一个节点数据覆盖左子节点的父节点
if (key.compareTo((T) c) <= 0)
break;
// 如果左子节点小于右子节点,左子节点的值就覆盖父节点,否则就右子节点就覆盖父节点
array[k] = c;
k = child;
}
// 填充最后一个节点的数据
array[k] = key;
}
}
演示:
第一步:
第二步:
第三步:
第四步:
其实就是循环拿到父节点的左子节点和右子节点,左子节点和右子节点值小的那个,和最后一个数据进行对比,小的值进行覆盖父节点,直到最后循环一半之后(也就是层数),得到最后的平衡二叉堆。
4.DelayQueue
4.1 延迟队列介绍
1.延迟队列也是基于二叉堆实现的,和优先级队列不同的是,优先级队列是基于值的大小进行排列,而延迟队列是基于延迟时间进行排列。
2.延迟队列生产一个消息,这个消息存在被消费的延迟时间。
3.DelayQueue底层实现的是PriorityQueue,实际上数组也是可以自动扩容的,可以当作是无限队列。
4.2 延迟队列的应用测试
4.2.1 延迟队列的类
// 延迟队列,里面的泛型继承Delayed接口,必须实现getDelay这个方法和compareTo方法
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {}
// Delayed接口 继承Comparable类,具有比较的能力
public interface Delayed extends Comparable<Delayed> {
// 这里的抽象方法就是需要设置的延迟时间
long getDelay(TimeUnit unit);
}
// Comparable实现了比较
public interface Comparable<T> {
public int compareTo(T o);
}
4.2.2 自定义任务
自定义要放入延迟队列的Task类
package com.xqm.juc.lock.delayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Task implements Delayed {
/**
* 为了区分每个任务的不同,设置name
*/
private String name;
/**
* 执行时间
*/
private Long time;
public Task(String name, Long time) {
this.name = name;
this.time = System.currentTimeMillis()+time;
}
/**
* 设置任务什么时候能出延迟队列
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 小于等于0,就可以执行任务
return unit.convert(time-System.currentTimeMillis(),TimeUnit.NANOSECONDS);
}
/**
* 比较两个任务的时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.time-((Task)o).getTime());
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
@Override
public String toString() {
return "Task{" +
"name='" + name + '\'' +
", time=" + time +
'}';
}
}
4.2.3 测试延迟队列
分别在1秒,3秒,5秒,7秒输出。
package com.xqm.juc.lock.delayQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
public class Test01<T extends Delayed> {
public T data = null;
public static void main(String[] args) throws InterruptedException {
DelayQueue<Task> queue = new DelayQueue();
Task task1 = new Task("任务1", 1000L);
Task task2 = new Task("任务2", 5000L);
Task task3 = new Task("任务3", 3000L);
Task task4 = new Task("任务4", 7000L);
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
// 结果
// Task{name='任务1', time=1663229491569}
// Task{name='任务3', time=1663229493569}
// Task{name='任务2', time=1663229495569}
// Task{name='任务4', time=1663229497569}
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
4.2.4 延迟队列应用
比如点外卖,15分钟商家接单,如果不接单,这个订单就会取消。每下一个订单,就会放到延迟队列中,如果规定时间内,商家没接单,直接通过消费者获取元素,取消订单。
4.3 延迟队列的核心属性
DelayQueue只有四个核心属性。
// DelayQueue属于阻塞队列,需要加锁保证线程安全。生产者和消费者使用的是同一把锁
private final transient ReentrantLock lock = new ReentrantLock();
// 基于二叉堆实现的,因此底层使用的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般存储等待堆顶数据的消费者线程
// 在整体写入和消费的过程中,会设置leader的一些判断
private Thread leader = null;
// 生产者在插入数据时,不会进行阻塞,当前的Condition是给消费者使用的。
// 消费者在消费数据时,发现堆顶的数据还没到延迟时间
// 因此堆顶元素到达延迟时间或者生产者元素到了堆顶的时候,消费者会进行阻塞
private final Condition available = lock.newCondition();
4.4 延迟队列的写入操作
延迟队列是无界的,写入操作会自动进行扩容。不需要关注生产者的阻塞问题,生产者是不会阻塞。
4.4.1 add
add方法是使用offer.
public boolean add(E e) {
return offer(e);
}
4.4.2 put
put方法也是使用的offer。
public void put(E e) {
offer(e);
}
4.4.3 offer(e,time,unit)
offer方法带有阻塞时间的,实际上也是offer方法。
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
4.4.4 offer-延迟队列
// 延迟队列插入方法
public boolean offer(E e) {
// 获取lock
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 调用优先级队列的插入操作
q.offer(e);
// 拿到队列的第一个元素,判断是否是刚刚插入的元素
if (q.peek() == e) {
// leader赋值为null
leader = null;
// condition的唤醒方法,唤醒消费者准备等待最新插入数据,进行消费
// 因为之前消费者肯定在等待堆其他数据。如果插入新的数据变成堆顶,那么消费者就需要改变等待的数据
available.signal();
}
// 插入成功
return true;
} finally {
// 释放锁
lock.unlock();
}
}
4.4.5 offer-优先级队列
// 优先级队列的插入操作
public boolean offer(E e) {
// 如果插入元素为null,直接抛出异常
if (e == null)
throw new NullPointerException();
// 优先级队列结构修改的次数+1
modCount++;
// 队列数据设置为i
int i = size;
// 如果大于等于数组长度
if (i >= queue.length)
// 进行扩容
// 如果小于64,扩容成2倍+2,否则扩容成1.5倍
grow(i + 1);
// 数组元素+1
size = i + 1;
// 如果数组之前没数据,那就插入到第一个
if (i == 0)
queue[0] = e;
else
// 否则进行堆结构改变,和优先级队列一样
siftUp(i, e);
return true;
}
优先级队列的peek方法
// 优先级队列的peek操作,取出堆顶的元素,如果没有返回null
public E peek() {
return (size == 0) ? null : (E) queue[0];
}
4.5 延迟队列的读取操作
消费者存在阻塞的情况,有两种情况:
- 消费者要拿堆顶的数据 ,但是延迟时间还没到,因此消费者需要等待。
- 消费者要来拿数据,发现已经有消费者在等待堆顶数据,这个后来的消费者也需要等待。
4.5.1 remove方法
remove方法,如果堆顶有元素,直接返回元素,否则抛出异常。
调用的还是抽象队列AbstractQueue中的poll()方法。
public E remove() {
// 调用poll方法
E x = poll();
if (x != null)
// 拿到值则返回,否则抛出异常
return x;
else
throw new NoSuchElementException();
}
4.5.2 poll方法
poll方法拿到数据则直接返回,否则返回null。不会进行阻塞。
// poll是尝试获取数据,能拿到就拿,拿不到就直接返回null
public E poll() {
// 获取锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取堆顶的元素
E first = q.peek();
// 如果栈顶为null或者元素的延迟时间大于0,任务还在延时
if (first == null || first.getDelay(NANOSECONDS) > 0)
// 拿不到就直接返回null
return null;
else
// 拿到堆顶元素
return q.poll();
} finally {
// 释放锁
lock.unlock();
}
}
4.5.3 poll(time.unit)方法
poll(time.unit)方法拿到数据直接返回,否则在time内一直尝试拿数据,进行阻塞,到时间拿不到数据则返回null。
// 带有阻塞时间的poll方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 将时间转换为纳秒值
long nanos = unit.toNanos(timeout);
// 获取锁
final ReentrantLock lock = this.lock;
// 锁可以被中断
lock.lockInterruptibly();
try {
// 死循环
for (;;) {
// 查询堆顶的元素
E first = q.peek();
// 如果堆顶的元素为null
if (first == null) {
// 如果阻塞时间已过还拿不到数据
if (nanos <= 0)
// 返回null
return null;
else
// 挂起线程,说明队列中没有元素
nanos = available.awaitNanos(nanos);
// 堆顶有元素的话
} else {
// 查询堆顶元素的延迟时间
long delay = first.getDelay(NANOSECONDS);
// 如果延迟时间小于0,也就是可以拿到数据
if (delay <= 0)
// 直接返回拿到的元素
return q.poll();
// 到下面说明堆顶元素延迟时间没到
// 查询阻塞时间是否已到
if (nanos <= 0)
// 阻塞时间已到还没拿到数据,直接返回null
return null;
// first赋值为null
first = null; // don't retain ref while waiting
// 如果指定阻塞时间小于堆顶元素的延迟时间,也就是在没有新元素插入进来的前提下,是拿不到数据的
// 如果leader不为null,说明有消费者已经在等待了
if (nanos < delay || leader != null)
// 进行阻塞
nanos = available.awaitNanos(nanos);
// 到这说明没有消费者在等待,并且阻塞时间大于堆顶元素
else {
// 获取当前线程
Thread thisThread = Thread.currentThread();
// 当前的leader为当前线程
leader = thisThread;
try {
// 阻塞,修改线程阻塞时间
// 让当前消费者阻塞堆顶元素的延迟时间
long timeLeft = available.awaitNanos(delay);
// 重新计算当前消费者可阻塞时间
nanos -= delay - timeLeft;
} finally {
// 如果当前线程是等待的消费者,将leader置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有消费者等待元素 && 队列中的元素不为null
if (leader == null && q.peek() != null)
// 只要当前没有leader在等待,并且元素不为null,再次唤醒消费者
// 避免队列有元素,但是没有消费者的情况.
// 因为当前消费者执行完之后,其他消费者还在阻塞中
available.signal();
// 释放锁
lock.unlock();
}
}
4.5.4 take方法
take方法和带有时间的poll方法类似,差别在于poll方法是指定阻塞时间,而take则是一直在阻塞,直到拿到数据或者被中断为止。
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 死循环
for (;;) {
// 查询队列首个元素
E first = q.peek();
// 堆顶没有元素
if (first == null)
// 进行阻塞,挂起等待,直到被唤醒为止
available.await();
// 到这说明堆顶有元素
else {
// 获取堆顶元素的延迟时间
long delay = first.getDelay(NANOSECONDS);
// 如果延迟时间小于0,说明可以获取元素了
if (delay <= 0)
// 获取元素并返回
return q.poll();
// 将first置为null
first = null; // don't retain ref while waiting
// leader不为null,说明有消费者正在等数据
if (leader != null)
// 进行挂起
available.await();
// leader为null说明没有消费者在等待
else {
// 获取当前线程
Thread thisThread = Thread.currentThread();
// leader为当前线程
leader = thisThread;
try {
// 将消费者等待时间置为延迟时间
available.awaitNanos(delay);
} finally {
// 正在等待的消费者是等待线程
if (leader == thisThread)
// 将leader置为null
leader = null;
}
}
}
}
} finally {
// 没有消费者在等待,并且队列中有元素
if (leader == null && q.peek() != null)
// 唤醒消费者
available.signal();
// 释放锁
lock.unlock();
}
}
5. SynchronousQueue
5.1 SynchronousQueue介绍
SynchronousQueue阻塞队列和其他阻塞队列有很大区别。
在概念中,队列肯定是要存储数据的,但是SynchronousQueue阻塞队列不会存储数据。
SynchronousQueue不存储数据,只会存储生产者或者消费者。当存储一个生产者到SynchronousQueue队列中之后,生产者会阻塞(比如调用put方法的时候)。
生产者最终会有几种结果:
- 如果在阻塞期间有消费者来匹配,生产者就会将绑定的消息交给消费者
- 生产者等到阻塞结束或者不允许阻塞,那么就直接失败
- 生产者在阻塞期间,如果线程中断,那么也直接失败
同理,消费者和生产者的效果是一样的。
生产者和消费者的数据是直接传递的,不会经过SynchronousQueue。
生产者和消费者的方法:
生产者:
- add()
- offer():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待数据,直接返回,数据没有给出去。
- offer(time,unit):生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待数据,阻塞time时间,如果还没有消费者,直接返回,数据没有给出去。
- put():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待数据,一直阻塞,直到等到消费者,
消费者:道理和上面生产者一样
- remove()
- poll()
- poll(time,unit)
- take()
5.2 SynchronousQueue例子
5.2.1 没有消费者拿数据
没有消费者拿数据,直接返回false。
package com.xqm.juc.lock.synchronousQueue;
import java.util.concurrent.SynchronousQueue;
public class Test {
public static void main(String[] args) {
// 因为当前队列不存储数据,因此没有长度概念
// 里面存储的生产者和消费者是链表的结构形式存储
SynchronousQueue queue=new SynchronousQueue();
String msg="消息";
new Thread(()->{
boolean offer = queue.offer(msg);
// 返回false,代表没有消费者拿
System.out.println(offer);
},"Thread1").start();
}
}
5.2.2 消费者拿到数据
package com.xqm.juc.lock.synchronousQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Test01 {
public static void main(String[] args) {
// 因为当前队列不存储数据,因此没有长度概念
// 里面存储的生产者和消费者是链表的结构形式存储
SynchronousQueue queue=new SynchronousQueue();
String msg="消息";
new Thread(()->{
boolean offer = false;
try {
offer = queue.offer(msg,10,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回true,代表消费者拿到数据
System.out.println(offer);
},"Thread1").start();
new Thread(()->{
// 打印出消息
System.out.println(queue.remove());
}).start();
}
}
5.3 SynchronousQueue核心属性
进入SynchronousQueue类的内部后,发现一个静态内部类Transferer,Transferer类提供了一个transfer方法。
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
当前这个类中提供的transfer方法,就是生产者和消费者在读写数据时要用到的核心方法
生产者在调用transfer方法时,第一个参数e会正常传递数据
消费者在调用transfer方法时,第一个参数e会传递null
SynchronousQueue针对抽象类Transferer做了两种实现:
static final class TransferStack<E> extends Transferer<E> {}
static final class TransferQueue<E> extends Transferer<E> {}
这两种类继承了Transferer抽象内部类,在构建SynchronousQueue时,会指定使用哪种实现类。
// 到底采用哪种实现方式,需要把对应的对象存放到这个属性中
private transient volatile Transferer<E> transferer;
// 无参构造
public SynchronousQueue() {
// 无参构造时,会再次调用有参构造方法
this(false);
}
// 有参构造,fair为公平,采用Queue,如果不公平,采用Stack
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue是先进先出的概念,被称为公平的效果。
TransferStack是先进后出的概念,先入栈的生产者或消费者不是先出栈的,后入栈的先匹配。
测试:
设置fair为false,生产者的顺序为消息1、消息2、消息3,打印的结果为消息3、消息2、消息1。使用的是栈。
package com.xqm.juc.lock.synchronousQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Test02 {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue queue=new SynchronousQueue(false);
String msg="消息";
new Thread(()->{
try {
queue.put("消息1");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread1").start();
new Thread(()->{
try {
queue.put("消息2");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread2").start();
new Thread(()->{
try {
queue.put("消息3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread3").start();
Thread.sleep(200);
new Thread(()->{
System.out.println(queue.poll());
}).start();
Thread.sleep(200);
new Thread(()->{
System.out.println(queue.poll());
}).start();
Thread.sleep(200);
new Thread(()->{
System.out.println(queue.poll());
}).start();
}
}
//结果
消息3
消息2
消息1
设置fair为true,生产者的顺序为消息1、消息2、消息3,打印的结果为消息3、消息2、消息1。使用的是队列。
package com.xqm.juc.lock.synchronousQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Test02 {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue queue=new SynchronousQueue(true);
String msg="消息";
new Thread(()->{
try {
queue.put("消息1");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread1").start();
new Thread(()->{
try {
queue.put("消息2");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread2").start();
new Thread(()->{
try {
queue.put("消息3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread3").start();
Thread.sleep(200);
new Thread(()->{
System.out.println(queue.poll());
}).start();
Thread.sleep(200);
new Thread(()->{
System.out.println(queue.poll());
}).start();
Thread.sleep(200);
new Thread(()->{
System.out.println(queue.poll());
}).start();
}
}
//结果
消息1
消息2
消息3
5.4 SynchronousQueue的TransferQueue源码
5.4.1 QNode
队列和栈中的节点信息。
static final class QNode {
// 当前节点的next节点
volatile QNode next;
// 节点数据
// 如果是生产者,item是数据,如果是消费者,就为null
volatile Object item;
// 当前线程
volatile Thread waiter;
// 当前属性是为了区分消费者和生产者属性
// true代表是生产者,false代表是消费者
final boolean isData;
// 省略大量CAS操作
// 最终,生产者需要将item交给消费者
// 消费者需要获取到生产者的信息
}
无参构造会将头尾节点都设置为伪节点
TransferQueue() {
// false是isData,代表不是生产者
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
5.4.2 transfer方法
// 当前方式就是TransferQueue的核心内容
// e是传递的数据
// timed为false,代表无限阻塞,如果为true,代表是自己自定义时间的阻塞
// nacos代表阻塞时间
E transfer(E e, boolean timed, long nanos) {
// 当前QNode就是封装生产者或消费者信息
QNode s = null;
// 如果是true,代表是生产者,否则是消费者
boolean isData = (e != null);
// 死循环,由于没有加锁,因此需要大量的判断来防止并发问题
for (;;) {
// 获取头节点和尾节点
QNode t = tail;
QNode h = head;
// 为了避免TransferQueue还没有初始化
if (t == null || h == null)
continue; // spin
// 1.如果头尾节点相等,代表队列为空
// 2.如果队列中有节点,同时尾节点和新插入的节点的类型相同,说明是插入队列
// if中的逻辑是进到队列
if (h == t || t.isData == isData) { // empty or same-mode
// 拿到尾节点的next
QNode tn = t.next;
// 如果t不为尾节点,说明有并发问题,重新走一遍
if (t != tail)
continue;
// 尾节点的next不为null,说明在这之前,有同一种角色插入进来
if (tn != null) { // lagging tail
// 直接帮助那个并发线程,修改tail指向最后一个元素
advanceTail(t, tn);
// 重走一遍
continue;
}
// 这里判断代表当前线程是否可以阻塞,如果不能阻塞,就直接返回null,不能添加,因为没有适配的消费者或生产者
// 如果timed为true并且nanos <= 0,也就是不能阻塞的线程
if (timed && nanos <= 0) // can't wait
return null;
// 这里的s就是要封装的QNode
// 做if判断,表示只需要new一次,因为是死循环
if (s == null)
// 构建QNode
s = new QNode(e, isData);
// 基于CAS操作,将tail节点的next设置为当前节点
// 从null改为s
if (!t.casNext(null, s)) // failed to link in
// 如果修改失败,重新执行for循环
continue;
// 如果CAS操作成功,那就修改tail的指向,将tail指向tail的next
advanceTail(t, s); // swing tail and wait
// 如果新节点进入到队列中,挂起线程,要么等生产者,要么等消费者
// s是返回替换后的数据
Object x = awaitFulfill(s, e, timed, nanos);
// 如果元素和节点一致,说明节点取消了
if (x == s) { // wait was cancelled
// 清空当前节点,将上一个节点的next指向当前节点的next
clean(t, s);
// 节点取消,直接返回
return null;
}
// 走这,说明当前节点没有取消
// 判断当前节点是否还在队列中
if (!s.isOffList()) { // not already unlinked
// 将当前节点设置尾head
advanceHead(t, s); // unlink if head
// 如果队列中为生产者,新增为消费者
if (x != null) // and forget fields
// 将当前节点的item设置为自己,当前节点已经没有意义了
s.item = s;
// 线程位设置为null
s.waiter = null;
}
// 匹配好了,数据也交换了,直接返回
// 如果x!=null,说明队列中是生产者,当前是消费者,这边直接返回x的具体数据
// 反之队列中是消费者,当前是生产者,直接返回自己的额数据
return (x != null) ? (E)x : e;
// 匹配队列中的角色
// 到这说明队列中有数据,且进来的数据和队列中的类型不同
} else { // complementary-mode
// 因为是queue,先进先出,所以获取头结点的next节点
// 作为要匹配的节点
QNode m = h.next; // node to fulfill
// 作并发判断,如果头节点、尾节点、头节点的next节点发生变化
if (t != tail || m == null || h != head)
// 重新走循环
continue; // inconsistent read
// 获取头节点的next节点的数据,开始做匹配
Object x = m.item;
// isData,true代表生产者,false代表消费者
// 如果满足isData == (x != null),代表当前出现了并发问题,
// 因为走到这一步,假如x有数据,代表队列是生产者,isData一定为false
if (isData == (x != null) || // m already fulfilled
// 如果排队的节点取消,就会出现x==m,就会将当前的QNode中的item指向QNode,也就是指向自己
x == m || // m cancelled
// 如果前面两个条件都没满足,就可以开始交换数据
// 将head的next换成最新的插入的数据
// 数据交换的目的就是为了将生产者的数据传递给消费者
// 如果交换失败,说明有并发问题,重新设置head节点,并且再走一次循环
!m.casItem(x, e)) { // lost CAS
// 进来说明有并发问题,交换失败,需要重新设置head并重走循环
advanceHead(h, m); // dequeue and retry
continue;
}
// 重新设置head
advanceHead(h, m); // successfully fulfilled
// 唤醒head.next中的线程
LockSupport.unpark(m.waiter);
// 匹配好了,数据也交换了,直接返回
// 如果x!=null,说明队列中是生产者,当前是消费者,这边直接返回x的具体数据
// 反之队列中是消费者,当前是生产者,直接返回自己的额数据
return (x != null) ? (E)x : e;
}
}
}