四:阻塞队列

基础概念

1.生产者消费者概念

生产者消费者是设计模式的一种,让生产者消费者通过一个容器来解决强耦合的问题

生产者和消费者之间不会直接通讯,而是通过一个容器(队列)来进行通讯。生产者生产完扔进队列中,消费者直接从容器中获取数据消费。

2.JUC阻塞队列存取方法

package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {}

存储数据

// 添加数据到队列,如果队列满了,那么直接抛出异常
boolean add(E e);
// 添加数据到队列,如果队列满了,返回false
boolean offer(E e);
// 添加数据到队列,如果队列满了,阻塞timeout时间,如果时间到了还在阻塞,那么返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 添加数据到队列,如果队列满了,直接挂起线程,等到队列中有位置,再扔数据进去
void put(E e) throws InterruptedException;

获取数据

// 先进先出,移出元素,如果队列为空,抛异常
boolean remove(Object o);
E remove();
// 从队列中移除数据,如果队列为空,返回null
poll()
// 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者生产数据,再移除,
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 从队列中移除数据,如果队列为空,挂起线程,一直等待有数据
E take() throws InterruptedException;
//
peek()

1.ArrayBlockingQueue

1.1 基本使用

ArrayBlockingQueue在初始化的时候,必须要指定队列的长度,因为底层是基于数组实现的。

package com.xqm.juc.lock.blockQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test01 {
    public static void main(String[] args) throws InterruptedException {

        // 底层是数组  final Object[] items;
        // this.items = new Object[capacity];
        ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue(4);

        arrayBlockingQueue.add("1");
        arrayBlockingQueue.add("2");
        arrayBlockingQueue.add("3");
        arrayBlockingQueue.add("4");
        // 存第五个会抛出异常,因为队列已经满了
        // arrayBlockingQueue.add("5");
  
        // 会返回false,没存进去,队列已满
        // arrayBlockingQueue.offer("6");

        // 会在两秒后返回false
        // arrayBlockingQueue.offer("7",2,TimeUnit.SECONDS);

        // 会一直阻塞,直到数据存入为止
        // arrayBlockingQueue.put("8");
  
        // 消费者取数据 采用先进先出,取出的是1
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());


        // 取出不存在的元素,抛异常
        // System.out.println(arrayBlockingQueue.remove());

        // 返回null
        // System.out.println(arrayBlockingQueue.poll());
  
        // 等待2s后返回null
        // System.out.println(arrayBlockingQueue.poll(2,TimeUnit.SECONDS));


        // 挂起等待,直到有数据才输出
        // System.out.println(arrayBlockingQueue.take());
  
    }
}

1.2 基本属性

ArrayBlockingQueue的成员变量

  • lock:就是一个ReentrantLock
  • count:就是当前数组元素的个数
  • items:队列的底层是数组,item就是数组本身
  • putIndex:下一个将要添加的数组的索引位置
  • takeIndex:下一个要取数据的数组的下标
  • notEmpty:消费者挂起线程和唤醒线程用到的Condition
  • notFull:生产者挂起线程和唤醒线程用到的Condition

1.3 生产者实现原理

add(E e)

public boolean add(E e) {
    return super.add(e);
}

// 走的是offer方法
public boolean add(E e) {
    if (offer(e))
        return true;
    // 如果队列满了,则抛出异常
    else
        throw new IllegalStateException("Queue full");
}

offer(E e)

// offer方法实现
public boolean offer(E e) {
    // 检查e不为null,为null则抛出空指针异常
    checkNotNull(e);
    // 默认是非公平锁,可以在队列初始化的时候指定为公平锁
    final ReentrantLock lock = this.lock;
    // 为了保证线程安全,直接加锁
    lock.lock();
    try {
        // count是当前队列中元素的个数,items是队列底层数组
        if (count == items.length)
            // 如果元素满了,返回false
            return false;
        else {
            // 执行入队操作
            enqueue(e);
            // 返回true
            return true;
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}

private static void checkNotNull(Object v) {
    if (v == null)
        // 元素为null则抛出异常
        throw new NullPointerException();
}

// 入队操作
private void enqueue(E x) {
    // 拿到数组的引用
    final Object[] items = this.items;
    // putIndex是数组下一个添加位置的索引
    // 这里就是直接添加到数组中
    items[putIndex] = x;
    // 如果putIndex进行++操作,如果++后的putIndex等于items.length的话,说明数组满了,将putIndex直接置为0
    if (++putIndex == items.length)
        putIndex = 0;
    // count是数组元素的个数,因为新增一个元素,所以++
    count++;
    // 将一个Condition中阻塞的线程唤醒
    notEmpty.signal();
}

offer(E e, long timeout, TimeUnit unit)

生产者添加数据时,如果发现队列已满,就会进行阻塞

  • 阻塞到消费者消费数据,就会唤醒当前阻塞线程
  • 阻塞到time时间到达,再次尝试是否可以添加,如果不能,就返回false
// 带有时间单位的offer
// 如果线程在挂起的时候,对当前阻塞的线程进行中断标记位进行设置,当前线程会抛出异常
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
    // 检查输入的e是否为null
    checkNotNull(e);
    // 将单位转换成纳秒
    long nanos = unit.toNanos(timeout);
    // 获取当前的锁
    final ReentrantLock lock = this.lock;
    // 使用可中断锁来加锁,允许线程中断并抛出异常
    lock.lockInterruptibly();
    try {
        // 检查队列是否已满。
        // 使用while是为了解决虚假唤醒
        while (count == items.length) {
            // 等待时间是否小于0
            if (nanos <= 0)
                // 直接失败
                return false;
            // 调用生产者Condition,进行线程休眠,返回剩余时间
            // 挂起等待,同时会释放锁资源(类似sync的wait)
            // 被唤醒后,还得重新竞争锁资源,得到锁才能进行添加
            nanos = notFull.awaitNanos(nanos);
        }
        // 和offer(E e)一样入队操作
        enqueue(e);
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

put(E e)

// put方法,一直挂起,直到被唤醒或中断
public void put(E e) throws InterruptedException {
    // 非空校验
    checkNotNull(e);
    // 获取当前的锁
    final ReentrantLock lock = this.lock;
    // 使用可中断锁来加锁,允许线程中断并抛出异常
    lock.lockInterruptibly();
    try {
        // 检查队列是否已满。
        while (count == items.length)
            // 一直阻塞,直到被唤醒或者中断
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

1.4 消费者实现原理

remove()

使用的就是poll()方法

// remove()
public E remove() {
    // remove()方法就是调用poll()方法
    // poll()如果有数据,返回数据,否则返回null
    E x = poll();
    if (x != null)
        return x;
    else
        // 如果poll()返回null,直接抛异常
        throw new NoSuchElementException();
}

poll()

public E poll() {
    // 获得锁
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 如果队列数据为0,返回null,否则出队操作
        return (count == 0) ? null : dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}

// 取出数据
private E dequeue() {
    // 获取数组的引用
    final Object[] items = this.items;
    // takeIndex是数组取数据的下标,获取数据
    E x = (E) items[takeIndex];
    // 把取出的数据的位置置为null
    items[takeIndex] = null;
    // 设置下次取数据的索引位置
    if (++takeIndex == items.length)
        // 如果满了,就从0开始
        takeIndex = 0;
    // 元素总数--
    count--;
    // 迭代器内容
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒当前Condition中排队的一个生产者线程
    // signalAll()是唤醒所有
    notFull.signal();
    // 返回数据
    return x;
}

poll(long timeout, TimeUnit unit)

// poll()带有时间单位
poll(long timeout, TimeUnit unit)


public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 转化时间单位
    long nanos = unit.toNanos(timeout);
    // 获取锁
    final ReentrantLock lock = this.lock;
    // 加锁(等待时间到或者被中断)
    lock.lockInterruptibly();
    try {
        // 如果没有数据
        while (count == 0) {
            // 时间到,没数据,返回null
            if (nanos <= 0)
                return null;
            // 线程休眠,等待被唤醒或者中断
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 取数据
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take()

public E take() throws InterruptedException {
    // 获取锁
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // 如果没数据
        while (count == 0)
            // 一直阻塞
            notEmpty.await();
        // 返回取出的数据
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}

虚假唤醒

阻塞队列中,如果需要线程挂起操作,判断有无数据的位置使用while循环,为什么不能换成if?

一开始有线程A、线程B、线程C。线程A和B是生产者,线程C是消费者

while (count == items.length)
	// A挂起
	// B挂起
    notFull.await();
enqueue(e);

此时C消费了一个数据 ,唤醒了A,同时线程D(生产者)也来了

// 线程D来了,获取到锁资源,但是还没走while循环
while (count == items.length)
	// A唤醒,但是只是从wait到AQS队列中
	// B挂起
    notFull.await();
enqueue(e);

此时D判断,有位置,可以添加数据到队列,走enqueue(e)。

如果判断是if的话,A在E释放锁资源后,拿到锁资源,不会再次判断队列是否已满,而是直接增添数据,这样就会覆盖数据,出现安全问题。

如果使用while,线程A被唤醒,然后继续走while循环,然后又进行挂起,这就叫虚假唤醒,实际什么都没做。

2.LinkedBlockingQueue

和ArrayBlockingQueue类似,只不过是用链表实现的。无界队列(其实是Integer.MAX_VALUE,只不过一般达不到)

2.1 底层实现

无参构造

// 无参构造
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

有参构造

// 有参构造
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
	// 初始化时,会构建一个没有意义的头和尾
    last = head = new Node<E>(null);
}

存储数据的节点

// Node对象就是存储数据的单位
static class Node<E> {
    // 存储的数据
    E item;
    // 指向下一个数据的指针
    Node<E> next;
    // 有参构造
    Node(E x) { item = x; }
}

其他属性

  • // 因为是链表,没有数组的length属性,基于AtomicInteger来记录长度
    private final AtomicInteger count = new AtomicInteger();
    
  • // 链表的头节点,用来取数据
    transient Node<E> head;
    
  • // 链表的尾节点,用来存数据
    private transient Node<E> last;
    
  • 锁:ArrayBlockingQueue只有一把锁,因为在一个数组中有序的进行生产和消费,生产和消费不能同时,因为索引的下标会乱套。但是链表是连续的地址空间,消费前面的数据,和生产后面的数据完全不影响。
// 消费者的锁
private final ReentrantLock takeLock = new ReentrantLock();

// 消费者的挂起和唤醒操作
private final Condition notEmpty = takeLock.newCondition();

// 生产者的锁
private final ReentrantLock putLock = new ReentrantLock();

// 生产者的挂起和唤醒操作
private final Condition notFull = putLock.newCondition();

2.2 生产者实现原理

add(E e)

还是走的offer方法

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

offer(E e)

public boolean offer(E e) {
    // 非空校验
    if (e == null) throw new NullPointerException();
    // 把成员变量单独引用出来,拿到存储条数
    final AtomicInteger count = this.count;
    // 查看当前条数是否等于限制长度
    if (count.get() == capacity)
        // 到达最大长度
        return false;
    // 声明一个c变量
    int c = -1;
    // 将要存储的对象封装成一个node
    Node<E> node = new Node<E>(e);
    // 获取put锁
    final ReentrantLock putLock = this.putLock;
    // 竞争锁资源
    putLock.lock();
    try {
        // 再次做判断,查看是否有空间,防止多线程
        if (count.get() < capacity) {
            // 入队操作
            enqueue(node);
            // 先把count赋值给c,然后数据总个数count++
            c = count.getAndIncrement();
            // 查看是否还有空间。c+1小于长度限制
            if (c + 1 < capacity)
                // 唤醒生产者
                // 有生产者基于await挂起,这里添加完数据后,发现还有空间可以存储数据
                // 唤醒前面可能挂起的生产者
                // 因为这里的生产者和消费者不是互斥的,写操作同时,可能消费者也在消费
                notFull.signal();
        }
    } finally {
        // 释放锁资源
        putLock.unlock();
    }
    // c==0,代表添加数据之前,队列元素的个数是0
    // 如果有消费者在队列没有数据的时候,一定处于挂起的状态
    if (c == 0)
        // 唤醒消费者
        signalNotEmpty();
    // 返回c是不是大于等于0,添加成功返回true,添加失败c=-1返回false
    return c >= 0;
}

// 将新增的数据往后放
private void enqueue(Node<E> node) {
    // 将当前数据设置为last.next,并且设置为last
    last = last.next = node;
}


private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 获取读锁,只有获取读锁才能进行唤醒读锁,和synchronized的wait、notify差不多
    takeLock.lock();
    try {
        // 唤醒读锁
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

offer(E e, long timeout, TimeUnit unit)

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
    // 非空校验
    if (e == null) throw new NullPointerException();
    // 将时间转化为纳秒
    long nanos = unit.toNanos(timeout);
    // c作为返回的标记
    int c = -1;
    // 写锁
    final ReentrantLock putLock = this.putLock;
    // 总数
    final AtomicInteger count = this.count;
    // 允许中断的加锁方式
    putLock.lockInterruptibly();
    try {
        // 如果元素个数到达最大
        while (count.get() == capacity) {
            // 时间不够,直接返回false,添加失败
            if (nanos <= 0)
                return false;
            // 挂起线程,返回剩余的时间
            nanos = notFull.awaitNanos(nanos);
        }
        // 加入队列
        enqueue(new Node<E>(e));
        // 元素个数加一
        c = count.getAndIncrement();
        // 唤醒生产者
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    if (c == 0)
        // 唤醒消费者
        signalNotEmpty();
    return true;
}

put(E e)

和offer()很相似,只不过会一直阻塞

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {

        while (count.get() == capacity) {
            // 一直阻塞
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

2.3 消费者实现原理

remove()

使用的是poll()方法

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

poll()

public E poll() {
    // 拿到队列长度的计数器
    final AtomicInteger count = this.count;
    // 如果队列数据为0
    if (count.get() == 0)
        // 返回null
        return null;
    // 队列中有数据.声明返回接口
    E x = null;
    // 返回标记
    int c = -1;
    // 获取消费锁
    final ReentrantLock takeLock = this.takeLock;
    // 抢夺锁资源
    takeLock.lock();
    try {
        // 再次判断,如果队列长度大于0
        if (count.get() > 0) {
            // 从队列中移除数据
            x = dequeue();
            // 将count赋值给c,然后--
            c = count.getAndDecrement();
            // 如果之前元素个数大于1,减去1之后,说明还剩至少1个
            if (c > 1)
                // 如果依然有数据,继续唤醒一个消费者
                notEmpty.signal();
        }
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    // c是之前的元素个数,如果为当前队列的限制
    // 说明有空余的位置,可以继续添加
    if (c == capacity)
        // 唤醒阻塞的生产者
        signalNotFull();
    // 如果没有数据,返回null
    return x;
}


// 移除数据
private E dequeue() {
    // head.next节点,就是需要移除的数据
    // 这里先拿head
    Node<E> h = head;
    // head的next节点
    Node<E> first = h.next;
    h.next = h; // help GC
    // head.next就是新的head
    head = first;
    // 返回的值,head.next.item
    E x = first.item;
    // 设置head.next.item为null,成为新的head
    first.item = null;
    return x;
}



// 先拿到生产者锁线程,然后唤醒生产者
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

poll(E,Unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 返回结果
    E x = null;
    // 表示
    int c = -1;
    // 转成纳秒
    long nanos = unit.toNanos(timeout);
    // 拿到计数器
    final AtomicInteger count = this.count;
    // 拿到消费者锁
    final ReentrantLock takeLock = this.takeLock;
    // 可以中断的锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列中没数据
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            // 挂起当前线程,并获取剩余时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 数据出队
        x = dequeue();
        // count--
        c = count.getAndDecrement();
        if (c > 1)
            // 唤醒消费者
            notEmpty.signal();
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    if (c == capacity)
        // 唤醒生产者
        signalNotFull();
    return x;
}

take

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 相比poll,这里的出口只有一个,中断标记为,抛出异常,否则一直等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

3.PriorityBlockingQueue

3.1介绍

优先级阻塞队列。不满足先进先出的概念。

会将插入的数据进行排序,排序的方式就是基于插入数据值的本身。

排序的方式是基于二叉堆的方式实现的。底层实现的是数据结构中的二叉堆。

案例

package com.xqm.juc.lock.priorityBlockingQueue;

import java.util.concurrent.PriorityBlockingQueue;

public class Test01 {
    public static void main(String[] args) {
        PriorityBlockingQueue queue=new PriorityBlockingQueue();
        queue.add(1);
        queue.add(2);
        queue.add(5);
        queue.add(3);
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());

        // 输出的结果为: 1 2 3 5
    }
}

3.2二叉堆

3.2.1结构介绍

排序的方式是基于二叉堆的方式实现的。

优先级队列表示为平衡二叉堆:queue[n] 的两个子节点是 queue[2*n+1] 和 queue[2*(n+1)]。 
优先级队列按比较器排序,或者按元素的自然排序,
如果比较器为空:对于堆中的每个节点 n 和 n 的每个后代 d,n <= d。
最低值的元素在 queue[0] 中,假设队列是非空的。
/**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

PriorityBlockingQueue是基于数组实现的二叉堆。

3.2.2特点

1.二叉堆就是一个完整的二叉树。

2.任意一个节点大于其父节点,称为小顶堆,任意一个节点小于其父节点,称为大顶堆。

二叉堆:

image.png

3.3 PriorityBlockingQueue的核心属性

default_initial_capacity

记录数组的初始长度,初始长度默认为11

private static final int DEFAULT_INITIAL_CAPACITY = 11;

max_array_size

数组的最大长度。

减8的目的是为了适配各个版本的虚拟机

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

默认当前使用的Hotspot虚拟机最大支持Integer.MAX_VALUE-2,但是其他版本的虚拟机不一定。

 int[] a=new int[Integer.MAX_VALUE]
// 结果
Exception in thread "main" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
	at com.xqm.juc.lock.priorityBlockingQueue.Test01.main(Test01.java:9)

如果改为-2,那么只需要设置堆的大小,JVM是可以支持那么大的数组的。

int[] a=new int[Integer.MAX_VALUE-2];
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at com.xqm.juc.lock.priorityBlockingQueue.Test01.main(Test01.java:9)

queue

存储数据的数组,基于这个数组存储的二叉堆

private transient Object[] queue;

size

记录当前优先级阻塞队列中元素的个数

private transient int size;

comparator

要求使用的对象使用comparable比较器,基于comparator来确定对象的大小。

private transient Comparator<? super E> comparator;

lock

实现阻塞队列的锁。

private final ReentrantLock lock;

notEmpty

挂起线程的操作。队列为空时阻塞的条件。

private final Condition notEmpty;

allocationSpinLock

用于分配的自旋锁,通过 CAS 获取。如果为0,说明没有线程正在扩容操作。

因为数组的长度是固定的,因此数组扩容时,需要构建新数组,而且同时要迁移数据。因此PriorityBlockingQueue在扩容操作时,不会lock住扩容操作,而是释放lock锁,使用allocationSpinLock属性来进行标记,避免出现并发扩容的问题。为了提升效率。

private transient volatile int allocationSpinLock;

q

阻塞队列中,用到的原理,其实就是普通的优先级队列。

private PriorityQueue<E> q;

3.4 写操作源码分析

阻塞队列,添加的操作还是add,offer,offer(time,unit),put等。

因为优先级队列中,数组是可以扩容的,因此数组虽然也有长度的限制,但是还是属于无界队列,所以生产者不会阻塞,只有offer操作才能查看。

核心的内容还是如果保证二叉堆中小顶堆的结构,以及数组是如何扩容的。

add

add方法走的还是offer方法

public boolean add(E e) {
        return offer(e);
    }

offer

public boolean offer(E e) {
	// 非空判断
	if (e == null)
		throw new NullPointerException();
	// 拿到锁之后,直接上锁
	final ReentrantLock lock = this.lock;
	lock.lock();
	// n:当前数组的元素个数
	// cap:就是优先级队列这个数组的长度
	int n, cap;
	// array:优先级队列这个数组
	Object[] array;
	// 如果当前数组的元素个数大于数组的长度
	// 使用while为了防止虚假唤醒
	while ((n = size) >= (cap = (array = queue).length))
		// 尝试扩容
		tryGrow(array, cap);
	// 到这说明不需要扩容,那就进行新增操作
	try {
		// cmp:设置为比较器
		Comparator<? super E> cmp = comparator;
		// 基于比较器是否为空进行判断
		// 如果自己没设置比较器,那比较器是个属性,默认为null
		if (cmp == null)
			// 比较数据大小,存储数据,查看是否需要上移操作,保证平衡堆
			siftUpComparable(n, e, array);
		// 如果自己设置了比较器
		else
			siftUpUsingComparator(n, e, array, cmp);
		// 证明数据已经存储成功
		// 数组数据长度+1
		size = n + 1;
		// 如果有挂起的线程,需要唤醒挂起的消费者
		// 因为会一直扩容,直到MAX长度,实际相当于无界队列
		notEmpty.signal();
		// 释放锁
	} finally {
		lock.unlock();
	}
	return true;
}

tryGrow

tryGrow是用来扩容的方法.

如果有两个线程同时执行tryGrow方法,那么只会有一个线程进行扩容操作。另一个线程可能多次走tryGrow方法,但是依然需要等待前面的线程扩容完毕。

// 扩容方法,在添加数据之前,会采用while的方法来判断是否需要扩容
private void tryGrow(Object[] array, int oldCap) {
	// 释放锁。读和写操作都是同一把锁,扩容的时候防止读和写阻塞,必须先释放锁
	lock.unlock(); // must release and then re-acquire main lock
	// 声明新数组
	Object[] newArray = null;
	// 如果allocationSpinLock=0代表没有线程正在扩容
	if (allocationSpinLock == 0 &&
		// 基于CAS的方式,将allocationSpinLock设置为1,代表可以开始扩容
	        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
	                                 0, 1)) {
		try {
			// 计算新数组的长度
			// 如果老数组长度小于64,那么就扩容成2*oldCap+2(为了加快扩容的速度),否则就扩容成1.5*oldCap
			int newCap = oldCap + ((oldCap < 64) ?
				// 如果有人玩反射,将数组的长度设置为0,如果最后不加2,会出现问题
			                       (oldCap + 2) : // grow faster if small
			                       // 扩容到1.5倍
			                       (oldCap >> 1));
			// 如果新数组长度超过定义的最大数组长度
			if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
				// 声明minCap数组为老数组+1
				int minCap = oldCap + 1;
				// 老数组长度已经大于最大长度,直接抛出异常
				if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
					throw new OutOfMemoryError();
				// 如果老数组没有达到最大长度,直接将新数组设置为最大长度
				newCap = MAX_ARRAY_SIZE;
			}
			// 如果新数组长度大于老数组长度 并且队列没有改变
			// 第二个判断确保没有并发扩容的出现
			if (newCap > oldCap && queue == array)
				// 设置新数组
				newArray = new Object[newCap];
		} finally {
			// 新数组有了,直接标记位归零
			allocationSpinLock = 0;
		}
	}
	// 新数组没有构建成功
	if (newArray == null) // back off if another thread is allocating
	// 将当前线程时间片交出去
		Thread.yield();
	// 拿到锁资源
	lock.lock();
	// 将新数组赋值给queue
	if (newArray != null && queue == array) {
		// 数组赋值,实际上是地址指向改变
		queue = newArray;
		// 将老数组的数据导入到新数组
		System.arraycopy(array, 0, newArray, 0, oldCap);
	}
}

siftUpComparable

数据放到数组中,同时保持二叉堆的结构。

// k是当前数组的元素个数,也就是添加元素的索引位置
// x是存入的数据
// array就是原数组
private static <T> void siftUpComparable(int k, T x, Object[] array) {
	// 将插入的元素强转为Comparable
	// 强转可能会导致异常
	Comparable<? super T> key = (Comparable<? super T>) x;
	// 如果原来有数组
	while (k > 0) {
		// 获取父节点的索引位置
		int parent = (k - 1) >>> 1;
		// 拿到父节点的值
		Object e = array[parent];
		// 用子节点compareTo父节点,如果大于0,说明直接插入即可
		if (key.compareTo((T) e) >= 0)
			break;
		// 到这说明新插入比父节点小
		// 将父节点放到最新的位置
		array[k] = e;
		// k赋值为父节点的位置
		k = parent;
	}
	// 走这里代表原来数组没有数据,直接放在第一个
	array[k] = key;
}

实例:假如插入13

第一步:

image.png

第二步:

image.png

第三步:

image.png

第四步:

image.png

offer(time,unit)

offer加上阻塞时间实际上还是调用的offer函数,因为是无限队列,可以自动扩容,不需要阻塞。

public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

put

put调用的也是offer方法

 public void put(E e) {
        offer(e); // never need to block
    }

3.5 读操作源码分析

读操作是会出现线程挂起的情况,如果数组中元素个数为0,执行的是take方法,那么就会出现线程挂起的情况。

其次获取数据,由于是优先级队列,因此每次拿索引为0的位置的数据,每次拿完数据后,要保证二叉堆的结构,就会有下移操作。

主要有poll()、poll(time.unit)、take()等方法。

poll

// 读取数据-poll方法
public E poll() {
	// 拿到锁
	final ReentrantLock lock = this.lock;
	// 加锁
	lock.lock();
	try {
		// 拿到返回值,没拿到返回null
		return dequeue();
	} finally {
		// 释放锁
		lock.unlock();
	}
}

poll(time,unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	// 转换为纳秒
	long nanos = unit.toNanos(timeout);
	// 拿到锁
	final ReentrantLock lock = this.lock;
	// 上锁
	lock.lockInterruptibly();
	// 定义结果变量
	E result;
	try {
		// 如果没有结果并且时间还有
		while ( (result = dequeue()) == null && nanos > 0)
			// 进行阻塞
			nanos = notEmpty.awaitNanos(nanos);
	} finally {
		// 释放锁
		lock.unlock();
	}
	// 返回结果
	return result;
}

take

public E take() throws InterruptedException {
	// 拿到锁
	final ReentrantLock lock = this.lock;
	// 上锁
	lock.lockInterruptibly();
	// 结果变量
	E result;
	try {
		// 如果一直拿不到锁,就进行阻塞
		while ( (result = dequeue()) == null)
			notEmpty.await();
	} finally {
		// 释放锁
		lock.unlock();
	}
	// 返回结果
	return result;
}

dequeue

dequeue是出队操作。

// 出队列操作,拿到数组中索引为0位置的数据
private E dequeue() {
	// 定义一个变量n,表示数组中元素个数-1
	int n = size - 1;
	// 如果小于0,表示数组没数据
	if (n < 0)
		// 没有数据直接返回null
		return null;
	// 说明拿到数据
	else {
		// array定义临时变量获取queue中数据
		Object[] array = queue;
		// 拿到零索引位置的数据
		E result = (E) array[0];
		// 拿到最后一个位置数据
		E x = (E) array[n];
		// 最后一个位置置为null
		array[n] = null;
		// 拿到比较器
		Comparator<? super E> cmp = comparator;
		// 移除数据并保证小顶堆数据结构
		if (cmp == null)
			siftDownComparable(0, x, array, n);
		else
			siftDownUsingComparator(0, x, array, n, cmp);
		// 元素个数减一
		size = n;
		// 返回result
		return result;
	}
}

siftDownComparable

siftDownComparable是出队后保证小顶堆的结构。

// k=0,x是数组/二叉堆最后一个元素,array是数组,n是移除数据后的数组元素个数
private static <T> void siftDownComparable(int k, T x, Object[] array,
        int n) {
	// 取完第一个数据后,数组还有数据,需要做平衡堆操作
	if (n > 0) {
		// 拿到最后一个数据的比较器
		Comparable<? super T> key = (Comparable<? super T>)x;
		// 因为二叉堆是一个满树结构,所以在保证二叉堆结构时,只需要循环一半就行
		int half = n >>> 1;           // loop while a non-leaf
		// 做超过一半就不需要继续往下,已经到头了
		while (k < half) {
			// 找k的左子节点
			int child = (k << 1) + 1; // assume left child is least
			// c是左子节点的值
			Object c = array[child];
			// 右子节点
			int right = child + 1;
			// 确定有右节点,右子节点小于元素个数
			if (right < n &&
				// 左子节点的值大于右子节点,根据小顶堆,应该右子节点在上
			        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
				c = array[child = right];
			// 如果左子节点大于最后一个数据,直接退出循环,让最后一个节点数据覆盖左子节点的父节点
			if (key.compareTo((T) c) <= 0)
				break;
			// 如果左子节点小于右子节点,左子节点的值就覆盖父节点,否则就右子节点就覆盖父节点
			array[k] = c;
			k = child;
		}
		// 填充最后一个节点的数据
		array[k] = key;
	}
}

演示:

第一步:

image.png

第二步:

image.png

第三步:

image.png

第四步:

image.png

其实就是循环拿到父节点的左子节点和右子节点,左子节点和右子节点值小的那个,和最后一个数据进行对比,小的值进行覆盖父节点,直到最后循环一半之后(也就是层数),得到最后的平衡二叉堆。

4.DelayQueue

4.1 延迟队列介绍

1.延迟队列也是基于二叉堆实现的,和优先级队列不同的是,优先级队列是基于值的大小进行排列,而延迟队列是基于延迟时间进行排列。

2.延迟队列生产一个消息,这个消息存在被消费的延迟时间。

3.DelayQueue底层实现的是PriorityQueue,实际上数组也是可以自动扩容的,可以当作是无限队列。

4.2 延迟队列的应用测试

4.2.1 延迟队列的类

// 延迟队列,里面的泛型继承Delayed接口,必须实现getDelay这个方法和compareTo方法
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {}

// Delayed接口 继承Comparable类,具有比较的能力
public interface Delayed extends Comparable<Delayed> {
	// 这里的抽象方法就是需要设置的延迟时间
	long getDelay(TimeUnit unit);
}

// Comparable实现了比较
public interface Comparable<T> {
    public int compareTo(T o);
}

4.2.2 自定义任务

自定义要放入延迟队列的Task类

package com.xqm.juc.lock.delayQueue;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Task implements Delayed {

    /**
     * 为了区分每个任务的不同,设置name
     */
    private String name;

    /**
     * 执行时间
     */
    private Long time;

    public Task(String name, Long time) {
        this.name = name;
        this.time = System.currentTimeMillis()+time;
    }

    /**
     * 设置任务什么时候能出延迟队列
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // 小于等于0,就可以执行任务
        return unit.convert(time-System.currentTimeMillis(),TimeUnit.NANOSECONDS);
    }

    /**
     * 比较两个任务的时间
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.time-((Task)o).getTime());
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Task{" +
                "name='" + name + '\'' +
                ", time=" + time +
                '}';
    }
}

4.2.3 测试延迟队列

分别在1秒,3秒,5秒,7秒输出。

package com.xqm.juc.lock.delayQueue;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;

public class Test01<T extends Delayed> {
    public T data = null;

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> queue = new DelayQueue();
        Task task1 = new Task("任务1", 1000L);
        Task task2 = new Task("任务2", 5000L);
        Task task3 = new Task("任务3", 3000L);
        Task task4 = new Task("任务4", 7000L);

        queue.put(task1);
        queue.put(task2);
        queue.put(task3);
        queue.put(task4);

        // 结果
        // Task{name='任务1', time=1663229491569}
        // Task{name='任务3', time=1663229493569}
        // Task{name='任务2', time=1663229495569}
        // Task{name='任务4', time=1663229497569}
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());

    }
}

4.2.4 延迟队列应用

比如点外卖,15分钟商家接单,如果不接单,这个订单就会取消。每下一个订单,就会放到延迟队列中,如果规定时间内,商家没接单,直接通过消费者获取元素,取消订单。

4.3 延迟队列的核心属性

DelayQueue只有四个核心属性。

// DelayQueue属于阻塞队列,需要加锁保证线程安全。生产者和消费者使用的是同一把锁
private final transient ReentrantLock lock = new ReentrantLock();
// 基于二叉堆实现的,因此底层使用的优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般存储等待堆顶数据的消费者线程
// 在整体写入和消费的过程中,会设置leader的一些判断
private Thread leader = null;
// 生产者在插入数据时,不会进行阻塞,当前的Condition是给消费者使用的。
// 消费者在消费数据时,发现堆顶的数据还没到延迟时间
// 因此堆顶元素到达延迟时间或者生产者元素到了堆顶的时候,消费者会进行阻塞
private final Condition available = lock.newCondition();

4.4 延迟队列的写入操作

延迟队列是无界的,写入操作会自动进行扩容。不需要关注生产者的阻塞问题,生产者是不会阻塞。

4.4.1 add

add方法是使用offer.

public boolean add(E e) {
        return offer(e);
    }

4.4.2 put

put方法也是使用的offer。

public void put(E e) {
        offer(e);
    }

4.4.3 offer(e,time,unit)

offer方法带有阻塞时间的,实际上也是offer方法。

public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

4.4.4 offer-延迟队列

// 延迟队列插入方法
public boolean offer(E e) {
	// 获取lock
	final ReentrantLock lock = this.lock;
	// 加锁
	lock.lock();
	try {
		// 调用优先级队列的插入操作
		q.offer(e);
		// 拿到队列的第一个元素,判断是否是刚刚插入的元素
		if (q.peek() == e) {
			// leader赋值为null
			leader = null;
			// condition的唤醒方法,唤醒消费者准备等待最新插入数据,进行消费
			// 因为之前消费者肯定在等待堆其他数据。如果插入新的数据变成堆顶,那么消费者就需要改变等待的数据
			available.signal();
		}
		// 插入成功
		return true;
	} finally {
		// 释放锁
		lock.unlock();
	}
}

4.4.5 offer-优先级队列

// 优先级队列的插入操作
public boolean offer(E e) {
	// 如果插入元素为null,直接抛出异常
	if (e == null)
		throw new NullPointerException();
	// 优先级队列结构修改的次数+1
	modCount++;
	// 队列数据设置为i
	int i = size;
	// 如果大于等于数组长度
	if (i >= queue.length)
		// 进行扩容
		// 如果小于64,扩容成2倍+2,否则扩容成1.5倍
		grow(i + 1);
	// 数组元素+1
	size = i + 1;
	// 如果数组之前没数据,那就插入到第一个
	if (i == 0)
		queue[0] = e;
	else
		// 否则进行堆结构改变,和优先级队列一样
		siftUp(i, e);
	return true;
}

优先级队列的peek方法

// 优先级队列的peek操作,取出堆顶的元素,如果没有返回null
public E peek() {
	return (size == 0) ? null : (E) queue[0];
}

4.5 延迟队列的读取操作

消费者存在阻塞的情况,有两种情况:

  • 消费者要拿堆顶的数据 ,但是延迟时间还没到,因此消费者需要等待。
  • 消费者要来拿数据,发现已经有消费者在等待堆顶数据,这个后来的消费者也需要等待。

4.5.1 remove方法

remove方法,如果堆顶有元素,直接返回元素,否则抛出异常。

调用的还是抽象队列AbstractQueue中的poll()方法。

public E remove() {
	// 调用poll方法
	E x = poll();
	if (x != null)
		// 拿到值则返回,否则抛出异常
		return x;
	else
		throw new NoSuchElementException();
}

4.5.2 poll方法

poll方法拿到数据则直接返回,否则返回null。不会进行阻塞。

// poll是尝试获取数据,能拿到就拿,拿不到就直接返回null
public E poll() {
	// 获取锁
	final ReentrantLock lock = this.lock;
	// 加锁
	lock.lock();
	try {
		// 获取堆顶的元素
		E first = q.peek();
		// 如果栈顶为null或者元素的延迟时间大于0,任务还在延时
		if (first == null || first.getDelay(NANOSECONDS) > 0)
			// 拿不到就直接返回null
			return null;
		else
			// 拿到堆顶元素
			return q.poll();
	} finally {
		// 释放锁
		lock.unlock();
	}
}

4.5.3 poll(time.unit)方法

poll(time.unit)方法拿到数据直接返回,否则在time内一直尝试拿数据,进行阻塞,到时间拿不到数据则返回null。

// 带有阻塞时间的poll方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	// 将时间转换为纳秒值
	long nanos = unit.toNanos(timeout);
	// 获取锁
	final ReentrantLock lock = this.lock;
	// 锁可以被中断
	lock.lockInterruptibly();
	try {
		// 死循环
		for (;;) {
			// 查询堆顶的元素
			E first = q.peek();
			// 如果堆顶的元素为null
			if (first == null) {
				// 如果阻塞时间已过还拿不到数据
				if (nanos <= 0)
					// 返回null
					return null;
				else
					// 挂起线程,说明队列中没有元素
					nanos = available.awaitNanos(nanos);
			// 堆顶有元素的话
			} else {
				// 查询堆顶元素的延迟时间
				long delay = first.getDelay(NANOSECONDS);
				// 如果延迟时间小于0,也就是可以拿到数据
				if (delay <= 0)
					// 直接返回拿到的元素
					return q.poll();
				// 到下面说明堆顶元素延迟时间没到
				// 查询阻塞时间是否已到
				if (nanos <= 0)
					// 阻塞时间已到还没拿到数据,直接返回null
					return null;
				// first赋值为null
				first = null; // don't retain ref while waiting
				// 如果指定阻塞时间小于堆顶元素的延迟时间,也就是在没有新元素插入进来的前提下,是拿不到数据的
				// 如果leader不为null,说明有消费者已经在等待了
				if (nanos < delay || leader != null)
					// 进行阻塞
					nanos = available.awaitNanos(nanos);
				// 到这说明没有消费者在等待,并且阻塞时间大于堆顶元素
				else {
					// 获取当前线程
					Thread thisThread = Thread.currentThread();
					// 当前的leader为当前线程
					leader = thisThread;
					try {
						// 阻塞,修改线程阻塞时间
						// 让当前消费者阻塞堆顶元素的延迟时间
						long timeLeft = available.awaitNanos(delay);
						// 重新计算当前消费者可阻塞时间
						nanos -= delay - timeLeft;
					} finally {
						// 如果当前线程是等待的消费者,将leader置为null
						if (leader == thisThread)
							leader = null;
					}
				}
			}
		}
	} finally {
		// 没有消费者等待元素  && 队列中的元素不为null
		if (leader == null && q.peek() != null)
			// 只要当前没有leader在等待,并且元素不为null,再次唤醒消费者
			// 避免队列有元素,但是没有消费者的情况.
			// 因为当前消费者执行完之后,其他消费者还在阻塞中
			available.signal();
		// 释放锁
		lock.unlock();
	}
}

4.5.4 take方法

take方法和带有时间的poll方法类似,差别在于poll方法是指定阻塞时间,而take则是一直在阻塞,直到拿到数据或者被中断为止。

public E take() throws InterruptedException {
	// 获取锁
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// 死循环
		for (;;) {
			// 查询队列首个元素
			E first = q.peek();
			// 堆顶没有元素
			if (first == null)
				// 进行阻塞,挂起等待,直到被唤醒为止
				available.await();
			// 到这说明堆顶有元素
			else {
				// 获取堆顶元素的延迟时间
				long delay = first.getDelay(NANOSECONDS);
				// 如果延迟时间小于0,说明可以获取元素了
				if (delay <= 0)
					// 获取元素并返回
					return q.poll();
				// 将first置为null
				first = null; // don't retain ref while waiting
				// leader不为null,说明有消费者正在等数据
				if (leader != null)
					// 进行挂起
					available.await();
				// leader为null说明没有消费者在等待
				else {
					// 获取当前线程
					Thread thisThread = Thread.currentThread();
					// leader为当前线程
					leader = thisThread;
					try {
						// 将消费者等待时间置为延迟时间
						available.awaitNanos(delay);
					} finally {
						// 正在等待的消费者是等待线程
						if (leader == thisThread)
							// 将leader置为null
							leader = null;
					}
				}
			}
		}
	} finally {
		// 没有消费者在等待,并且队列中有元素
		if (leader == null && q.peek() != null)
			// 唤醒消费者
			available.signal();
		// 释放锁
		lock.unlock();
	}
}

5. SynchronousQueue

5.1 SynchronousQueue介绍

SynchronousQueue阻塞队列和其他阻塞队列有很大区别。

在概念中,队列肯定是要存储数据的,但是SynchronousQueue阻塞队列不会存储数据。

SynchronousQueue不存储数据,只会存储生产者或者消费者。当存储一个生产者到SynchronousQueue队列中之后,生产者会阻塞(比如调用put方法的时候)。

生产者最终会有几种结果:

  • 如果在阻塞期间有消费者来匹配,生产者就会将绑定的消息交给消费者
  • 生产者等到阻塞结束或者不允许阻塞,那么就直接失败
  • 生产者在阻塞期间,如果线程中断,那么也直接失败

同理,消费者和生产者的效果是一样的。

生产者和消费者的数据是直接传递的,不会经过SynchronousQueue。

生产者和消费者的方法:

生产者:

  • add()
  • offer():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待数据,直接返回,数据没有给出去。
  • offer(time,unit):生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待数据,阻塞time时间,如果还没有消费者,直接返回,数据没有给出去。
  • put():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待数据,一直阻塞,直到等到消费者,

消费者:道理和上面生产者一样

  • remove()
  • poll()
  • poll(time,unit)
  • take()

5.2 SynchronousQueue例子

5.2.1 没有消费者拿数据

没有消费者拿数据,直接返回false。

package com.xqm.juc.lock.synchronousQueue;

import java.util.concurrent.SynchronousQueue;

public class Test {

    public static void main(String[] args) {
        // 因为当前队列不存储数据,因此没有长度概念
        // 里面存储的生产者和消费者是链表的结构形式存储
        SynchronousQueue queue=new SynchronousQueue();
        String msg="消息";
        new Thread(()->{
            boolean offer = queue.offer(msg);
            // 返回false,代表没有消费者拿
            System.out.println(offer);
        },"Thread1").start();
    }
}

5.2.2 消费者拿到数据

package com.xqm.juc.lock.synchronousQueue;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test01 {

    public static void main(String[] args) {
        // 因为当前队列不存储数据,因此没有长度概念
        // 里面存储的生产者和消费者是链表的结构形式存储
        SynchronousQueue queue=new SynchronousQueue();
        String msg="消息";
        new Thread(()->{
            boolean offer = false;
            try {
                offer = queue.offer(msg,10,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 返回true,代表消费者拿到数据
            System.out.println(offer);
        },"Thread1").start();

        new Thread(()->{
            // 打印出消息
            System.out.println(queue.remove());
        }).start();
    }
}

5.3 SynchronousQueue核心属性

进入SynchronousQueue类的内部后,发现一个静态内部类Transferer,Transferer类提供了一个transfer方法。

abstract static class Transferer<E> {
	abstract E transfer(E e, boolean timed, long nanos);
}

当前这个类中提供的transfer方法,就是生产者和消费者在读写数据时要用到的核心方法
生产者在调用transfer方法时,第一个参数e会正常传递数据
消费者在调用transfer方法时,第一个参数e会传递null
SynchronousQueue针对抽象类Transferer做了两种实现:

static final class TransferStack<E> extends Transferer<E> {}
static final class TransferQueue<E> extends Transferer<E> {}

这两种类继承了Transferer抽象内部类,在构建SynchronousQueue时,会指定使用哪种实现类。

// 到底采用哪种实现方式,需要把对应的对象存放到这个属性中
private transient volatile Transferer<E> transferer;

// 无参构造
public SynchronousQueue() {
	// 无参构造时,会再次调用有参构造方法
	this(false);
}
// 有参构造,fair为公平,采用Queue,如果不公平,采用Stack
public SynchronousQueue(boolean fair) {
	transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

TransferQueue是先进先出的概念,被称为公平的效果。

image.png

TransferStack是先进后出的概念,先入栈的生产者或消费者不是先出栈的,后入栈的先匹配。

测试:

设置fair为false,生产者的顺序为消息1、消息2、消息3,打印的结果为消息3、消息2、消息1。使用的是栈。

package com.xqm.juc.lock.synchronousQueue;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test02 {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue queue=new SynchronousQueue(false);
        String msg="消息";
        new Thread(()->{
            try {
                queue.put("消息1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread1").start();
        new Thread(()->{
            try {
                queue.put("消息2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread2").start();
        new Thread(()->{
            try {
                queue.put("消息3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread3").start();


        Thread.sleep(200);
        new Thread(()->{
            System.out.println(queue.poll());
        }).start();
        Thread.sleep(200);
        new Thread(()->{
            System.out.println(queue.poll());
        }).start();
        Thread.sleep(200);
        new Thread(()->{
            System.out.println(queue.poll());
        }).start();
    }
}

//结果
消息3
消息2
消息1

设置fair为true,生产者的顺序为消息1、消息2、消息3,打印的结果为消息3、消息2、消息1。使用的是队列。

package com.xqm.juc.lock.synchronousQueue;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test02 {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue queue=new SynchronousQueue(true);
        String msg="消息";
        new Thread(()->{
            try {
                queue.put("消息1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread1").start();
        new Thread(()->{
            try {
                queue.put("消息2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread2").start();
        new Thread(()->{
            try {
                queue.put("消息3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Thread3").start();


        Thread.sleep(200);
        new Thread(()->{
            System.out.println(queue.poll());
        }).start();
        Thread.sleep(200);
        new Thread(()->{
            System.out.println(queue.poll());
        }).start();
        Thread.sleep(200);
        new Thread(()->{
            System.out.println(queue.poll());
        }).start();
    }
}

//结果
消息1
消息2
消息3

5.4 SynchronousQueue的TransferQueue源码

5.4.1 QNode

队列和栈中的节点信息。


static final class QNode {
	// 当前节点的next节点
	volatile QNode next;  
	// 节点数据
	// 如果是生产者,item是数据,如果是消费者,就为null
	volatile Object item;  
	// 当前线程
	volatile Thread waiter;  
	// 当前属性是为了区分消费者和生产者属性
	// true代表是生产者,false代表是消费者
	final boolean isData;

	// 省略大量CAS操作 
	// 最终,生产者需要将item交给消费者
	// 消费者需要获取到生产者的信息
}

无参构造会将头尾节点都设置为伪节点

TransferQueue() {
		// false是isData,代表不是生产者
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

5.4.2 transfer方法

// 当前方式就是TransferQueue的核心内容
// e是传递的数据
// timed为false,代表无限阻塞,如果为true,代表是自己自定义时间的阻塞
// nacos代表阻塞时间
E transfer(E e, boolean timed, long nanos) {
	// 当前QNode就是封装生产者或消费者信息
	QNode s = null; 
	// 如果是true,代表是生产者,否则是消费者
	boolean isData = (e != null);
	// 死循环,由于没有加锁,因此需要大量的判断来防止并发问题
	for (;;) {
		// 获取头节点和尾节点
		QNode t = tail;
		QNode h = head;
		// 为了避免TransferQueue还没有初始化
		if (t == null || h == null)
			continue;                       // spin
		// 1.如果头尾节点相等,代表队列为空
		// 2.如果队列中有节点,同时尾节点和新插入的节点的类型相同,说明是插入队列
		// if中的逻辑是进到队列
		if (h == t || t.isData == isData) { // empty or same-mode
			// 拿到尾节点的next
			QNode tn = t.next;
			// 如果t不为尾节点,说明有并发问题,重新走一遍
			if (t != tail)
				continue;
			// 尾节点的next不为null,说明在这之前,有同一种角色插入进来
			if (tn != null) {               // lagging tail
				// 直接帮助那个并发线程,修改tail指向最后一个元素
				advanceTail(t, tn);
				// 重走一遍
				continue;
			}
			// 这里判断代表当前线程是否可以阻塞,如果不能阻塞,就直接返回null,不能添加,因为没有适配的消费者或生产者
			// 如果timed为true并且nanos <= 0,也就是不能阻塞的线程
			if (timed && nanos <= 0)        // can't wait
				return null;
			// 这里的s就是要封装的QNode
			// 做if判断,表示只需要new一次,因为是死循环
			if (s == null)
				// 构建QNode
				s = new QNode(e, isData);
			// 基于CAS操作,将tail节点的next设置为当前节点
			// 从null改为s
			if (!t.casNext(null, s))        // failed to link in
			// 如果修改失败,重新执行for循环
				continue;
			// 如果CAS操作成功,那就修改tail的指向,将tail指向tail的next
			advanceTail(t, s);              // swing tail and wait
			// 如果新节点进入到队列中,挂起线程,要么等生产者,要么等消费者
			// s是返回替换后的数据
			Object x = awaitFulfill(s, e, timed, nanos);
			// 如果元素和节点一致,说明节点取消了
			if (x == s) {                   // wait was cancelled
				// 清空当前节点,将上一个节点的next指向当前节点的next
				clean(t, s);
				// 节点取消,直接返回
				return null;
			}
			// 走这,说明当前节点没有取消
			// 判断当前节点是否还在队列中
			if (!s.isOffList()) {           // not already unlinked
				// 将当前节点设置尾head
				advanceHead(t, s);          // unlink if head
				// 如果队列中为生产者,新增为消费者
				if (x != null)              // and forget fields
				// 将当前节点的item设置为自己,当前节点已经没有意义了
					s.item = s;
					// 线程位设置为null
				s.waiter = null;
			}
			// 匹配好了,数据也交换了,直接返回
			// 如果x!=null,说明队列中是生产者,当前是消费者,这边直接返回x的具体数据
			// 反之队列中是消费者,当前是生产者,直接返回自己的额数据
			return (x != null) ? (E)x : e;
		// 匹配队列中的角色
		// 到这说明队列中有数据,且进来的数据和队列中的类型不同
		} else {                            // complementary-mode
			// 因为是queue,先进先出,所以获取头结点的next节点
			// 作为要匹配的节点
			QNode m = h.next;               // node to fulfill
			// 作并发判断,如果头节点、尾节点、头节点的next节点发生变化
			if (t != tail || m == null || h != head)
				// 重新走循环
				continue;                   // inconsistent read
			// 获取头节点的next节点的数据,开始做匹配
			Object x = m.item;
			// isData,true代表生产者,false代表消费者
			// 如果满足isData == (x != null),代表当前出现了并发问题,
			// 因为走到这一步,假如x有数据,代表队列是生产者,isData一定为false
			if (isData == (x != null) ||    // m already fulfilled
				// 如果排队的节点取消,就会出现x==m,就会将当前的QNode中的item指向QNode,也就是指向自己
			        x == m ||                   // m cancelled
			        // 如果前面两个条件都没满足,就可以开始交换数据
			        // 将head的next换成最新的插入的数据
			        // 数据交换的目的就是为了将生产者的数据传递给消费者
			        // 如果交换失败,说明有并发问题,重新设置head节点,并且再走一次循环
			        !m.casItem(x, e)) {         // lost CAS
				// 进来说明有并发问题,交换失败,需要重新设置head并重走循环
				advanceHead(h, m);          // dequeue and retry
				continue;
			}
			// 重新设置head
			advanceHead(h, m);              // successfully fulfilled
			// 唤醒head.next中的线程
			LockSupport.unpark(m.waiter);
			// 匹配好了,数据也交换了,直接返回
			// 如果x!=null,说明队列中是生产者,当前是消费者,这边直接返回x的具体数据
			// 反之队列中是消费者,当前是生产者,直接返回自己的额数据
			return (x != null) ? (E)x : e;
		}
	}
}