三:锁

一:锁的分类

可重入锁-不可重入锁

java中提供的synchronized、ReentrantLock、ReentrantReadWriteLock都是可重入锁

ThreadPoolExecutor中的Worker实现的是不可重入锁

重入:当前线程获取A锁,获取之后尝试再次获取A锁是可以直接拿到的

不可重入:当前线程获取A锁,获取之后尝试再次获取A锁是不可以直接拿到的,需要等待自己释放锁之后再次竞争锁才能得到

package com.xqm.juc.lock.one;

public class Test01 {
    private static volatile Test01 test;

    private Test01(){};
    // DCL:double check lock双重检查锁
    private static Test01 getInstance(){
        if (test==null){
            synchronized (Test01.class){
                if (test ==null){
                    // 再次获取锁能获取直接获取到,这就是可重入锁
                    synchronized (Test01.class){
                        test=new Test01();
                    }
                }
            }
        }
        return test;
    }
}

乐观锁-悲观锁

java中提供的synchronized、ReentrantLock、ReentrantReadWriteLock都是悲观锁

CAS操作使用的是乐观锁的实现

Atomic原子类中,底层就是使用CAS乐观锁实现的

乐观锁:乐观状态,每次都认为不会发生多线程并发问题,等待之后处理。获取不到锁资源,可以让CPU再次调度,重新获取锁资源。

悲观锁:悲观状态,认为每一次都会发生并发问题。当获取不到锁资源时,会将当前线程挂起,进入到(BLOCKED、WAITING)状态。线程挂起会涉及到用户态和内核态的切换,非常消耗资源。

用户态:JVM可以自行执行的指令,不需要借助操作系统执行

内核态:JVM不可以自行执行,需要操作系统才能执行

公平锁-非公平锁

java中提供的synchronized只能是非公平锁,ReentrantLock、ReentrantReadWriteLock默认是非公平锁,但是也可以实现公平锁。

公平锁:根据想要获取的时间进行排队来一个一个获取锁资源,先到先得。

非公平锁:一个线程想要获取锁,不管锁是否被占有,先尝试是否能够获取到锁。

  • 拿到锁资源:插队成功
    
  • 未拿到锁资源:进行排队,当之前线程释放锁之后,再次尝试获取锁
    

共享锁-互斥锁

java中提供的synchronized、ReentrantLock是互斥锁,ReentrantReadWriteLock中写锁是互斥锁,读锁是共享锁

共享锁:同一时间,可以有多个线程获取共享锁

互斥锁:同一时间,只能有一个线程获取到互斥锁

二:Synchronized

类锁、对象锁

synchronized使用方法:同步方法、同步代码块

synchronzied锁是基于对象实现的

同步方法:

  • static:使用的是当前类.class作为锁(类锁)
  • 非static:使用的是当前对象(也就是this)作为锁(对象锁)
package com.xqm.juc.lock.one;

public class Test02 {

    public static void main(String[] args) {
        // 锁的是整个Test类
        Test.a();
        Test b=new Test();
        // 锁的是new出来的对象
        b.b();
    }

}

class Test{
    /**
     * 锁的是当前类,也就是Test.class
     */
    public static synchronized void a(){
        System.out.println("aaa");
    }

    /**
     * 锁的是当前类对象,也就是this
     */
    public  synchronized void b(){
        System.out.println("bbb");
    }
}

synchronized的优化

JDK1.5之后,推出了ReentrantLock,LocK的性能高于synchronized。因此在JDK1.6中,JDK团队就对synchronized做出大量优化。

  • 锁消除:在synchronized修饰的代码中,如果不存在操作临界资源的情况,也就是没有多线程的问题,会触发锁消除,即便写了synchronized,也不会执行
    package com.xqm.juc.lock.one;
    
    public class Test03 {
        public synchronized void method(){
            // 没有操作临界资源 此时这个synchronized可以认为是没有
        }
    }
    
    
  • 锁粗化/膨胀:如果在一个循环中,频繁的获取和释放锁资源,这样会带来很大的消耗。锁膨胀就是将锁的范围扩大,避免频繁的竞争和获取锁资源带来不必要的消耗。
     public void method1(){
            for (int i = 0; i < 10000; i++) {
                synchronized (对象){
                    // 这里的synchronized就会触发锁膨胀
                }
            }
        }
    
  • 锁升级:ReentrantLock的实现,是基于CAS获取锁资源,如果拿不到锁资源,才会挂起线程。而synchronized在JDK1.6之前,如果获取不到锁,则立即挂起线程。在JDK1.6之后,就会从无锁-偏向级锁-轻量级锁-重量级锁进行锁升级。
    • 无锁、匿名偏向:当前对象没有锁存在
    • 偏向锁:如果当前锁资源,只有一个线程在频繁的获取和释放锁资源,那么就判断偏向标志位指向的线程是否是当前的线程,如果是,就直接拿到锁资源,不需要竞争。如果当前线程不是偏向标志位指向的那个线程,那么就基于CAS的方式,尝试获取锁资源。如果获取不到,就会触发锁升级,升级为轻量级锁
    • 轻量级锁:会采用自旋锁的方式去频繁的以CAS的形式获取锁资源。如果成功获取到锁,直接获取资源。如果获取不到锁资源,则进行锁升级,升级为重量级锁。自旋锁采用的是自适应自旋锁(JVM会根据上次自旋的次数来决定)
    • 重量级锁:最传统的方式,如果拿不到锁资源,就会挂起线程,出现用户态和内核态的切换

synchronized的实现原理

synchronized是基于对象实现的。

对象组成:

  • 对象头:包括了关于堆对象的布局、类型、GC状态、同步状态和标识哈希码的基本信息。Java对象和vm内部对象都有一个共同的对象头格式。
  • 实例数据:主要是存放类的数据信息,父类的信息,对象字段属性信息。
  • 对齐填充:为了字节对齐,填充的数据。

对象

image.png

  • 对象头:mark word、class point(类型指针)、数组长度

image.png

32位mark word

image.png

64位mark word

image.png

synchronized的锁升级

为了在程序中看到对象头的信息,需要导入依赖。

  <!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.9</version>
        </dependency>

Object对象

package com.xqm.juc.lock.one;

import org.openjdk.jol.info.ClassLayout;

public class Test04_Object {
    public static void main(String[] args) {
        Object o=new Object();
        System.out.println(ClassLayout.parseInstance(o).toPrintable());
        synchronized (o){
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }
}

java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)     01 00 00 00 (00000001 00000000 00000000 00000000) (1)
      4     4        (object header)     00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)     e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)     e8 f0 c7 02 (11101000 11110000 11000111 00000010) (46657768)
      4     4        (object header)     00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)     e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

第一个例子:

第一行:0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)

其中的00000001的最后三位001代表不是偏向锁(0),现在是无锁状态(01)

第二个例子:

第一行:0 4 (object header) e8 f0 c7 02 (11101000 11110000 11000111 00000010) (46657768)

其中的11101000最后两位00代表轻量级锁。(从无锁一下子到轻量级锁?)

原因如下:

锁默认情况下,开启了偏向锁延迟。
偏向锁采用CAS来获取锁
偏向锁升级为轻量级锁时,会涉及到偏向锁撤销,需要等到一个安全点,才可以撤销。在明知道有并发情况下,可以选择不开启偏向锁,或者设置偏向锁延迟开启。
因为JVM在启动时,需要加载大量的class文件到内存中,这个操作会涉及到synchronized的使用,为了避免出现偏向锁撤销操作,JVM启动初期,有个延迟4s启动偏向锁的操作。
如果正常开启了偏向锁,那么不会出现无锁状态,对象会编程匿名偏向。
这里是因为开启了偏向锁延迟开启,因此直接从无锁到轻量级锁。

因此,如果在代码前面sleep(4000);的话,那么直接开启了偏向锁,也就是101。

package com.xqm.juc.lock.one;

import org.openjdk.jol.info.ClassLayout;

public class Test04_Object {
    public static void main(String[] args) throws InterruptedException {
        Thread.sleep(4000);
        Object o=new Object();
        System.out.println(ClassLayout.parseInstance(o).toPrintable());
        synchronized (o){
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }
}

java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           05 00 00 00 (00000101 00000000 00000000 00000000) (5)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           05 60 e1 02 (00000101 01100000 11100001 00000010) (48324613)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

轻量级锁->重量级锁(这里可能是偏向锁、轻量级锁、重量级锁,取决于cpu运行的不同情况)

package com.xqm.juc.lock.one;

import org.openjdk.jol.info.ClassLayout;

public class Test04_Object {
    public static void main(String[] args) throws InterruptedException {
        Thread.sleep(5000);
        Object o=new Object();
        System.out.println(ClassLayout.parseInstance(o).toPrintable());
        new Thread(
                ()->{
                    synchronized (o){
                        System.out.println("thread:"+ClassLayout.parseInstance(o).toPrintable());
                    }
                }
        ).start();
        synchronized (o){
            System.out.println("main:"+ClassLayout.parseInstance(o).toPrintable());
        }
    }
}

java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           05 00 00 00 (00000101 00000000 00000000 00000000) (5)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

main:java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           05 60 55 03 (00000101 01100000 01010101 00000011) (55926789)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

thread:java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           da 2d 26 1d (11011010 00101101 00100110 00011101) (489041370)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

状态:

16599236690553014052ffy

Lock Record以及ObjectMonitor存储的内容

Lock Record是在栈中

image.png

重量级锁底层ObjectMonitor

需要找到openJDK。

网址:https://openjdk.org/ 或者 http://hg.openjdk.java.net/

找到ObjectMonitor的两个文件,hpp,cpp。

目录在:directory /src/share/vm/runtime/

先查看核心属性:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/69087d08d473/src/share/vm/runtime/objectMonitor.hpp

 ObjectMonitor() {
    _header       = NULL;  		// 存储着mark word
    _count        = 0;			// 存储竞争锁的线程个数
    _waiters      = 0,			// 现在wait线程的个数
    _recursions   = 0;			// 标志synchronized锁重入的次数
    _object       = NULL;
    _owner        = NULL;		// 持有锁的线程
    _WaitSet      = NULL;		// 保存wait的线程信息,是双向链表
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;		// 获取锁竞争资源失败,线程要放到当前的链表中
    FreeNext      = NULL ;
    _EntryList    = NULL ;		// _cxq以及被唤醒的waitSet中的线程,在一定机制下,放入到entrySet中
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
    _previous_owner_tid = 0;
  }

几个C++的加锁过程:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/69087d08d473/src/share/vm/runtime/objectMonitor.cpp

tryLock函数

int ObjectMonitor::TryLock (Thread * Self) {
   for (;;) {
      // 拿到持有锁的线程
      void * own = _owner ;
      // 如果有线程持有锁,直接返回
      if (own != NULL) return 0 ;
	  // 说明没有线程持有锁,cmpxchg指令就是底层的CAS实现
      if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
		// 成功获取锁资源
         return 1 ;
      }
 	  // 这里重试操作没什么意义,表示没拿到锁,直接返回-1
      if (true) return -1 ;
   }
}

try_enty

bool ObjectMonitor::try_enter(Thread* THREAD) {
  // 当前线程不是持有锁的线程,判断owner是不是当前线程
  if (THREAD != _owner) {
    // 判断当前持有锁的线程是否是当前线程,也就是拿到锁的线程不是owner,说明轻量级锁刚刚升级为重量级锁,还没转换,因为轻量级锁是在栈中
    if (THREAD->is_lock_owned ((address)_owner)) {
   
       _owner = THREAD ;
       _recursions = 1 ;
       OwnerIsThread = 1 ;
       return true;
    }
    // 如果既不是owner,也不是持有锁的线程,那么就CAS操作,尝试获取锁
    if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
	  // 没拿到锁,告辞
      return false;
    }
    // 拿到锁
    return true;
  } else {
	// 是当前线程,说明是锁重入操作
    _recursions++;
    return true;
  }
}

entry:想方设法拿到锁资源,如果没拿到,挂起放到_cxq单向链表中

void ATTR ObjectMonitor::enter(TRAPS) {
  // 拿到当前线程
  Thread * const Self = THREAD ;
  void * cur ;
  // CAS
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL) {
     // 拿到锁资源
     return ;
  }
  // 锁重入操作
  if (cur == Self) {
     _recursions ++ ;
     return ;
  }
 // 轻量级锁过来,升级为重量级锁
  if (Self->is_lock_owned ((address)cur)) {
    _recursions = 1 ;
    _owner = Self ;
    OwnerIsThread = 1 ;
    return ;
  }

  Self->_Stalled = intptr_t(this) ;

  if (Knob_SpinEarly && TrySpin (Self) > 0) {
     Self->_Stalled = 0 ;
     return ;
  }

  JavaThread * jt = (JavaThread *) Self ;

  // 没拿到锁资源,count++
  Atomic::inc_ptr(&_count);

  JFR_ONLY(JfrConditionalFlushWithStacktrace<EventJavaMonitorEnter> flush(jt);)
  EventJavaMonitorEnter event;
  if (event.should_commit()) {
    event.set_monitorClass(((oop)this->object())->klass());
    event.set_address((uintptr_t)(this->object_addr()));
  }

  { 
    JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);

    Self->set_current_pending_monitor(this);

    DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
    if (JvmtiExport::should_post_monitor_contended_enter()) {
      JvmtiExport::post_monitor_contended_enter(jt, this);

    }

    OSThreadContendState osts(Self->osthread());
    ThreadBlockInVM tbivm(jt);

    for (;;) {
      jt->set_suspend_equivalent();
       // 入队操作,进入到cxq中
      EnterI (THREAD) ;

      if (!ExitSuspendEquivalent(jt)) break ;

          _recursions = 0 ;
      _succ = NULL ;
      exit (false, Self) ;

      jt->java_suspend_self();
    }
    Self->set_current_pending_monitor(NULL);
  }
  //count--,表示开始去拿锁资源
  Atomic::dec_ptr(&_count);
  Self->_Stalled = 0 ;

  DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
  if (JvmtiExport::should_post_monitor_contended_entered()) {
    JvmtiExport::post_monitor_contended_entered(jt, this);
  }

  if (event.should_commit()) {
    event.set_previousOwner((uintptr_t)_previous_owner_tid);
    event.commit();
  }

  if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
     ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
  }
}

EnterI

 ObjectWaiter * nxt ;
	// 循环入队,入队之前还得tryLock获取锁资源
    for (;;) {
        node._next = nxt = _cxq ;
		// CAS的方式入队
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }

三:ReentrantLock

ReentrantLock和synchronized的区别

  • synchronized是java关键字,ReentrantLock是java.util.concurrent包下的类。都是在JVM层面实现互斥锁。
  • synchronized是非公平锁,ReentrantLock可以是非公平锁也可以是公平锁
  • synchronized加在同步方法和同步代码块上,ReentrantLock在try-catch-finally中使用,并且在finally中使用unlock释放
  • synchronized存在锁升级,升级到重量级锁就不能进行锁降级。ReentrantLock不存在锁升级概念
  • synchronized是基于不同锁级别实现的,比如重量级锁是ObjectMonitor,轻量级锁是自旋锁,偏向锁是偏向指针指向线程ID。ReentrantLock是基于AQS实现的。
  • ReentrantLock支持等待可释放锁。可以添加多个条件Condition

AQS概述

AQS

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {.....}

AQS实际上就是JUC下的一个基类。JUC下很多内容都是基于AQS实现部分内容,比如ReentrantLock、ThreadPoolExecutor、阻塞队列、CountdownLatch、Semaphore、CyclicBarrier等等

首先AQS提供了一个由volatile修饰的,采用CAS修改int类型的state变量

private volatile int state;

其次AQS中维护了一个双向链表,每个节点都是Node对象,有prev和next

static final class Node {
        // 共享锁
        static final Node SHARED = new Node();
        // 互斥锁
        static final Node EXCLUSIVE = null;
		// 节点状态,还有一个是0,代表初始化状态

        static final int CANCELLED =  1;
 
        static final int SIGNAL    = -1;

        static final int CONDITION = -2;
  
        static final int PROPAGATE = -3;
        // 描述节点的状态
        volatile int waitStatus;
  
        volatile Node prev;
   
        volatile Node next;
 
        volatile Thread thread;
  
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

   
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

AQS中Node节点:

image.png

加锁流程源码分析

加锁流程概述

非公平锁流程

image.png

加锁流程源码分析

一共有三个方法:

  • lock
  • tryLock
  • lockInterruptibly
Lock()

java测试类代码

package com.xqm.juc.lock.reentrantLock;

import java.util.concurrent.locks.ReentrantLock;

public class Test01 {

    public static void main(String[] args) {
        // 默认是非公平锁
        ReentrantLock noFairLock=new ReentrantLock();
        // 有参构造,设置为true,就是公平锁
        ReentrantLock fairLock=new ReentrantLock(true);
        noFairLock.lock();
        try {
            System.out.println("业务代码");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            noFairLock.unlock();
        }
    }
}

lock有公平锁和非公平锁区分。

// 上锁
public void lock() {
    sync.lock();
}

//  abstract static class Sync extends AbstractQueuedSynchronizer {}
//  setExclusiveOwnerThread是AbstractQueuedSynchronizer类中的方法
//  acquire也是AbstractQueuedSynchronizer类中的方法

// 非公平锁中sync.lock()
final void lock() {
    // 上来就先基于CAS的方式,尝试将state从0改为1
    if (compareAndSetState(0, 1))
        // 获取锁资源成功,会将当前线程设置到setExclusiveOwnerThread属性,代表是当前线程持有着锁资源
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 执行acquire,尝试获取锁资源
        acquire(1);
}


// 公平锁中sync.lock()
final void lock() {
    // 执行acquire,尝试获取锁资源
    acquire(1);
}

//  AQS中setExclusiveOwnerThread方法
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

acquire方法:是公平锁和非公平锁的逻辑

// AQS中acquire()
public final void acquire(int arg) {
    // tryAcquire方法,再次查看,当前线程是否可以获取锁资源
    // 如果拿到锁资源,返回的是true,加上!,那么就是false,也就是直接不执行下面的语句了
    if (!tryAcquire(arg) &&
            // 执行这条语句代表没有拿到锁资源,Node.EXCLUSIVE代表互斥锁
            // addWaiter(Node.EXCLUSIVE):将当前线程封装为Node节点,插入到AQS的双向链表的结尾
            // acquireQueued:查看我是否是第一个排队的节点,如果是,那就再次尝试获取锁资源,并且如果长时间拿不到,就
            // 挂起线程,如果不是第一个排队的节点,那就尝试挂起线程
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 当前线程中断
        selfInterrupt();
}

tryAcquire:分为公平锁和非公平锁

// 非公平锁中的tryAcquire
final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取state属性
    int c = getState();
    // 判断state是否为0,如果为0代表之前持有锁的线程释放了锁资源
    if (c == 0) {
        // 再次抢夺锁资源
        if (compareAndSetState(0, acquires)) {
            // 抢到锁资源就把state从0改为1
            setExclusiveOwnerThread(current);
            // 拿锁成功返回
            return true;
        }
        // 不是0,说明线程持有着锁,持有锁的线程如果是当前线程,则是锁重入操作
    } else if (current == getExclusiveOwnerThread()) {
        // 将state加1
        int nextc = c + acquires;
        // 如果加1还是小于0 ,就是超出了int正数的最大范围
        // int 最大范围是 01111111 11111111 11111111 11111111
        // 如果加1之后    10000000 00000000 00000000 00000000 就是负数
        // 说明重入次数超出界限
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 正常的将计算结果赋值给state
        setState(nextc);
        // 锁重入成功
        return true;
    }
    // 如果state=1,并且拿锁的线程也不是当前线程,直接返回
    return false;
}

// 公平锁的tryAcquire
protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取state属性
    int c = getState();
    // 判断state是否为0,如果为0代表之前持有锁的线程释放了锁资源
    if (c == 0) {
        // hasQueuedPredecessors:查看AQS中是否有排队的Node,如果返回false,那么就可以抢占锁资源
		// 两种情况返回false可以抢占资源:一种队列无节点,第二种当前线程是第一个节点
        if (!hasQueuedPredecessors() &&
                // 抢占锁资源
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    // 锁重入操作
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}



// 查看AQS中是否有排队的Node,如果返回false,就可以去抢锁了
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    // 如果头尾相等返回false,证明没有Node排队
    // 头节点的next节点不为null,并且s节点的线程为当前线程,那么就返回false,也就是可以抢占锁资源
    return h != t &&
           ((s = h.next) == null || s.thread != Thread.currentThread());
}

addWaiter():将当前没有拿到锁资源的线程,封装为Node节点,放入AQS的双向链表中去排队

// addWaiter():将当前没有拿到锁资源的线程,封装为Node节点,放入AQS的双向链表中
// 这里的mode代表互斥锁
private Node addWaiter(Node mode) {
    // 将当前线程封装成Node
    Node node = new Node(Thread.currentThread(), mode);
    // 拿到尾节点
    Node pred = tail;
    // 如果尾节点不为null(只有链表中没有节点的时候,尾节点才会指向null)
    if (pred != null) {
        // 当前节点的prev指向尾节点
        node.prev = pred;
        // 使用CAS,保证线程安全,将当前节点设置为tail
        if (compareAndSetTail(pred, node)) {
            // 前一个节点的next指向当前node
            pred.next = node;
            return node;
        }
    }
    // 如果CAS失败,以死循环的方式,保证当前线程的Node一定可以放到AQS队列的末尾
    enq(node);
    return node;
}

// enq():死循环,知道保证node放到链表末尾为止
private Node enq(final Node node) {
    for (;;) {
        // 拿到尾节点
        Node t = tail;
        // 如果尾节点为null,AQS中一个节点也没有
        if (t == null) { // Must initialize
            // new一个Node,也就是构建一个伪节点,设置为头
            if (compareAndSetHead(new Node()))
                // 尾指向头
                tail = head;
        } else {
            // 以CAS的方式,在AQS中有节点之后,插入到AQS队列中的末尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued():判断当前线程是否还能再次获取锁资源,如果不能获取资源,就尝试将当前线程挂起

// acquireQueued():当前没有拿到锁资源,并且到AQS排队之后触发的方法
// 这里不需要考虑中断,lock涉及不到,但是tryLock和lockInterruptibly才会涉及到中断
// node是队列中节点,arg就是1
final boolean acquireQueued(final Node node, int arg) {
    // 获取锁资源是否失败,默认是失败true,如果获取锁成功,那就是false
    // 真正触发的还是tryLock和lockInterruptibly
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 死循环
        for (;;) {
            // node.predecessor():获取当前节点的上一个节点
            final Node p = node.predecessor();
            // 前继节点是否是head,如果是head,再次执行tryAcquire尝试获取锁资源
            if (p == head && tryAcquire(arg)) {
                // 获取锁资源成功,将当前节点设置为头节点,也就是这个节点在链表中没意义了
                setHead(node);
                // p节点也就是当前节点的前继节点,将next设置为null,也就是帮助gc更快回收
                p.next = null; // help GC
                // 代表获取锁资源成功了
                failed = false;
                return interrupted;
            }
            // 没拿到锁资源,是否可以挂起当前线程,传递前继节点和当前节点
            // 基于上一个节点的状态来判断当前节点是否可以挂起,如果上一个节点状态为-1.代表可以挂起,返回true
            // parkAndCheckInterrupt:挂起线程,running--> wait状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果获取资源成功了,就不会执行下面的语句
        if (failed)
            // 在lock方法中,基本不会执行
            cancelAcquire(node);
    }
}

// 获取当前节点的上一个节点
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

// 获取到锁资源,则节点设置为头节点,其他属性设置为null
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}


// 当前node没有拿到锁资源或者没有资格去竞争锁资源,check是否挂起当前线程
// 挂起之前要查看前继节点的状态,看是否之后能唤醒当前节点,如果不能唤醒,就不能挂起,否则造成这个节点死掉
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取上一个节点状态
    int ws = pred.waitStatus;
    // SIGNAL状态:值为-1,表示当前节点的后继节点可以挂起线程,后续会唤醒后继节点
    if (ws == Node.SIGNAL)
        // 只有当上一个节点的状态为-1,当前节点的线程才能挂起
        return true;
    // CANCELLED:值为1,代表当前节点已经被取消了,不能挂起后继节点的线程
    if (ws > 0) {
        // 如果当前节点是取消状态,那么需要往前找到一个状态不为1的节点
        // 找到状态不为1的节点,设置prev和next的关系
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
        // 如果状态不是1,不是-1,那么就剩0,-2,-3
        // 0代表刚初始化的节点,-2是涉及到condition的状态,-3是涉及共享锁的操作
    } else {
        // 上一个节点的状态不是-1或者1,那就代表节点状态正常,那么就将上一个节点状态设置为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 挂起线程
private final boolean parkAndCheckInterrupt() {
    // 挂起线程
    LockSupport.park(this);
    return Thread.interrupted();
}
tryLock()

无参:

public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

和lock()中tryAcquire()一模一样

// tryLock无论是公平锁还是非公平锁,都会走非公平锁抢占锁资源的操作
// 就是拿到state的值,如果是0 ,直接CAS尝试获取锁,否则就是锁重入操作
// 如果没抢到锁资源,或者不是锁重入,那么直接返回false
public boolean tryLock() {
    // 非公平锁的竞争锁操作
    return sync.nonfairTryAcquire(1);
}



final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

有参:输入时间和单位。在时间内拿到资源,那就执行业务,否则就做一些其他处理

public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

第一波分析

// tryLock有参构造函数
public boolean tryLock(long timeout, TimeUnit unit)
// 如果被中断,会抛出异常
throws InterruptedException {
    // 无论输入什么单位,都会转成纳秒
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}


// tryLock(time,unit)执行的方法
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
    // 判断线程的中断标记位,是不是从false被改为true,如果是,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // tryAcquire分为公平和非公平两种执行方式。和lock()方法一样,参考上面
    // tryAcquire:拿锁,如果拿锁成功,直接返回
    return tryAcquire(arg) ||
           // 如果拿锁失败,等待指定时间
           doAcquireNanos(arg, nanosTimeout);
}



// 拿锁失败,等待指定时间
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
    // 如果时间小于等于0,那么直接返回false
    if (nanosTimeout <= 0L)
        return false;
    // 设置结束时间
    final long deadline = System.nanoTime() + nanosTimeout;
    // addWaiter,扔到AQS队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    // 拿锁失败,默认是true
    boolean failed = true;
    try {
        // 死循环
        for (;;) {
            // AQS中,如果当前node是head的next,直接抢锁资源,抢到则返回
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 计算剩余的时间
            nanosTimeout = deadline - System.nanoTime();
            // 判断时间是否用尽
            if (nanosTimeout <= 0L)
                return false;
            // shouldParkAfterFailedAcquire:根据上一个节点状态看是否能挂起线程
            // spinForTimeoutThreshold:1000L
            if (shouldParkAfterFailedAcquire(p, node) &&
                    // 避免时间太少,就不用挂起线程
                    nanosTimeout > spinForTimeoutThreshold)
                // 如果时间足够,那就线程挂起剩余的时间,不需要别人唤醒,时间到了自己唤醒
                LockSupport.parkNanos(this, nanosTimeout);
            // 线程被唤醒,一种是时间到了,另一种是中断标记位改变
            // 中断唤醒就抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        // 除非线程拿到锁,否则就执行
        if (failed)
            cancelAcquire(node);
    }
}

取消在AQS中排队:cancleAcquire()

取消节点整体操作流程:
1.线程设置为null
2.往前找到有效节点作为当前节点的prev
3.将waitState设置为1,代表取消
4.脱离整个AQS队列三种情况
(4.1).当前node是tail
(4.2).当前ndoe是head的后继节点
(4.3).不是tail节点,也不是head的后继节点

// 取消在AQS中排队
// 取消节点整体操作流程:
// 1.线程设置为null
// 2.往前找到有效节点作为当前节点的prev
// 3.将waitState设置为1,代表取消
// 4.脱离整个AQS队列
//    (4.1).当前node是tail
//    (4.2).当前ndoe是head的后继节点
//    (4.3).不是tail节点,也不是head的后继节点
private void cancelAcquire(Node node) {
    // 如果当前node为null,直接忽略
    if (node == null)
        return;
    // 把当前node的线程设置为null
    node.thread = null;

    // 往前找状态不为1的节点
    Node pred = node.prev;
    // waitStatus>0,其实就是1,为取消状态,那么就一直往前找
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 拿到上一个节点的next,不一定是当前节点,因为之前一直往前找过
    Node predNext = pred.next;
    // 当前节点状态设置为1,就是取消状态
    node.waitStatus = Node.CANCELLED;
    // 下面则是脱离AQS的操作
    // 如果当前节点是尾节点,那么使用CAS操作,将当前节点的前继节点设置为tail尾节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 将pred节点的next指向null
        compareAndSetNext(pred, predNext, null);
    } else {
        // 不是尾节点或者CAS操作失败
        int ws;
        // 如果前置节点不是head的后继节点
        if (pred != head &&
                // 前继节点的状态是否为-1,是的话代表后继节点是挂起的状态.
                // 如果不是-1并且小于等于0,也就是上一个节点的状态不是取消状态,那么就改为-1
                ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                // 前置节点的线程不为null,避免并发导致节点为head节点或取消节点,线程才会为null
                && pred.thread != null) {
            // 下面一段是为了保证前继节点是有效节点,能够唤醒后面的节点
            // 获取当前节点的下一个节点
            Node next = node.next;
            // 如果下一个节点不为null并且是有效节点
            if (next != null && next.waitStatus <= 0)
                // 把前一个节点的next从当前节点指向当前节点的下一个节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 当前节点是head的后继节点,那么唤醒后面的节点
            unparkSuccessor(node);
        }
        // 把我自己的next指向我自己,为了方便GC回收
        node.next = node; // help GC
    }
}
lockInterruptibly()

和tryLock()不同,tryLock()会等待指定的时间,直到时间到或者被中断才会结束,但是lockInterruptibly()则是一直等待,没有时间限制,直到被中断为止或者拿到锁资源。

lock.lockInterruptibly();
public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
// lock.lockInterruptibly()函数
public final void acquireInterruptibly(int arg)
throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        // 和tryLock唯一不同
        doAcquireInterruptibly(arg);
}


// 和tryLock唯一区别在于这个函数
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
    // addWaiter,扔到AQS队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    // 抢夺资源失败,默认为true
    boolean failed = true;
    try {
        for (;;) {
            // p是当前节点的前继节点
            final Node p = node.predecessor();
            // AQS中,如果当前node是head的next,直接抢锁资源,抢到则返回
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // // shouldParkAfterFailedAcquire:根据上一个节点状态看是否能挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                    // 检查中断异常,返回true的话,代表要被唤醒
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 从队列中取消节点,见tryLock()
            cancelAcquire(node);
    }
}


// 检查中断异常
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    // 如果返回true,代表要从挂起状态要被唤醒
    return Thread.interrupted();
}

释放锁流程分析

释放锁流程概述

image.png

释放锁流程源码分析

public void unlock() {
    // 无论是公平锁还是非公平锁,走的都是tryRelease()
    sync.release(1);
}

// 释放锁的核心流程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 如果锁释放干净,走这个流程
        Node h = head;
        // head不为null,也就是head后面有排队的Node,并且node状态不为new,也就是waitStatus!=0
        // 其实也就是head的waitStatus为-1,后面有挂起的排队线程
        if (h != null && h.waitStatus != 0)
            // 唤醒排队线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}


protected final boolean tryRelease(int releases) {
    // 拿到state-1的值,但是并没有赋值给state
    int c = getState() - releases;
    // 如果当前线程不是持有锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // free,代表当前锁是否释放干净,因为有重入操作,而每次state只减1,可能state不等于0,也就是没释放干净锁
    boolean free = false;
    // 如果是0 ,代表释放干净
    if (c == 0) {
        free = true;
        // 将持有锁的线程置为null
        setExclusiveOwnerThread(null);
    }
    // 将state设为c,也就是state-1
    setState(c);
    // 如果锁资源释放干净,返回true,否则返回false
    return free;
}


// 传递的是head,头节点
private void unparkSuccessor(Node node) {
 
    int ws = node.waitStatus;
    // 头节点的状态为-1.
    if (ws < 0)
        // 基于CAS,将头节点的状态从-1改为0
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到头节点的后续节点
    Node s = node.next;
    // 如果后续节点为null,或者后续节点为1,代表节点取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找到有效节点,也就是waitStatus<=0
        // 之所以不从前往后找,是为了防止,中间某个节点的thread为null,这样就找不到下一个节点了
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                // 有效线程赋值给s
                s = t;
    }
    // 找到需要被唤醒的节点,唤醒节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

AQS常见问题

1.AQS中为什么要虚拟一个head节点

Node节点中有个waitStatus状态,其中-1代表当前的节点的后继节点是挂起状态,需要被唤醒。

AQS中提供了ReentrantLock的基本实现,而在释放锁资源的时候,需要考虑是否执行unparkSuccessor方法,去唤醒后继节点。

如果当前节点的后继节点的线程挂起,那么就将当前节点的状态设置为-1,-1状态的出现是为了避免重复唤醒或者重复释放资源的过程。

因为AQS中排队Node中的线程如果挂起了,是无法主动唤醒的,需要释放锁或者资源的节点的线程去唤醒。

唤醒节点需要从整个AQS双向链表中找到离head最近的有效节点去唤醒。这种操作可能需要从后往前去整个遍历双向链表。

为了避免不必要的遍历双向链表操作,提供了一个-1的状态,如果只有一个Node节点到AQS链表中,所以发现如果是第一个Node进来排队,就需要虚拟一个Node节点作为头,head里面的waitStatus来监听后面节点的状态,是否有需要挂起的线程

2.AQS中为什么要使用双向链表而不是单向链表

因为AQS中存放没有获取到资源的Node,在方法lockInterruptibly中,在竞争锁资源的时候,是可以被中断的,中断后调用cancelAcquire()方法从双向链表中取消节点,将节点的状态置为1,从AQS队列中移除该节点。如果采用单向链表,就需要遍历整个链表,将前继节点指向当前节点的后继节点,这样成本高。其次在唤醒后继节点时,如果是单向链表,也需要遍历整个链表。

四:ReentrantReadWriteLock

为什么出现读写锁

synchronized和ReentrantLock都是互斥锁。

如果说有一种情况,读操作远大于写操作,如果全部使用互斥锁,效率太低。

读和读之间是不互斥的,因此可以并发执行。写和写之间是互斥的,因此需要互斥锁。

读写之间也是互斥的,如果读锁不释放,写操作是拿不到锁的。

读锁:结果是子线程先拿到锁,1s后主线程也拿到锁,5s后释放。

 package com.xqm.juc.lock.reentrantLock;

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test02 {

    static ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.WriteLock writeLock=lock.writeLock();
    static ReentrantReadWriteLock.ReadLock readLock=lock.readLock();

    public static void main(String[] args) {

        new Thread(()->{
            readLock.lock();
            try {
                System.out.println("子线程");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                readLock.unlock();
            }
        }).start();
  

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        readLock.lock();
        try {
            System.out.println("主线程");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }
    }
}

写锁:子线程先拿到写锁,等5s之后释放了锁,主线程才能拿到读锁。

package com.xqm.juc.lock.reentrantLock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test03 {

    static ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
    static ReentrantReadWriteLock.WriteLock writeLock=lock.writeLock();
    static ReentrantReadWriteLock.ReadLock readLock=lock.readLock();

    public static void main(String[] args) {

        new Thread(()->{
            writeLock.lock();
            try {
                System.out.println("子线程");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                writeLock.unlock();
            }
        }).start();


        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        writeLock.lock();
        try {
            System.out.println("主线程");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }
    }
}

读写锁实现原理

ReentrantReadWriteLock还是基于AQS实现的,还是对state操作,拿到锁资源就去执行业务,没有拿到就去AQS的队列中排队。

读锁:基于state的高16位进行操作

写锁:基于state的低16位进行操作

ReentrantReadWriteLock也是可重入锁。

读锁重入:因为读锁是共享锁,在获取锁时,对高16位进行加1的操作。因为是共享锁,因此同一时间会有多个线程对state进行操作。这样无法确认每个线程重入的次数,为了记录每个线程锁重入的次数,在读操作线程中有ThreadLocal去记录各自的锁重入的次数

写锁重入:写锁的重入方式和ReentrantLock重入一样,都是对state进行加1的操作,只不过state最大值的范围
变小了

写锁的饥饿问题:当有读操作首先拿到锁时,其他读操作到来,可以拿到读锁,直接修改state即可。但是突然来了一个写操作,因为读锁和写锁时互斥的,写操作一直得不到锁,之后可能一直有读操作进来,导致写操作长时间无法获取锁,因此写操作产生饥饿问题。

读线程拿到锁资源后,如果再有读线程需要获取锁资源,需要去AQS队列中排队。如果队列中有写线程,那么写线程后的读线程是拿不到锁资源。比如读-读-写-读-读,那么写后面的读是拿不到锁的。

写锁分析

写锁加锁流程

image.png

写锁加锁源码分析

// 写锁入口
public void lock() {
    sync.acquire(1);
}


public final void acquire(int arg) {
    // tryAcquire和ReentrantLock不一样,只有一个实现方法
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


// 读写锁的写锁的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
    // 拿到当前线程
    Thread current = Thread.currentThread();
    // 拿到state
    int c = getState();
    // 得到state低16位的值
    int w = exclusiveCount(c);
    // 判断线程是否持有锁资源,如果有线程持有锁资源,进入if
    if (c != 0) {
        // w==0   当前没有线程持有写锁,代表有线程持有读锁,因为读写互斥,所以直接返回false
        // current != getExclusiveOwnerThread() 线程持有锁是否是当前线程,不是也返回,代表不是重入锁
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 到这代表当前线程持有写锁。锁重入操作,加1之后看超没超过00000000 00000000 11111111 11111111
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 没有超过锁重入次数,那么设置state+1
        setState(c + acquires);
        return true;
    }
    // 如果没有线程持有读锁和写锁,那么尝试获取锁资源
    // writerShouldBlock()将会区分公平锁和非公平锁
    if (writerShouldBlock() ||
            // CAS尝试拿锁
            !compareAndSetState(c, c + acquires))
        return false;
    // 拿锁成功
    setExclusiveOwnerThread(current);
    return true;
}

//
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// MAX_COUNT也是 00000000 00000000 11111111 11111111
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 1往左移16位减1
// 00000000 00000000 00000000 00000001
// 00000000 00000001 00000000 00000000 左移16位
// 00000000 00000000 11111111 11111111 减1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 写锁的tryAcquire中方法,将state的低16位的值拿到
// c &  EXCLUSIVE_MASK(00000000 00000000 11111111 11111111)
// 与运算,都为1才为1,否则为0
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }


// writerShouldBlock():非公平,直接返回false
final boolean writerShouldBlock() {
    return false; // writers can always barge
}


// writerShouldBlock():公平,
// 查看AQS队列中是否有node,如果没有排队,返回false,如果有排队,看是否是head的next,如果是,返回false,否则返回true
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}


// 查看AQS中是否有排队的Node,如果返回false,就可以去抢锁了
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    // 如果头尾相等返回false,证明没有Node排队
    // 头节点的next节点不为null,并且s节点的线程为当前线程,那么就返回false,也就是可以抢占锁资源
    return h != t &&
           ((s = h.next) == null || s.thread != Thread.currentThread());
}

写锁释放锁流程和源码分析

流程:释放的流程和ReentrantLock一致,只是在判断释放是否干净时,判断低16位的值

// 写锁释放锁的入口
public void unlock() {
    sync.release(1);
}

// 只有tryRelease和reentrantLock不同
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 如果释放干净,查看队列,修改状态为-1,查看状态对不对,不对就从结尾遍历到离head最近的Node,然后唤醒
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // 判断当前线程是否是持有锁的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //获取state-1
    int nextc = getState() - releases;
    // 判断低16位是否为0,如果为0 ,free为true,代表释放干净
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 将持有锁的线程设置为null
        setExclusiveOwnerThread(null);
    // 设置state
    setState(nextc);
    // 释放干净,返回true。写锁是有重入的,这里需要返回false,不去释放排队的Node
    return free;
}

// // 判断当前线程是否是持有锁的线程
protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

读锁分析

读锁加锁流程

判断是否是写锁还是读锁。如果当前线程占有着写锁,同时读操作也来抢占读锁,是可以直接获取到锁资源的。

image.png

读锁加锁源码分析1

第一部分分析:

// 读锁入口
public void lock() {
    sync.acquireShared(1);
}


public final void acquireShared(int arg) {
    // 竞争锁资源,返回-1代表拿锁失败
    if (tryAcquireShared(arg) < 0)
        // 没拿到锁资源去排队
        doAcquireShared(arg);
}

// 读锁竞争锁资源的操作
protected final int tryAcquireShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 拿到state
    int c = getState();
    // 先拿到低16位,判断是否不等于0 ,返回true代表有写锁占用锁资源
    // 那么就再次判断拿到写锁的是否是当前线程
    if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
        // 写锁被其他线程占用,拿不到锁,返回-1,去排队
        return -1;
    // 到这一步有两种情况,一种是没有线程持有写锁,一种是当前线程持有写锁
    // 获取读锁的信息,获取state高16位
    int r = sharedCount(c);
    // 这里分为公平锁和非公平锁,返回false则继续往下走
    // 公平锁:查看队列中是否有排队情况,有返回true,直接return,没有则返回false
    // 非公平锁:没有排队的,直接抢,有排队的,
    // 如果是读锁,如果出现这个情况,大部分是写锁刚刚释放资源,后续node还没来得及拿到锁,那么就抢占资源
    if (!readerShouldBlock() &&
            // 判断持有读锁的数量是否达到临界值
            r < MAX_COUNT &&
            // SHARED_UNIT:值为1,也就是state+1
            compareAndSetState(c, c + SHARED_UNIT)) {
        // 读锁的拿锁和可重入操作,暂时不看
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}


// 获取高16位之后的公平锁操作
final boolean readerShouldBlock() {
    // 看是否有排队以及是否是head的next节点
    return hasQueuedPredecessors();
}

// 获取高16位之后的非公平锁操作
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 只要有一个是false,那就返回false,也就是可以去拿锁
    return (h = head) != null &&        // head为null,可以去抢占锁
           (s = h.next)  != null &&     // head.next,也可以去抢占锁
           !s.isShared()         &&     // head.next是共享锁,可以去抢占锁
           s.thread != null;            // head.next的thread为null,可以去抢占锁资源
}
// 判断node是否是共享锁
final boolean isShared() {
    return nextWaiter == SHARED;
}

读锁重入流程

读锁为了记录重入的次数,需要让每个线程用ThreadLocal去记录重入次数,使用map去存储代价太高。

ReentrantReadWriteLock对读锁重入做了一些优化:

1.第一个拿到读锁资源的线程,不需要使用ThreadLocal存储,内部提供两个属性来记录第一个拿到读锁资源的线程(tid:线程id,count:重入次数)

2.ReentrantReadWriteLock在内部对ThreadLocal做了一个封装,基于HoldCounter的对象存储重入次数,由于是ThreadLocal,所以直接count++没有线程安全的问题

3.内部提供了firstReader记录第一个拿到锁资源的线程,firstReaderHoldCount记录第一个拿到锁资源重入的次数

4.cachedHoldCounter记录最后拿到锁资源线程的重入次数

5.readHolds记录当前线程的信息

读锁重入流程:

1.判断当前线程是否是第一个拿到锁资源的线程,也就是state的高16位是否为0,如果是,那么设置firstReader和firstReaderHoldCount设置为当前线程的信息

2.判断当前线程是否是firstReader,如果是,那么直接对firstReaderHoldCount进行++操作

3.到这一步说明不是第一个拿到锁的线程。获取cachedHoldCounter,也就是最后一个拿到锁资源线程的重入次数,判断是否是当前线程

3.1 如果不是,那么将cachedHoldCounter设置为当前线程的重入次数(因为当前线程是最后一个获取锁的),如果是最后一个拿到锁线程的话,那么判断当前线程重入次数是否为0

3.2 如果是的话,就需要重新设置当前的锁重入信息到readHolds中(ThreadLocal中),算是初始化操作,重入的次数是0

3.3对HoldCounter的count进行++操作

使用HoldCounter来存储线程重入的次数

static final class HoldCounter {
			// 重入次数
            int count = 0;
            // 线程id
            final long tid = getThreadId(Thread.currentThread());
        }

        // 封装ThreadLocal
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

// 记录第一个线程
private transient Thread firstReader = null;
// 记录第一个线程重入的次数
private transient int firstReaderHoldCount;
// 记录最后一个拿到锁线程的重入次数
private transient HoldCounter cachedHoldCounter;
// 记录当前线程的信息
private transient ThreadLocalHoldCounter readHolds;

读锁重入源码

// 读锁竞争锁资源的操作
protected final int tryAcquireShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 拿到state
    int c = getState();
    // 先拿到低16位,判断是否不等于0 ,返回true代表有写锁占用锁资源
    // 那么就再次判断拿到写锁的是否是当前线程
    if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
        // 写锁被其他线程占用,拿不到锁,返回-1,去排队
        return -1;
    // 到这一步有两种情况,一种是没有线程持有写锁,一种是当前线程持有写锁
    // 获取读锁的信息,获取state高16位
    int r = sharedCount(c);
    // 这里分为公平锁和非公平锁,返回false则继续往下走
    // 公平锁:查看队列中是否有排队情况,有返回true,直接return,没有则返回false
    // 非公平锁:没有排队的,直接抢,有排队的,
    // 如果是读锁,如果出现这个情况,大部分是写锁刚刚释放资源,后续node还没来得及拿到锁,那么就抢占资源
    if (!readerShouldBlock() &&
            // 判断持有读锁的数量是否达到临界值
            r < MAX_COUNT &&
            // SHARED_UNIT:值为1,也就是state+1
            compareAndSetState(c, c + SHARED_UNIT)) {
        // ---------------------------读锁重入操作-----------------------------------------
        // 获取state高16位值,如果是0 ,说明是第一个拿到锁资源的线程
        if (r == 0) {
            // 第一个线程赋值给firstReader
            firstReader = current;
            // 重入的次数为1
            firstReaderHoldCount = 1;
        // 判断当前线程是否是第一个线程
        } else if (firstReader == current) {
            // 如果是的话,说明是重入操作,直接++
            firstReaderHoldCount++;
            // 到这说明不是第一个获取锁资源的线程
        } else {
            // 获取之前最后一个拿到锁资源线程
            HoldCounter rh = cachedHoldCounter;
            // 判断当前线程是否是最后一个拿到锁资源的线程
            //
            if (rh == null || rh.tid != getThreadId(current))
                // 如果不是,那就把当前线程设为最后一个线程
                cachedHoldCounter = rh = readHolds.get();
            // 当前线程是最后一个拿到锁的线程 cachedHoldCounter,并且还是之前已经释放的线程,count=0
            else if (rh.count == 0)
                // 直接将当前线程的重入信息设置到ThreadLocal,当前线程就是之前最后一个拿锁的线程
                readHolds.set(rh);
            // 不管是不是最后一个线程,重入次数count++
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

读锁竞争循环源码

fullTryAcquireShared()

// tryAcquireShared中,如果没有拿到锁资源,走这个方法,尝试再次获取锁,逻辑和上面基本一样
// 读锁加锁的逻辑
final int fullTryAcquireShared(Thread current) {
    // 当前线程的锁重入次数
    HoldCounter rh = null;
    // 死循环:要不拿到锁,要不就拿不到进行阻塞
    for (;;) {
        // 获取state
        int c = getState();
        // 判断当前有写锁在占用锁资源,并且不是当前线程,那么直接返回-1,去队列中排队
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // 当前是否可以尝试获取锁资源:分为公平锁和非公平锁
        } else if (readerShouldBlock()) {
            // 无论公平还是非公平,只要进来,就要开始准备阻塞当前线程,也就是放到AQS队列中
            // 下面这个if在处理ThreadLocal的内存泄露问题
            if (firstReader == current) {
                // 如果当前线程是firstReader,因为不使用ThreadLocal,就什么都不用做
            } else {
                // 第一次进来是null
                if (rh == null) {
                    // 拿到最后一次获取读锁的线程
                    rh = cachedHoldCounter;
                    // 如果线程为null(第一次都为null),或者当前线程不是最后一次获取读锁的线程,没拿到重入数
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // 从ThreadLocal中拿到重入的计数器
                        rh = readHolds.get();
                        // 如果重入数目为0,说明之前没拿到过锁
                        if (rh.count == 0)
                            // 直接remove,防止内存泄露
                            readHolds.remove();
                    }
                }
                // 前面处理完之后,直接返回-1,代表可以加入AQS队列中了
                if (rh.count == 0)
                    return -1;
            }
        }
        // 判断重入次数或者超出最大次数,抛异常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 否则CAS尝试获取锁资源
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

读线程在AQS队列获取锁资源的后续操作

1、正常如果都是读线程来获取读锁资源,不需要使用到AQS队列的,直接CAS操作即可

2、如果写线程持有着写锁,这是读线程就需要进入到AQS队列排队,可能会有多个读线程在AQS中。

当写锁释放资源后,会唤醒head后面的读线程,当head后面的读线程拿到锁资源后,还需要查看next节点是否也是读线程在阻塞,如果是,直接唤醒

public final void acquireShared(int arg) {
    // 竞争锁资源,返回-1代表拿锁失败
    if (tryAcquireShared(arg) < 0)
        // 没拿到锁资源去排队
        doAcquireShared(arg);
}
// doAcquireShared读锁未获取到锁资源,准备进AQS中排队
private void doAcquireShared(int arg) {
    // 声明node,类型是共享锁,放到AQS队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 拿到上一个节点
            final Node p = node.predecessor();
            // 如果上一个节点是head
            if (p == head) {
                // 直接执行tryAcquireShared()
                int r = tryAcquireShared(arg);
                // 当写锁释放锁资源。队列中线程可以唤醒抢占锁,r>0,r有两种,-1是没拿到锁,1是拿到锁
                if (r >= 0) {
                    // setHeadAndPropagate()拿到读锁之后,需要做的后续处理
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 找到上一个有效节点,将状态设置为-1,挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


// AQS中node拿到读锁之后,做的后续操作
private void setHeadAndPropagate(Node node, int propagate) {
    // 获取头节点
    Node h = head; 
    // 将当前节点设置为头节点,因为当前节点获取到了锁资源,会释放head节点,当前节点替换为head,thread为null
    setHead(node);
    // propagate上面传下来,-1代表没拿到锁,1代表拿到锁,拿到锁才会走这个方法,因此propagate肯定为1
    // 因此propagate > 0在这里的意义更多是在信号量处理JDK1.5的bug操作
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        // 拿到当前节点的next节点,如果next节点是共享锁,直接唤醒,因为也会得到读锁资源
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

读锁释放锁流程和源码分析

// 读锁释放锁
public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // 处理state的值,以及可重入的内容,如果返回true,就释放队列中的node
    if (tryReleaseShared(arg)) {
        // AQS队列的相关的事
        doReleaseShared();
        return true;
    }
    return false;
}

// 1.处理重入的次数 2.处理state
protected final boolean tryReleaseShared(int unused) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 如果当前线程是第一个获取锁的线程,不需要ThreadLocal
    if (firstReader == current) {
        // 如果当前线程重入次数只有一次的话
        if (firstReaderHoldCount == 1)
            // 直接释放当前线程
            firstReader = null;
        else
            // 否则就把重入次数--
            firstReaderHoldCount--;
    } else {
        // 获取cachedHoldCounter
        HoldCounter rh = cachedHoldCounter;
        // 当前线程是不是最后一个获取锁的线程
        if (rh == null || rh.tid != getThreadId(current))
            // 如果不是,当前线程就作为了最后一个线程
            rh = readHolds.get();
        int count = rh.count;
        // 看当前线程重入次数是否小于等于1
        if (count <= 1) {
            // 如果是,直接释放
            readHolds.remove();
            // 如果释放多了,就不用再释放了,因为后面都会统一--
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 当前线程重入次数--
        --rh.count;
    }
    for (;;) {
        int c = getState();
        // 减去高16位
        int nextc = c - SHARED_UNIT;
        // CAS设置state为释放后的次数
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

// 唤醒AQS中排队的线程
private void doReleaseShared() {
    // 死循环
    for (;;) {
        // 拿到头
        Node h = head;
        // 头不等于Null,并且不等于尾。说明有排队的node
        if (h != null && h != tail) {
            // 获取waitStatus
            int ws = h.waitStatus;
            // 判断是否为-1,如果是,说明后面有挂起的线程
            if (ws == Node.SIGNAL) {
                // 先基于CAS操作,将head的waitStatus从-1改为0,只要有一个node被唤醒就能改
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;   
                // 唤醒后续节点
                unparkSuccessor(h);
                // 头节点状态不是-1
            } else if (ws == 0 &&
                // 把waitStatus从0改成了-3,这里不是给读写锁准备的,在Semaphore、countDownLatch中使用
                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果队列中没有排队的node,直接break,没有要唤醒的node
        if (h == head)
            break;
    }
}

五:死锁

操作系统2022版本有,已经有最新的死锁课程了,这里就不做过多讲解

查看这个课程:

https://www.mashibing.com/course/1368 56-62