三:锁
一:锁的分类
可重入锁-不可重入锁
java中提供的synchronized、ReentrantLock、ReentrantReadWriteLock都是可重入锁
ThreadPoolExecutor中的Worker实现的是不可重入锁
重入:当前线程获取A锁,获取之后尝试再次获取A锁是可以直接拿到的
不可重入:当前线程获取A锁,获取之后尝试再次获取A锁是不可以直接拿到的,需要等待自己释放锁之后再次竞争锁才能得到
package com.xqm.juc.lock.one;
public class Test01 {
private static volatile Test01 test;
private Test01(){};
// DCL:double check lock双重检查锁
private static Test01 getInstance(){
if (test==null){
synchronized (Test01.class){
if (test ==null){
// 再次获取锁能获取直接获取到,这就是可重入锁
synchronized (Test01.class){
test=new Test01();
}
}
}
}
return test;
}
}
乐观锁-悲观锁
java中提供的synchronized、ReentrantLock、ReentrantReadWriteLock都是悲观锁
CAS操作使用的是乐观锁的实现
Atomic原子类中,底层就是使用CAS乐观锁实现的
乐观锁:乐观状态,每次都认为不会发生多线程并发问题,等待之后处理。获取不到锁资源,可以让CPU再次调度,重新获取锁资源。
悲观锁:悲观状态,认为每一次都会发生并发问题。当获取不到锁资源时,会将当前线程挂起,进入到(BLOCKED、WAITING)状态。线程挂起会涉及到用户态和内核态的切换,非常消耗资源。
用户态:JVM可以自行执行的指令,不需要借助操作系统执行
内核态:JVM不可以自行执行,需要操作系统才能执行
公平锁-非公平锁
java中提供的synchronized只能是非公平锁,ReentrantLock、ReentrantReadWriteLock默认是非公平锁,但是也可以实现公平锁。
公平锁:根据想要获取的时间进行排队来一个一个获取锁资源,先到先得。
非公平锁:一个线程想要获取锁,不管锁是否被占有,先尝试是否能够获取到锁。
-
拿到锁资源:插队成功
-
未拿到锁资源:进行排队,当之前线程释放锁之后,再次尝试获取锁
共享锁-互斥锁
java中提供的synchronized、ReentrantLock是互斥锁,ReentrantReadWriteLock中写锁是互斥锁,读锁是共享锁
共享锁:同一时间,可以有多个线程获取共享锁
互斥锁:同一时间,只能有一个线程获取到互斥锁
二:Synchronized
类锁、对象锁
synchronized使用方法:同步方法、同步代码块
synchronzied锁是基于对象实现的
同步方法:
- static:使用的是当前类.class作为锁(类锁)
- 非static:使用的是当前对象(也就是this)作为锁(对象锁)
package com.xqm.juc.lock.one;
public class Test02 {
public static void main(String[] args) {
// 锁的是整个Test类
Test.a();
Test b=new Test();
// 锁的是new出来的对象
b.b();
}
}
class Test{
/**
* 锁的是当前类,也就是Test.class
*/
public static synchronized void a(){
System.out.println("aaa");
}
/**
* 锁的是当前类对象,也就是this
*/
public synchronized void b(){
System.out.println("bbb");
}
}
synchronized的优化
JDK1.5之后,推出了ReentrantLock,LocK的性能高于synchronized。因此在JDK1.6中,JDK团队就对synchronized做出大量优化。
- 锁消除:在synchronized修饰的代码中,如果不存在操作临界资源的情况,也就是没有多线程的问题,会触发锁消除,即便写了synchronized,也不会执行
package com.xqm.juc.lock.one; public class Test03 { public synchronized void method(){ // 没有操作临界资源 此时这个synchronized可以认为是没有 } }
- 锁粗化/膨胀:如果在一个循环中,频繁的获取和释放锁资源,这样会带来很大的消耗。锁膨胀就是将锁的范围扩大,避免频繁的竞争和获取锁资源带来不必要的消耗。
public void method1(){ for (int i = 0; i < 10000; i++) { synchronized (对象){ // 这里的synchronized就会触发锁膨胀 } } }
- 锁升级:ReentrantLock的实现,是基于CAS获取锁资源,如果拿不到锁资源,才会挂起线程。而synchronized在JDK1.6之前,如果获取不到锁,则立即挂起线程。在JDK1.6之后,就会从无锁-偏向级锁-轻量级锁-重量级锁进行锁升级。
- 无锁、匿名偏向:当前对象没有锁存在
- 偏向锁:如果当前锁资源,只有一个线程在频繁的获取和释放锁资源,那么就判断偏向标志位指向的线程是否是当前的线程,如果是,就直接拿到锁资源,不需要竞争。如果当前线程不是偏向标志位指向的那个线程,那么就基于CAS的方式,尝试获取锁资源。如果获取不到,就会触发锁升级,升级为轻量级锁
- 轻量级锁:会采用自旋锁的方式去频繁的以CAS的形式获取锁资源。如果成功获取到锁,直接获取资源。如果获取不到锁资源,则进行锁升级,升级为重量级锁。自旋锁采用的是自适应自旋锁(JVM会根据上次自旋的次数来决定)
- 重量级锁:最传统的方式,如果拿不到锁资源,就会挂起线程,出现用户态和内核态的切换
synchronized的实现原理
synchronized是基于对象实现的。
对象组成:
- 对象头:包括了关于堆对象的布局、类型、GC状态、同步状态和标识哈希码的基本信息。Java对象和vm内部对象都有一个共同的对象头格式。
- 实例数据:主要是存放类的数据信息,父类的信息,对象字段属性信息。
- 对齐填充:为了字节对齐,填充的数据。
对象
- 对象头:mark word、class point(类型指针)、数组长度
32位mark word
64位mark word
synchronized的锁升级
为了在程序中看到对象头的信息,需要导入依赖。
<!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>
Object对象
package com.xqm.juc.lock.one;
import org.openjdk.jol.info.ClassLayout;
public class Test04_Object {
public static void main(String[] args) {
Object o=new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
synchronized (o){
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
}
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) e8 f0 c7 02 (11101000 11110000 11000111 00000010) (46657768)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
第一个例子:
第一行:0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
其中的00000001的最后三位001代表不是偏向锁(0),现在是无锁状态(01)
第二个例子:
第一行:0 4 (object header) e8 f0 c7 02 (11101000 11110000 11000111 00000010) (46657768)
其中的11101000最后两位00代表轻量级锁。(从无锁一下子到轻量级锁?)
原因如下:
锁默认情况下,开启了偏向锁延迟。
偏向锁采用CAS来获取锁
偏向锁升级为轻量级锁时,会涉及到偏向锁撤销,需要等到一个安全点,才可以撤销。在明知道有并发情况下,可以选择不开启偏向锁,或者设置偏向锁延迟开启。
因为JVM在启动时,需要加载大量的class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销操作,JVM启动初期,有个延迟4s启动偏向锁的操作。
如果正常开启了偏向锁,那么不会出现无锁状态,对象会编程匿名偏向。
这里是因为开启了偏向锁延迟开启,因此直接从无锁到轻量级锁。
因此,如果在代码前面sleep(4000);的话,那么直接开启了偏向锁,也就是101。
package com.xqm.juc.lock.one;
import org.openjdk.jol.info.ClassLayout;
public class Test04_Object {
public static void main(String[] args) throws InterruptedException {
Thread.sleep(4000);
Object o=new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
synchronized (o){
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
}
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 05 00 00 00 (00000101 00000000 00000000 00000000) (5)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 05 60 e1 02 (00000101 01100000 11100001 00000010) (48324613)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
轻量级锁->重量级锁(这里可能是偏向锁、轻量级锁、重量级锁,取决于cpu运行的不同情况)
package com.xqm.juc.lock.one;
import org.openjdk.jol.info.ClassLayout;
public class Test04_Object {
public static void main(String[] args) throws InterruptedException {
Thread.sleep(5000);
Object o=new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
new Thread(
()->{
synchronized (o){
System.out.println("thread:"+ClassLayout.parseInstance(o).toPrintable());
}
}
).start();
synchronized (o){
System.out.println("main:"+ClassLayout.parseInstance(o).toPrintable());
}
}
}
java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 05 00 00 00 (00000101 00000000 00000000 00000000) (5)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
main:java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 05 60 55 03 (00000101 01100000 01010101 00000011) (55926789)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
thread:java.lang.Object object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) da 2d 26 1d (11011010 00101101 00100110 00011101) (489041370)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
状态:
Lock Record以及ObjectMonitor存储的内容
Lock Record是在栈中
重量级锁底层ObjectMonitor
需要找到openJDK。
网址:https://openjdk.org/ 或者 http://hg.openjdk.java.net/
找到ObjectMonitor的两个文件,hpp,cpp。
目录在:directory /src/share/vm/runtime/
ObjectMonitor() {
_header = NULL; // 存储着mark word
_count = 0; // 存储竞争锁的线程个数
_waiters = 0, // 现在wait线程的个数
_recursions = 0; // 标志synchronized锁重入的次数
_object = NULL;
_owner = NULL; // 持有锁的线程
_WaitSet = NULL; // 保存wait的线程信息,是双向链表
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ; // 获取锁竞争资源失败,线程要放到当前的链表中
FreeNext = NULL ;
_EntryList = NULL ; // _cxq以及被唤醒的waitSet中的线程,在一定机制下,放入到entrySet中
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
tryLock函数
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
// 拿到持有锁的线程
void * own = _owner ;
// 如果有线程持有锁,直接返回
if (own != NULL) return 0 ;
// 说明没有线程持有锁,cmpxchg指令就是底层的CAS实现
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
// 成功获取锁资源
return 1 ;
}
// 这里重试操作没什么意义,表示没拿到锁,直接返回-1
if (true) return -1 ;
}
}
try_enty
bool ObjectMonitor::try_enter(Thread* THREAD) {
// 当前线程不是持有锁的线程,判断owner是不是当前线程
if (THREAD != _owner) {
// 判断当前持有锁的线程是否是当前线程,也就是拿到锁的线程不是owner,说明轻量级锁刚刚升级为重量级锁,还没转换,因为轻量级锁是在栈中
if (THREAD->is_lock_owned ((address)_owner)) {
_owner = THREAD ;
_recursions = 1 ;
OwnerIsThread = 1 ;
return true;
}
// 如果既不是owner,也不是持有锁的线程,那么就CAS操作,尝试获取锁
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
// 没拿到锁,告辞
return false;
}
// 拿到锁
return true;
} else {
// 是当前线程,说明是锁重入操作
_recursions++;
return true;
}
}
entry:想方设法拿到锁资源,如果没拿到,挂起放到_cxq单向链表中
void ATTR ObjectMonitor::enter(TRAPS) {
// 拿到当前线程
Thread * const Self = THREAD ;
void * cur ;
// CAS
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
// 拿到锁资源
return ;
}
// 锁重入操作
if (cur == Self) {
_recursions ++ ;
return ;
}
// 轻量级锁过来,升级为重量级锁
if (Self->is_lock_owned ((address)cur)) {
_recursions = 1 ;
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
Self->_Stalled = intptr_t(this) ;
if (Knob_SpinEarly && TrySpin (Self) > 0) {
Self->_Stalled = 0 ;
return ;
}
JavaThread * jt = (JavaThread *) Self ;
// 没拿到锁资源,count++
Atomic::inc_ptr(&_count);
JFR_ONLY(JfrConditionalFlushWithStacktrace<EventJavaMonitorEnter> flush(jt);)
EventJavaMonitorEnter event;
if (event.should_commit()) {
event.set_monitorClass(((oop)this->object())->klass());
event.set_address((uintptr_t)(this->object_addr()));
}
{
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
Self->set_current_pending_monitor(this);
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
}
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
for (;;) {
jt->set_suspend_equivalent();
// 入队操作,进入到cxq中
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
//count--,表示开始去拿锁资源
Atomic::dec_ptr(&_count);
Self->_Stalled = 0 ;
DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_entered()) {
JvmtiExport::post_monitor_contended_entered(jt, this);
}
if (event.should_commit()) {
event.set_previousOwner((uintptr_t)_previous_owner_tid);
event.commit();
}
if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
}
}
EnterI
ObjectWaiter * nxt ;
// 循环入队,入队之前还得tryLock获取锁资源
for (;;) {
node._next = nxt = _cxq ;
// CAS的方式入队
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
三:ReentrantLock
ReentrantLock和synchronized的区别
- synchronized是java关键字,ReentrantLock是java.util.concurrent包下的类。都是在JVM层面实现互斥锁。
- synchronized是非公平锁,ReentrantLock可以是非公平锁也可以是公平锁
- synchronized加在同步方法和同步代码块上,ReentrantLock在try-catch-finally中使用,并且在finally中使用unlock释放
- synchronized存在锁升级,升级到重量级锁就不能进行锁降级。ReentrantLock不存在锁升级概念
- synchronized是基于不同锁级别实现的,比如重量级锁是ObjectMonitor,轻量级锁是自旋锁,偏向锁是偏向指针指向线程ID。ReentrantLock是基于AQS实现的。
- ReentrantLock支持等待可释放锁。可以添加多个条件Condition
AQS概述
AQS
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {.....}
AQS实际上就是JUC下的一个基类。JUC下很多内容都是基于AQS实现部分内容,比如ReentrantLock、ThreadPoolExecutor、阻塞队列、CountdownLatch、Semaphore、CyclicBarrier等等
首先AQS提供了一个由volatile修饰的,采用CAS修改int类型的state变量
private volatile int state;
其次AQS中维护了一个双向链表,每个节点都是Node对象,有prev和next
static final class Node {
// 共享锁
static final Node SHARED = new Node();
// 互斥锁
static final Node EXCLUSIVE = null;
// 节点状态,还有一个是0,代表初始化状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 描述节点的状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中Node节点:
加锁流程源码分析
加锁流程概述
非公平锁流程
加锁流程源码分析
一共有三个方法:
- lock
- tryLock
- lockInterruptibly
Lock()
java测试类代码
package com.xqm.juc.lock.reentrantLock;
import java.util.concurrent.locks.ReentrantLock;
public class Test01 {
public static void main(String[] args) {
// 默认是非公平锁
ReentrantLock noFairLock=new ReentrantLock();
// 有参构造,设置为true,就是公平锁
ReentrantLock fairLock=new ReentrantLock(true);
noFairLock.lock();
try {
System.out.println("业务代码");
} catch (Exception e) {
e.printStackTrace();
} finally {
noFairLock.unlock();
}
}
}
lock有公平锁和非公平锁区分。
// 上锁
public void lock() {
sync.lock();
}
// abstract static class Sync extends AbstractQueuedSynchronizer {}
// setExclusiveOwnerThread是AbstractQueuedSynchronizer类中的方法
// acquire也是AbstractQueuedSynchronizer类中的方法
// 非公平锁中sync.lock()
final void lock() {
// 上来就先基于CAS的方式,尝试将state从0改为1
if (compareAndSetState(0, 1))
// 获取锁资源成功,会将当前线程设置到setExclusiveOwnerThread属性,代表是当前线程持有着锁资源
setExclusiveOwnerThread(Thread.currentThread());
else
// 执行acquire,尝试获取锁资源
acquire(1);
}
// 公平锁中sync.lock()
final void lock() {
// 执行acquire,尝试获取锁资源
acquire(1);
}
// AQS中setExclusiveOwnerThread方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
acquire方法:是公平锁和非公平锁的逻辑
// AQS中acquire()
public final void acquire(int arg) {
// tryAcquire方法,再次查看,当前线程是否可以获取锁资源
// 如果拿到锁资源,返回的是true,加上!,那么就是false,也就是直接不执行下面的语句了
if (!tryAcquire(arg) &&
// 执行这条语句代表没有拿到锁资源,Node.EXCLUSIVE代表互斥锁
// addWaiter(Node.EXCLUSIVE):将当前线程封装为Node节点,插入到AQS的双向链表的结尾
// acquireQueued:查看我是否是第一个排队的节点,如果是,那就再次尝试获取锁资源,并且如果长时间拿不到,就
// 挂起线程,如果不是第一个排队的节点,那就尝试挂起线程
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 当前线程中断
selfInterrupt();
}
tryAcquire:分为公平锁和非公平锁
// 非公平锁中的tryAcquire
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state属性
int c = getState();
// 判断state是否为0,如果为0代表之前持有锁的线程释放了锁资源
if (c == 0) {
// 再次抢夺锁资源
if (compareAndSetState(0, acquires)) {
// 抢到锁资源就把state从0改为1
setExclusiveOwnerThread(current);
// 拿锁成功返回
return true;
}
// 不是0,说明线程持有着锁,持有锁的线程如果是当前线程,则是锁重入操作
} else if (current == getExclusiveOwnerThread()) {
// 将state加1
int nextc = c + acquires;
// 如果加1还是小于0 ,就是超出了int正数的最大范围
// int 最大范围是 01111111 11111111 11111111 11111111
// 如果加1之后 10000000 00000000 00000000 00000000 就是负数
// 说明重入次数超出界限
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 正常的将计算结果赋值给state
setState(nextc);
// 锁重入成功
return true;
}
// 如果state=1,并且拿锁的线程也不是当前线程,直接返回
return false;
}
// 公平锁的tryAcquire
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state属性
int c = getState();
// 判断state是否为0,如果为0代表之前持有锁的线程释放了锁资源
if (c == 0) {
// hasQueuedPredecessors:查看AQS中是否有排队的Node,如果返回false,那么就可以抢占锁资源
// 两种情况返回false可以抢占资源:一种队列无节点,第二种当前线程是第一个节点
if (!hasQueuedPredecessors() &&
// 抢占锁资源
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
// 锁重入操作
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 查看AQS中是否有排队的Node,如果返回false,就可以去抢锁了
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 如果头尾相等返回false,证明没有Node排队
// 头节点的next节点不为null,并且s节点的线程为当前线程,那么就返回false,也就是可以抢占锁资源
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
addWaiter():将当前没有拿到锁资源的线程,封装为Node节点,放入AQS的双向链表中去排队
// addWaiter():将当前没有拿到锁资源的线程,封装为Node节点,放入AQS的双向链表中
// 这里的mode代表互斥锁
private Node addWaiter(Node mode) {
// 将当前线程封装成Node
Node node = new Node(Thread.currentThread(), mode);
// 拿到尾节点
Node pred = tail;
// 如果尾节点不为null(只有链表中没有节点的时候,尾节点才会指向null)
if (pred != null) {
// 当前节点的prev指向尾节点
node.prev = pred;
// 使用CAS,保证线程安全,将当前节点设置为tail
if (compareAndSetTail(pred, node)) {
// 前一个节点的next指向当前node
pred.next = node;
return node;
}
}
// 如果CAS失败,以死循环的方式,保证当前线程的Node一定可以放到AQS队列的末尾
enq(node);
return node;
}
// enq():死循环,知道保证node放到链表末尾为止
private Node enq(final Node node) {
for (;;) {
// 拿到尾节点
Node t = tail;
// 如果尾节点为null,AQS中一个节点也没有
if (t == null) { // Must initialize
// new一个Node,也就是构建一个伪节点,设置为头
if (compareAndSetHead(new Node()))
// 尾指向头
tail = head;
} else {
// 以CAS的方式,在AQS中有节点之后,插入到AQS队列中的末尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued():判断当前线程是否还能再次获取锁资源,如果不能获取资源,就尝试将当前线程挂起
// acquireQueued():当前没有拿到锁资源,并且到AQS排队之后触发的方法
// 这里不需要考虑中断,lock涉及不到,但是tryLock和lockInterruptibly才会涉及到中断
// node是队列中节点,arg就是1
final boolean acquireQueued(final Node node, int arg) {
// 获取锁资源是否失败,默认是失败true,如果获取锁成功,那就是false
// 真正触发的还是tryLock和lockInterruptibly
boolean failed = true;
try {
boolean interrupted = false;
// 死循环
for (;;) {
// node.predecessor():获取当前节点的上一个节点
final Node p = node.predecessor();
// 前继节点是否是head,如果是head,再次执行tryAcquire尝试获取锁资源
if (p == head && tryAcquire(arg)) {
// 获取锁资源成功,将当前节点设置为头节点,也就是这个节点在链表中没意义了
setHead(node);
// p节点也就是当前节点的前继节点,将next设置为null,也就是帮助gc更快回收
p.next = null; // help GC
// 代表获取锁资源成功了
failed = false;
return interrupted;
}
// 没拿到锁资源,是否可以挂起当前线程,传递前继节点和当前节点
// 基于上一个节点的状态来判断当前节点是否可以挂起,如果上一个节点状态为-1.代表可以挂起,返回true
// parkAndCheckInterrupt:挂起线程,running--> wait状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取资源成功了,就不会执行下面的语句
if (failed)
// 在lock方法中,基本不会执行
cancelAcquire(node);
}
}
// 获取当前节点的上一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 获取到锁资源,则节点设置为头节点,其他属性设置为null
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 当前node没有拿到锁资源或者没有资格去竞争锁资源,check是否挂起当前线程
// 挂起之前要查看前继节点的状态,看是否之后能唤醒当前节点,如果不能唤醒,就不能挂起,否则造成这个节点死掉
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点状态
int ws = pred.waitStatus;
// SIGNAL状态:值为-1,表示当前节点的后继节点可以挂起线程,后续会唤醒后继节点
if (ws == Node.SIGNAL)
// 只有当上一个节点的状态为-1,当前节点的线程才能挂起
return true;
// CANCELLED:值为1,代表当前节点已经被取消了,不能挂起后继节点的线程
if (ws > 0) {
// 如果当前节点是取消状态,那么需要往前找到一个状态不为1的节点
// 找到状态不为1的节点,设置prev和next的关系
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 如果状态不是1,不是-1,那么就剩0,-2,-3
// 0代表刚初始化的节点,-2是涉及到condition的状态,-3是涉及共享锁的操作
} else {
// 上一个节点的状态不是-1或者1,那就代表节点状态正常,那么就将上一个节点状态设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 挂起线程
private final boolean parkAndCheckInterrupt() {
// 挂起线程
LockSupport.park(this);
return Thread.interrupted();
}
tryLock()
无参:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
和lock()中tryAcquire()一模一样
// tryLock无论是公平锁还是非公平锁,都会走非公平锁抢占锁资源的操作
// 就是拿到state的值,如果是0 ,直接CAS尝试获取锁,否则就是锁重入操作
// 如果没抢到锁资源,或者不是锁重入,那么直接返回false
public boolean tryLock() {
// 非公平锁的竞争锁操作
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
有参:输入时间和单位。在时间内拿到资源,那就执行业务,否则就做一些其他处理
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
第一波分析
// tryLock有参构造函数
public boolean tryLock(long timeout, TimeUnit unit)
// 如果被中断,会抛出异常
throws InterruptedException {
// 无论输入什么单位,都会转成纳秒
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// tryLock(time,unit)执行的方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判断线程的中断标记位,是不是从false被改为true,如果是,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire分为公平和非公平两种执行方式。和lock()方法一样,参考上面
// tryAcquire:拿锁,如果拿锁成功,直接返回
return tryAcquire(arg) ||
// 如果拿锁失败,等待指定时间
doAcquireNanos(arg, nanosTimeout);
}
// 拿锁失败,等待指定时间
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果时间小于等于0,那么直接返回false
if (nanosTimeout <= 0L)
return false;
// 设置结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// addWaiter,扔到AQS队列中
final Node node = addWaiter(Node.EXCLUSIVE);
// 拿锁失败,默认是true
boolean failed = true;
try {
// 死循环
for (;;) {
// AQS中,如果当前node是head的next,直接抢锁资源,抢到则返回
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算剩余的时间
nanosTimeout = deadline - System.nanoTime();
// 判断时间是否用尽
if (nanosTimeout <= 0L)
return false;
// shouldParkAfterFailedAcquire:根据上一个节点状态看是否能挂起线程
// spinForTimeoutThreshold:1000L
if (shouldParkAfterFailedAcquire(p, node) &&
// 避免时间太少,就不用挂起线程
nanosTimeout > spinForTimeoutThreshold)
// 如果时间足够,那就线程挂起剩余的时间,不需要别人唤醒,时间到了自己唤醒
LockSupport.parkNanos(this, nanosTimeout);
// 线程被唤醒,一种是时间到了,另一种是中断标记位改变
// 中断唤醒就抛出异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 除非线程拿到锁,否则就执行
if (failed)
cancelAcquire(node);
}
}
取消在AQS中排队:cancleAcquire()
取消节点整体操作流程:
1.线程设置为null
2.往前找到有效节点作为当前节点的prev
3.将waitState设置为1,代表取消
4.脱离整个AQS队列三种情况
(4.1).当前node是tail
(4.2).当前ndoe是head的后继节点
(4.3).不是tail节点,也不是head的后继节点
// 取消在AQS中排队
// 取消节点整体操作流程:
// 1.线程设置为null
// 2.往前找到有效节点作为当前节点的prev
// 3.将waitState设置为1,代表取消
// 4.脱离整个AQS队列
// (4.1).当前node是tail
// (4.2).当前ndoe是head的后继节点
// (4.3).不是tail节点,也不是head的后继节点
private void cancelAcquire(Node node) {
// 如果当前node为null,直接忽略
if (node == null)
return;
// 把当前node的线程设置为null
node.thread = null;
// 往前找状态不为1的节点
Node pred = node.prev;
// waitStatus>0,其实就是1,为取消状态,那么就一直往前找
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 拿到上一个节点的next,不一定是当前节点,因为之前一直往前找过
Node predNext = pred.next;
// 当前节点状态设置为1,就是取消状态
node.waitStatus = Node.CANCELLED;
// 下面则是脱离AQS的操作
// 如果当前节点是尾节点,那么使用CAS操作,将当前节点的前继节点设置为tail尾节点
if (node == tail && compareAndSetTail(node, pred)) {
// 将pred节点的next指向null
compareAndSetNext(pred, predNext, null);
} else {
// 不是尾节点或者CAS操作失败
int ws;
// 如果前置节点不是head的后继节点
if (pred != head &&
// 前继节点的状态是否为-1,是的话代表后继节点是挂起的状态.
// 如果不是-1并且小于等于0,也就是上一个节点的状态不是取消状态,那么就改为-1
((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
// 前置节点的线程不为null,避免并发导致节点为head节点或取消节点,线程才会为null
&& pred.thread != null) {
// 下面一段是为了保证前继节点是有效节点,能够唤醒后面的节点
// 获取当前节点的下一个节点
Node next = node.next;
// 如果下一个节点不为null并且是有效节点
if (next != null && next.waitStatus <= 0)
// 把前一个节点的next从当前节点指向当前节点的下一个节点
compareAndSetNext(pred, predNext, next);
} else {
// 当前节点是head的后继节点,那么唤醒后面的节点
unparkSuccessor(node);
}
// 把我自己的next指向我自己,为了方便GC回收
node.next = node; // help GC
}
}
lockInterruptibly()
和tryLock()不同,tryLock()会等待指定的时间,直到时间到或者被中断才会结束,但是lockInterruptibly()则是一直等待,没有时间限制,直到被中断为止或者拿到锁资源。
lock.lockInterruptibly();
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// lock.lockInterruptibly()函数
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 和tryLock唯一不同
doAcquireInterruptibly(arg);
}
// 和tryLock唯一区别在于这个函数
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// addWaiter,扔到AQS队列中
final Node node = addWaiter(Node.EXCLUSIVE);
// 抢夺资源失败,默认为true
boolean failed = true;
try {
for (;;) {
// p是当前节点的前继节点
final Node p = node.predecessor();
// AQS中,如果当前node是head的next,直接抢锁资源,抢到则返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// // shouldParkAfterFailedAcquire:根据上一个节点状态看是否能挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 检查中断异常,返回true的话,代表要被唤醒
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 从队列中取消节点,见tryLock()
cancelAcquire(node);
}
}
// 检查中断异常
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 如果返回true,代表要从挂起状态要被唤醒
return Thread.interrupted();
}
释放锁流程分析
释放锁流程概述
释放锁流程源码分析
public void unlock() {
// 无论是公平锁还是非公平锁,走的都是tryRelease()
sync.release(1);
}
// 释放锁的核心流程
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 如果锁释放干净,走这个流程
Node h = head;
// head不为null,也就是head后面有排队的Node,并且node状态不为new,也就是waitStatus!=0
// 其实也就是head的waitStatus为-1,后面有挂起的排队线程
if (h != null && h.waitStatus != 0)
// 唤醒排队线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 拿到state-1的值,但是并没有赋值给state
int c = getState() - releases;
// 如果当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// free,代表当前锁是否释放干净,因为有重入操作,而每次state只减1,可能state不等于0,也就是没释放干净锁
boolean free = false;
// 如果是0 ,代表释放干净
if (c == 0) {
free = true;
// 将持有锁的线程置为null
setExclusiveOwnerThread(null);
}
// 将state设为c,也就是state-1
setState(c);
// 如果锁资源释放干净,返回true,否则返回false
return free;
}
// 传递的是head,头节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 头节点的状态为-1.
if (ws < 0)
// 基于CAS,将头节点的状态从-1改为0
compareAndSetWaitStatus(node, ws, 0);
// 拿到头节点的后续节点
Node s = node.next;
// 如果后续节点为null,或者后续节点为1,代表节点取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找到有效节点,也就是waitStatus<=0
// 之所以不从前往后找,是为了防止,中间某个节点的thread为null,这样就找不到下一个节点了
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 有效线程赋值给s
s = t;
}
// 找到需要被唤醒的节点,唤醒节点
if (s != null)
LockSupport.unpark(s.thread);
}
AQS常见问题
1.AQS中为什么要虚拟一个head节点
Node节点中有个waitStatus状态,其中-1代表当前的节点的后继节点是挂起状态,需要被唤醒。
AQS中提供了ReentrantLock的基本实现,而在释放锁资源的时候,需要考虑是否执行unparkSuccessor方法,去唤醒后继节点。
如果当前节点的后继节点的线程挂起,那么就将当前节点的状态设置为-1,-1状态的出现是为了避免重复唤醒或者重复释放资源的过程。
因为AQS中排队Node中的线程如果挂起了,是无法主动唤醒的,需要释放锁或者资源的节点的线程去唤醒。
唤醒节点需要从整个AQS双向链表中找到离head最近的有效节点去唤醒。这种操作可能需要从后往前去整个遍历双向链表。
为了避免不必要的遍历双向链表操作,提供了一个-1的状态,如果只有一个Node节点到AQS链表中,所以发现如果是第一个Node进来排队,就需要虚拟一个Node节点作为头,head里面的waitStatus来监听后面节点的状态,是否有需要挂起的线程
2.AQS中为什么要使用双向链表而不是单向链表
因为AQS中存放没有获取到资源的Node,在方法lockInterruptibly中,在竞争锁资源的时候,是可以被中断的,中断后调用cancelAcquire()方法从双向链表中取消节点,将节点的状态置为1,从AQS队列中移除该节点。如果采用单向链表,就需要遍历整个链表,将前继节点指向当前节点的后继节点,这样成本高。其次在唤醒后继节点时,如果是单向链表,也需要遍历整个链表。
四:ReentrantReadWriteLock
为什么出现读写锁
synchronized和ReentrantLock都是互斥锁。
如果说有一种情况,读操作远大于写操作,如果全部使用互斥锁,效率太低。
读和读之间是不互斥的,因此可以并发执行。写和写之间是互斥的,因此需要互斥锁。
读写之间也是互斥的,如果读锁不释放,写操作是拿不到锁的。
读锁:结果是子线程先拿到锁,1s后主线程也拿到锁,5s后释放。
package com.xqm.juc.lock.reentrantLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test02 {
static ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
static ReentrantReadWriteLock.WriteLock writeLock=lock.writeLock();
static ReentrantReadWriteLock.ReadLock readLock=lock.readLock();
public static void main(String[] args) {
new Thread(()->{
readLock.lock();
try {
System.out.println("子线程");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
readLock.unlock();
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
readLock.lock();
try {
System.out.println("主线程");
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
}
写锁:子线程先拿到写锁,等5s之后释放了锁,主线程才能拿到读锁。
package com.xqm.juc.lock.reentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test03 {
static ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
static ReentrantReadWriteLock.WriteLock writeLock=lock.writeLock();
static ReentrantReadWriteLock.ReadLock readLock=lock.readLock();
public static void main(String[] args) {
new Thread(()->{
writeLock.lock();
try {
System.out.println("子线程");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
writeLock.unlock();
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
writeLock.lock();
try {
System.out.println("主线程");
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
}
读写锁实现原理
ReentrantReadWriteLock还是基于AQS实现的,还是对state操作,拿到锁资源就去执行业务,没有拿到就去AQS的队列中排队。
读锁:基于state的高16位进行操作
写锁:基于state的低16位进行操作
ReentrantReadWriteLock也是可重入锁。
读锁重入:因为读锁是共享锁,在获取锁时,对高16位进行加1的操作。因为是共享锁,因此同一时间会有多个线程对state进行操作。这样无法确认每个线程重入的次数,为了记录每个线程锁重入的次数,在读操作线程中有ThreadLocal去记录各自的锁重入的次数
写锁重入:写锁的重入方式和ReentrantLock重入一样,都是对state进行加1的操作,只不过state最大值的范围
变小了
写锁的饥饿问题:当有读操作首先拿到锁时,其他读操作到来,可以拿到读锁,直接修改state即可。但是突然来了一个写操作,因为读锁和写锁时互斥的,写操作一直得不到锁,之后可能一直有读操作进来,导致写操作长时间无法获取锁,因此写操作产生饥饿问题。
读线程拿到锁资源后,如果再有读线程需要获取锁资源,需要去AQS队列中排队。如果队列中有写线程,那么写线程后的读线程是拿不到锁资源。比如读-读-写-读-读,那么写后面的读是拿不到锁的。
写锁分析
写锁加锁流程
写锁加锁源码分析
// 写锁入口
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// tryAcquire和ReentrantLock不一样,只有一个实现方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 读写锁的写锁的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 得到state低16位的值
int w = exclusiveCount(c);
// 判断线程是否持有锁资源,如果有线程持有锁资源,进入if
if (c != 0) {
// w==0 当前没有线程持有写锁,代表有线程持有读锁,因为读写互斥,所以直接返回false
// current != getExclusiveOwnerThread() 线程持有锁是否是当前线程,不是也返回,代表不是重入锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 到这代表当前线程持有写锁。锁重入操作,加1之后看超没超过00000000 00000000 11111111 11111111
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 没有超过锁重入次数,那么设置state+1
setState(c + acquires);
return true;
}
// 如果没有线程持有读锁和写锁,那么尝试获取锁资源
// writerShouldBlock()将会区分公平锁和非公平锁
if (writerShouldBlock() ||
// CAS尝试拿锁
!compareAndSetState(c, c + acquires))
return false;
// 拿锁成功
setExclusiveOwnerThread(current);
return true;
}
//
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// MAX_COUNT也是 00000000 00000000 11111111 11111111
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 1往左移16位减1
// 00000000 00000000 00000000 00000001
// 00000000 00000001 00000000 00000000 左移16位
// 00000000 00000000 11111111 11111111 减1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 写锁的tryAcquire中方法,将state的低16位的值拿到
// c & EXCLUSIVE_MASK(00000000 00000000 11111111 11111111)
// 与运算,都为1才为1,否则为0
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// writerShouldBlock():非公平,直接返回false
final boolean writerShouldBlock() {
return false; // writers can always barge
}
// writerShouldBlock():公平,
// 查看AQS队列中是否有node,如果没有排队,返回false,如果有排队,看是否是head的next,如果是,返回false,否则返回true
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
// 查看AQS中是否有排队的Node,如果返回false,就可以去抢锁了
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 如果头尾相等返回false,证明没有Node排队
// 头节点的next节点不为null,并且s节点的线程为当前线程,那么就返回false,也就是可以抢占锁资源
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
写锁释放锁流程和源码分析
流程:释放的流程和ReentrantLock一致,只是在判断释放是否干净时,判断低16位的值
// 写锁释放锁的入口
public void unlock() {
sync.release(1);
}
// 只有tryRelease和reentrantLock不同
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 如果释放干净,查看队列,修改状态为-1,查看状态对不对,不对就从结尾遍历到离head最近的Node,然后唤醒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 判断当前线程是否是持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//获取state-1
int nextc = getState() - releases;
// 判断低16位是否为0,如果为0 ,free为true,代表释放干净
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 将持有锁的线程设置为null
setExclusiveOwnerThread(null);
// 设置state
setState(nextc);
// 释放干净,返回true。写锁是有重入的,这里需要返回false,不去释放排队的Node
return free;
}
// // 判断当前线程是否是持有锁的线程
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
读锁分析
读锁加锁流程
判断是否是写锁还是读锁。如果当前线程占有着写锁,同时读操作也来抢占读锁,是可以直接获取到锁资源的。
读锁加锁源码分析1
第一部分分析:
// 读锁入口
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// 竞争锁资源,返回-1代表拿锁失败
if (tryAcquireShared(arg) < 0)
// 没拿到锁资源去排队
doAcquireShared(arg);
}
// 读锁竞争锁资源的操作
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 先拿到低16位,判断是否不等于0 ,返回true代表有写锁占用锁资源
// 那么就再次判断拿到写锁的是否是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 写锁被其他线程占用,拿不到锁,返回-1,去排队
return -1;
// 到这一步有两种情况,一种是没有线程持有写锁,一种是当前线程持有写锁
// 获取读锁的信息,获取state高16位
int r = sharedCount(c);
// 这里分为公平锁和非公平锁,返回false则继续往下走
// 公平锁:查看队列中是否有排队情况,有返回true,直接return,没有则返回false
// 非公平锁:没有排队的,直接抢,有排队的,
// 如果是读锁,如果出现这个情况,大部分是写锁刚刚释放资源,后续node还没来得及拿到锁,那么就抢占资源
if (!readerShouldBlock() &&
// 判断持有读锁的数量是否达到临界值
r < MAX_COUNT &&
// SHARED_UNIT:值为1,也就是state+1
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁的拿锁和可重入操作,暂时不看
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
// 获取高16位之后的公平锁操作
final boolean readerShouldBlock() {
// 看是否有排队以及是否是head的next节点
return hasQueuedPredecessors();
}
// 获取高16位之后的非公平锁操作
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 只要有一个是false,那就返回false,也就是可以去拿锁
return (h = head) != null && // head为null,可以去抢占锁
(s = h.next) != null && // head.next,也可以去抢占锁
!s.isShared() && // head.next是共享锁,可以去抢占锁
s.thread != null; // head.next的thread为null,可以去抢占锁资源
}
// 判断node是否是共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
读锁重入流程
读锁为了记录重入的次数,需要让每个线程用ThreadLocal去记录重入次数,使用map去存储代价太高。
ReentrantReadWriteLock对读锁重入做了一些优化:
1.第一个拿到读锁资源的线程,不需要使用ThreadLocal存储,内部提供两个属性来记录第一个拿到读锁资源的线程(tid:线程id,count:重入次数)
2.ReentrantReadWriteLock在内部对ThreadLocal做了一个封装,基于HoldCounter的对象存储重入次数,由于是ThreadLocal,所以直接count++没有线程安全的问题
3.内部提供了firstReader记录第一个拿到锁资源的线程,firstReaderHoldCount记录第一个拿到锁资源重入的次数
4.cachedHoldCounter记录最后拿到锁资源线程的重入次数
5.readHolds记录当前线程的信息
读锁重入流程:
1.判断当前线程是否是第一个拿到锁资源的线程,也就是state的高16位是否为0,如果是,那么设置firstReader和firstReaderHoldCount设置为当前线程的信息
2.判断当前线程是否是firstReader,如果是,那么直接对firstReaderHoldCount进行++操作
3.到这一步说明不是第一个拿到锁的线程。获取cachedHoldCounter,也就是最后一个拿到锁资源线程的重入次数,判断是否是当前线程
3.1 如果不是,那么将cachedHoldCounter设置为当前线程的重入次数(因为当前线程是最后一个获取锁的),如果是最后一个拿到锁线程的话,那么判断当前线程重入次数是否为0
3.2 如果是的话,就需要重新设置当前的锁重入信息到readHolds中(ThreadLocal中),算是初始化操作,重入的次数是0
3.3对HoldCounter的count进行++操作
使用HoldCounter来存储线程重入的次数
static final class HoldCounter {
// 重入次数
int count = 0;
// 线程id
final long tid = getThreadId(Thread.currentThread());
}
// 封装ThreadLocal
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 记录第一个线程
private transient Thread firstReader = null;
// 记录第一个线程重入的次数
private transient int firstReaderHoldCount;
// 记录最后一个拿到锁线程的重入次数
private transient HoldCounter cachedHoldCounter;
// 记录当前线程的信息
private transient ThreadLocalHoldCounter readHolds;
读锁重入源码
// 读锁竞争锁资源的操作
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 先拿到低16位,判断是否不等于0 ,返回true代表有写锁占用锁资源
// 那么就再次判断拿到写锁的是否是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 写锁被其他线程占用,拿不到锁,返回-1,去排队
return -1;
// 到这一步有两种情况,一种是没有线程持有写锁,一种是当前线程持有写锁
// 获取读锁的信息,获取state高16位
int r = sharedCount(c);
// 这里分为公平锁和非公平锁,返回false则继续往下走
// 公平锁:查看队列中是否有排队情况,有返回true,直接return,没有则返回false
// 非公平锁:没有排队的,直接抢,有排队的,
// 如果是读锁,如果出现这个情况,大部分是写锁刚刚释放资源,后续node还没来得及拿到锁,那么就抢占资源
if (!readerShouldBlock() &&
// 判断持有读锁的数量是否达到临界值
r < MAX_COUNT &&
// SHARED_UNIT:值为1,也就是state+1
compareAndSetState(c, c + SHARED_UNIT)) {
// ---------------------------读锁重入操作-----------------------------------------
// 获取state高16位值,如果是0 ,说明是第一个拿到锁资源的线程
if (r == 0) {
// 第一个线程赋值给firstReader
firstReader = current;
// 重入的次数为1
firstReaderHoldCount = 1;
// 判断当前线程是否是第一个线程
} else if (firstReader == current) {
// 如果是的话,说明是重入操作,直接++
firstReaderHoldCount++;
// 到这说明不是第一个获取锁资源的线程
} else {
// 获取之前最后一个拿到锁资源线程
HoldCounter rh = cachedHoldCounter;
// 判断当前线程是否是最后一个拿到锁资源的线程
//
if (rh == null || rh.tid != getThreadId(current))
// 如果不是,那就把当前线程设为最后一个线程
cachedHoldCounter = rh = readHolds.get();
// 当前线程是最后一个拿到锁的线程 cachedHoldCounter,并且还是之前已经释放的线程,count=0
else if (rh.count == 0)
// 直接将当前线程的重入信息设置到ThreadLocal,当前线程就是之前最后一个拿锁的线程
readHolds.set(rh);
// 不管是不是最后一个线程,重入次数count++
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
读锁竞争循环源码
fullTryAcquireShared()
// tryAcquireShared中,如果没有拿到锁资源,走这个方法,尝试再次获取锁,逻辑和上面基本一样
// 读锁加锁的逻辑
final int fullTryAcquireShared(Thread current) {
// 当前线程的锁重入次数
HoldCounter rh = null;
// 死循环:要不拿到锁,要不就拿不到进行阻塞
for (;;) {
// 获取state
int c = getState();
// 判断当前有写锁在占用锁资源,并且不是当前线程,那么直接返回-1,去队列中排队
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 当前是否可以尝试获取锁资源:分为公平锁和非公平锁
} else if (readerShouldBlock()) {
// 无论公平还是非公平,只要进来,就要开始准备阻塞当前线程,也就是放到AQS队列中
// 下面这个if在处理ThreadLocal的内存泄露问题
if (firstReader == current) {
// 如果当前线程是firstReader,因为不使用ThreadLocal,就什么都不用做
} else {
// 第一次进来是null
if (rh == null) {
// 拿到最后一次获取读锁的线程
rh = cachedHoldCounter;
// 如果线程为null(第一次都为null),或者当前线程不是最后一次获取读锁的线程,没拿到重入数
if (rh == null || rh.tid != getThreadId(current)) {
// 从ThreadLocal中拿到重入的计数器
rh = readHolds.get();
// 如果重入数目为0,说明之前没拿到过锁
if (rh.count == 0)
// 直接remove,防止内存泄露
readHolds.remove();
}
}
// 前面处理完之后,直接返回-1,代表可以加入AQS队列中了
if (rh.count == 0)
return -1;
}
}
// 判断重入次数或者超出最大次数,抛异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 否则CAS尝试获取锁资源
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读线程在AQS队列获取锁资源的后续操作
1、正常如果都是读线程来获取读锁资源,不需要使用到AQS队列的,直接CAS操作即可
2、如果写线程持有着写锁,这是读线程就需要进入到AQS队列排队,可能会有多个读线程在AQS中。
当写锁释放资源后,会唤醒head后面的读线程,当head后面的读线程拿到锁资源后,还需要查看next节点是否也是读线程在阻塞,如果是,直接唤醒
public final void acquireShared(int arg) {
// 竞争锁资源,返回-1代表拿锁失败
if (tryAcquireShared(arg) < 0)
// 没拿到锁资源去排队
doAcquireShared(arg);
}
// doAcquireShared读锁未获取到锁资源,准备进AQS中排队
private void doAcquireShared(int arg) {
// 声明node,类型是共享锁,放到AQS队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
// 如果上一个节点是head
if (p == head) {
// 直接执行tryAcquireShared()
int r = tryAcquireShared(arg);
// 当写锁释放锁资源。队列中线程可以唤醒抢占锁,r>0,r有两种,-1是没拿到锁,1是拿到锁
if (r >= 0) {
// setHeadAndPropagate()拿到读锁之后,需要做的后续处理
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 找到上一个有效节点,将状态设置为-1,挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AQS中node拿到读锁之后,做的后续操作
private void setHeadAndPropagate(Node node, int propagate) {
// 获取头节点
Node h = head;
// 将当前节点设置为头节点,因为当前节点获取到了锁资源,会释放head节点,当前节点替换为head,thread为null
setHead(node);
// propagate上面传下来,-1代表没拿到锁,1代表拿到锁,拿到锁才会走这个方法,因此propagate肯定为1
// 因此propagate > 0在这里的意义更多是在信号量处理JDK1.5的bug操作
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 拿到当前节点的next节点,如果next节点是共享锁,直接唤醒,因为也会得到读锁资源
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
读锁释放锁流程和源码分析
// 读锁释放锁
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 处理state的值,以及可重入的内容,如果返回true,就释放队列中的node
if (tryReleaseShared(arg)) {
// AQS队列的相关的事
doReleaseShared();
return true;
}
return false;
}
// 1.处理重入的次数 2.处理state
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 如果当前线程是第一个获取锁的线程,不需要ThreadLocal
if (firstReader == current) {
// 如果当前线程重入次数只有一次的话
if (firstReaderHoldCount == 1)
// 直接释放当前线程
firstReader = null;
else
// 否则就把重入次数--
firstReaderHoldCount--;
} else {
// 获取cachedHoldCounter
HoldCounter rh = cachedHoldCounter;
// 当前线程是不是最后一个获取锁的线程
if (rh == null || rh.tid != getThreadId(current))
// 如果不是,当前线程就作为了最后一个线程
rh = readHolds.get();
int count = rh.count;
// 看当前线程重入次数是否小于等于1
if (count <= 1) {
// 如果是,直接释放
readHolds.remove();
// 如果释放多了,就不用再释放了,因为后面都会统一--
if (count <= 0)
throw unmatchedUnlockException();
}
// 当前线程重入次数--
--rh.count;
}
for (;;) {
int c = getState();
// 减去高16位
int nextc = c - SHARED_UNIT;
// CAS设置state为释放后的次数
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 唤醒AQS中排队的线程
private void doReleaseShared() {
// 死循环
for (;;) {
// 拿到头
Node h = head;
// 头不等于Null,并且不等于尾。说明有排队的node
if (h != null && h != tail) {
// 获取waitStatus
int ws = h.waitStatus;
// 判断是否为-1,如果是,说明后面有挂起的线程
if (ws == Node.SIGNAL) {
// 先基于CAS操作,将head的waitStatus从-1改为0,只要有一个node被唤醒就能改
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后续节点
unparkSuccessor(h);
// 头节点状态不是-1
} else if (ws == 0 &&
// 把waitStatus从0改成了-3,这里不是给读写锁准备的,在Semaphore、countDownLatch中使用
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果队列中没有排队的node,直接break,没有要唤醒的node
if (h == head)
break;
}
}
五:死锁
操作系统2022版本有,已经有最新的死锁课程了,这里就不做过多讲解
查看这个课程: