avatar

25.Java并发之ReentrantLock及AQS简析

0x00 前言

在上一篇文章中我们介绍了CAS无锁机制,为我们这篇文章做了个铺垫。为什么要这么说呢?因为我们这篇将会说到ReentrantLock,其中就有CAS无锁机制的身影。
在这篇文章中,我们将要说到:

  • AQS的思想浅析
  • AQS应用之ReentrantLock简析

下面就让我们开始吧!可能篇幅有点长,耐心的读下去吧。

0x01 AQS概览

  1. AQS全名是AbstractQueuedSynchronizer,是java.util.concurrent.locks包下的一个抽象类,其子类有我们熟知的CountDownLatchReentrantLockReentrantReadWriteLock等等。今天我们就使用ReentrantLock来介绍和分析AQS队列同步器,顺便来一探ReentrantLock的秘密。
  2. AQS抽象类中的几个重要的属性和方法
    1. Node内部类

      • Node类是一个节点类,代码如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      static final class Node {
      //标识,表示该锁为共享锁
      static final Node SHARED = new Node();
      //标识,表示该锁为独占锁(排他锁)
      static final Node EXCLUSIVE = null;

      static final int CANCELLED = 1;
      static final int SIGNAL = -1;
      static final int CONDITION = -2;
      static final int PROPAGATE = -3;
      //状态,取值为CANCELLED、SIGNAL、CONDITION、PROPAGATE
      //对于普通同步节点,字段初始化为0
      volatile int waitStatus;
      //前驱,即上一个节点的值
      volatile Node prev;
      //后继,即下一个节点的值
      volatile Node next;
      //当前节点对应的线程,在new Node的时候当前线程需要作为参数传递进来
      volatile Thread thread;
      //链接到正在等待状态的下一个节点,或共享的特殊值
      Node nextWaiter;
      }
      • waitStatus状态取值描述如下:
        • CANCELLED
          • 值为1,如果节点的waitStatus为此类型时,说明对应的线程等待超时或被中断,需要从同步队列中取消等待。节点一旦变为该状态,就不会再变成其他的值。
        • SIGNAL
          • 值为-1,如果节点的waitStatus为此类型时,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态(释放了锁)或被取消,将会通知后继节点,使后继节点的线程得以运行。
        • CONDITION
          • 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中。与Condition有关。
        • PROPAGATE
          • 值为-3,表示下一次共享式同步状态获取将会无条件的被传播下去。
        • INITIAL
          • 值为0,初始状态
      • AQS使用Node类定义一个使用双向链表作为数据结构的等待队列,翻看AQS的源码,发现Node类doc文档中有如下图案。该图案就是等待队列的草图。因为是使用文字画的图,所以没有把next画出来。但是我们AQS确实是使用双向链表作为等待队列的。
        1
        2
        3
             +------+  prev +-----+   prev +-----+
        head | | <---- | | <----- | | tail
        +------+ +-----+ +-----+
    2. head属性

      • 等待队列的头节点,总是指向队列的头部,head指向的Node对应的对象是锁的拥有者。字段定义为:private transient volatile Node head;
    3. tail属性

      • 等待队列的尾节点,也就是总是指向队列的尾部。字段定义:private transient volatile Node tail;
    4. state属性

      • 表示是否拿到锁,即锁的状态。
    5. tryAcquire()方法

      • 该方法是一个抽象方法,由子类实现,看字面意思是视图获取锁。我们将在分析ReentrantLock时看到它的身影。这里暂且不提。方法定义为:
      1
      2
      3
      protected boolean tryAcquire(int arg) {
      throw new UnsupportedOperationException();
      }
    6. tryRelease()方法

      • 该方法也是一个抽象方法,在释放锁的时候需要调用该方法,由子类进行实现。方法定义如下:
      1
      2
      3
      protected boolean tryRelease(int arg) {
      throw new UnsupportedOperationException();
      }
  3. 以上就是AQS类中一些属性和抽象方法的定义,乍一看摸不着头脑。所以需要拿它的子类下手。下面将以ReentrantLock来进行分析。

0x02 ReentrantLock.lock(通过ReentrantLock获取锁)

  1. ReentrantLock的构造方法看起。源码如下:

    1
    2
    3
    4
    5
    6
    7
    public ReentrantLock() {
    sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }
    • 从源码上来看,它定义了两个AQS同步器的子类,一个是NonfairSync非公平锁,一个是FairSync公平锁.并且默认是NonfairSync。下面我们就以非公平锁NonfairSync为例,讲解ReentrantLock获取锁的细节。
    • 我们通常使用ReentrantLock获取锁的代码大致写法如下:
      1
      2
      3
      4
      ReentrantLock lock = new ReentrantLock();
      lock.lock();
      //或者使用
      boolean isLock = lock.tryLock();
      第一句获取ReentrantLock对象,我们已经分析了,默认获取的是非公平锁。那么下面就让我们看下第二句lock.lock()做了些什么事吧。注意:这里以非公平锁NonfairSync为例。
  2. 首先我们来看看NonfairSync非公平锁类的定义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    //获取锁。后面会再讲到该方法
    final void lock() {
    if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
    else
    acquire(1);
    }
    //这个方法貌似有点熟悉,没错,就是AQS中定义的。
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }
    }

    上面说到NofairSync不是实现AbstractQueuedSynchronizer么?怎么这里是继承Sync?莫急莫慌,我们来看看Sync中是怎样的吧。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    //定义一个lock方法让子类去实现。`NonfairSync`实现了该方法
    abstract void lock();
    //为节省篇幅,该方法体我使用...代替。待下面说到该方法时再给出方法体
    final boolean nonfairTryAcquire(int acquires) {
    ...
    }
    //为节省篇幅,该方法体我使用...代替。待下面说到该方法时再给出方法体
    protected final boolean tryRelease(int releases) {

    }
    //为节省篇幅,该方法体我使用...代替。待下面说到该方法时再给出方法体
    protected final boolean isHeldExclusively() {
    ...
    }

    final ConditionObject newCondition() {
    return new ConditionObject();
    }
    //获取锁的拥有者
    final Thread getOwner() {
    return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
    return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
    return getState() != 0;
    }

    private void readObject(java.io.ObjectInputStream s)
    ...
    }
    }

    通过以上源码,我们大致知道了,是否拥有锁就是通过父类(AQS)的getState()方法获取state,看是否等于0.如果等于0则说明该锁还不属于谁,也就是还没有线程获取锁。

  3. 下面就让我从lock.lock()方法入手分析,一探AQS的思想。

    • 调用ReentrantLocklock()方法获取锁。lock()方法定义如下:
    1
    2
    3
    public void lock() {
    sync.lock();
    }

    可见,调用的是sync中的lock方法。这里sync在创建ReentrantLock时就指定为NonfairSync.那我们就直接来查看NonfairSync中的lock方法把。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    final void lock() {
    //调用父类(AQS)的compareAndSetState方法,对state属性进行赋值
    if (compareAndSetState(0, 1))
    //如果if条件成立,则设置锁的拥有者
    setExclusiveOwnerThread(Thread.currentThread());
    else
    //if如果不成立则调用acquire()方法
    acquire(1);
    }

    首先先使用compareAndSetState()尝试设置AQS的state属性值为1,即获取锁。compareAndSetState内部使用CAS算法比较赋值。这也是为什么说上一篇文章是为该篇做铺垫。代码如下:

    1
    2
    3
    protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    注意:此处没有自旋操作,也就是尝试赋值,因为可能同时有多个线程获取锁,如果成功把state的值变为1,则返回true,否则返回false。
    compareAndSetState()返回为true时,则设置锁的拥有者为当前线程。线程一次性就获取到锁,这是最理想的结果。那么如果该线程抢占设置state失败,则又会进行什么样的操作呢?

    • acquire(1)解析。该方法在AQS中实现,代码如下
    1
    2
    3
    4
    public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    该方法首先会调用tryAcquire()方法,试图再去获取一次锁。注意:此时的参数为1。该方法在子类实现,NonfairSync实现的源码如下:

    1
    2
    3
    protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
    }

    发现其中调用的是nonfairTryAcquire()方法,那么这个方法又是在哪呢?该方法在其父类(Sync)中定义。源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取state值
    int c = getState();
    //如果state=0,则说明还没有线程获取锁,就尝试获取锁
    if (c == 0) {
    //调用compareAndSetState()方法尝试获取锁,此时acquires为1
    if (compareAndSetState(0, acquires)) {
    //如果设置值成功了。则把锁的拥有者设置为当前线程
    setExclusiveOwnerThread(current);
    return true;
    }
    }//如果state不为0,则判断一下该锁是不是当前线程拥有
    else if (current == getExclusiveOwnerThread()) {
    //如果是,则state+1
    int nextc = c + acquires;
    //该判断成立的条件是nextc的值已经查过了int的最大值
    if (nextc < 0) // overflow
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    //返回true
    return true;
    }
    //如果都不成立,则直接返回false
    return false;
    }

    该方法首先会获取state的值,这时有两种情况:
    - 如果state为0,则说明该锁还没有被其他线程拥有,于是尝试调用compareAndSetState()方法去获取锁,如果能获取到锁,则设置锁的拥有者为当前线程,此时的state值变成了1
    - 如果state不为0时,也有两种情况:
    - 看是不是当前线程拥有了锁,如果是当前对象拥有锁,则把state+1.这就是重入锁。如果state超出了最大值,也就是可重入锁达到了最大,则会抛出异常。
    - 如果不是当前线程拥有锁,则返回false。

    回到AQS的acquire()方法,如果tryAcquire()方法返回是true,则下面就不用执行了,但这种是最乐观的,也是最理想的情况。那么如果该锁已经被其他线程获取了,接下来该怎么做呢?

    • 如果tryAcquire()方法返回的false,!取反后则会为true,根据&&的特性,则会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg).妈耶!这是啥?怎么这么绕。不急!我们一层一层的剥开它的衣服。
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)分析

    • 首先它执行的是addWaiter(Node.EXCLUSIVE)。那我们就一起去看看它代码怎么写的

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
      }
      }
      enq(node);
      return node;
      }

      等等,咋看起来一脸懵逼?下面来听我一行一行的分析。
      首先,创建了个Node对象。这个Node类就是我们开头说的AQS的内部类,或许你已经知道了下面将进行什么操作了。没错,就是构建双向链表。
      首先是把tail赋给一个中间变量,tail也就是一直指向队列的尾部,刚开始tail为null,所以就会执行enq(node)方法。我们先来看看enq()方法做了些什么事。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      private Node enq(final Node node) {
      //使用for循环自旋
      for (;;) {
      Node t = tail;//t的值为队列尾部的节点,即最后一个节点
      if (t == null) { // 如果t==null,也就是第一次进入的时候
      //新建一个节点,使用CAS把该节点设置为head节点
      if (compareAndSetHead(new Node()))
      //如果设置成功,则尾节点也指向头节点
      tail = head;
      } else {//如果t不为null,也就是队列中已经有节点
      node.prev = t; //把node的前驱设置为t。
      //使用CAS算法把尾部节点设置为node
      if (compareAndSetTail(t, node)) {
      //如果设置成功,则说明tail的值为node。
      //注意,compareAndSetTail()不是把node值赋给t,即不是t指向node。
      //此时,t还指向上一个尾节点,而不是指向node,
      //设置上一个节点的后继为node
      t.next = node;
      return t;
      }
      }
      }
      }

      enq()使用for循环自旋使用CAS设置值,首先会读出尾节点的值。然后判断尾节点是否为null。

      1. 为null
        • 新建一个节点,使用CAS设置该节点为head,如果设置成功,则把尾节点也指向该节点,即初始化双向链表。注意,此时还不会退出该方法。画图表示如下。这里为了简单表示,我会把tail和head表示为一个指针。
      2. 不为null
        • 即已经初始化了双向链表,图为:

          node为传递进来的参数,此时与head没有连接。
        • 执行node.prev = t后图为:
        • 然后执行compareAndSetTail(t, node)即设置tail尾节点的值,如果成功,这时候图为:
        • 再接着设置上一个尾节点的后继。执行t.next = node,最后的图为:

      妈耶!终于是把enq方法分析完了,继续加油!简单来说,该方法就是初始化链表,然后再把head和传进来的node关联起来,并且更新尾节点的值为传进来的node。
      绕来绕去,还记得我们分析到了哪么?提示下,我们分析到了这里:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
      }
      }
      //刚刚分析完enq方法
      enq(node);
      return node;
      }

      虽然我们分析完了enq方法,但是addWaiter()前半部分还没说呢。有过分析enq方法的经验,一眼就可以看出,如果pred不为null是在干些什么事。这代码在enq()中也有。意识就是把node节点与队列关联,并且把tail指向node。总得来说,执行完这个方法,双向队列应该是这样的。

  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,我们已经分析完了里层的addWaiter(Node.EXCLUSIVE)。总得来说addWaiter(Node.EXCLUSIVE)会返回一个状态为Node.EXCLUSIVE(独占锁的标识)的节点。下面接着该分析acquireQueued(Node,arg)方法了。方法源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    //是否被中断
    boolean interrupted = false;
    //for循环自旋
    for (;;) {
    //node.predecessor()获取node的前驱
    final Node p = node.predecessor();
    //如果该节点的前驱是head节点,即是老二,所以可以去抢占下锁。
    //如果当前线程尝试获取锁是成功的。则执行if体中的代码
    if (p == head && tryAcquire(arg)) {
    //设置当前节点为头节点(head)
    setHead(node);
    //断开当前节点的前驱与等待队列的连接。以便于垃圾回收期去回收
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    //如果当前节点的前驱不是head节点或获取锁失败则执行如下代码。具体我们下面分析
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    首先也是一个for循环自旋,在循环体里获取通过node.predecessor()获取到前驱,如果前驱是head节点,也就是说前驱已经拿到锁了,那么根据FIFO,当前节点是有机会去成为下一个拿到锁的节点。所以就会去尝试获取锁。如果获取锁成功,也就是说,前驱刚好释放锁了。那么就把当前节点设为head节点,然后把该节点的前驱值为null,方便GC去回收。如果条件不满足,则会走到if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()).下面我们先看看shouldParkAfterFailedAcquire()方法。

    • shouldParkAfterFailedAcquire(prev,node)方法源码如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus; // 前驱的waitStatus值
      //如果前驱的等待状态值是SIGNAL,则返回true。
      //即会指定if中的第二个方法parkAndCheckInterrupt.等下面说到我们再说这里是什么意思
      if (ws == Node.SIGNAL)
      return true;
      //如果waitStatus>0,根据我们前面列出的waitStatus的取值来看,
      //只有当线程等待超时或被中断,需要从同步队列中取消等待,waitStatus才会>0。
      //简单说就是上一个节点已经挂了。
      if (ws > 0) {
      //循环往前查找,直到查找到的节点的waitStatus<0为止。即跳过已经被中断或者已经结束的线程。
      do {
      node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
      } else {
      //如果前驱节点状态正常,则把前驱节点的状态变为SIGNAL
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
      }
      这里的小结先不说,需要结合下面的方法才能说清楚。
    • parkAndCheckInterrupt()源码如下:
      1
      2
      3
      4
      5
      6
      7
      private final boolean parkAndCheckInterrupt() {
      //使用LockSupport的park方法,让线程进行阻塞(挂起/休息)
      //只有调用了unpark()或者被interrupt才能唤醒
      LockSupport.park(this);
      //返回线程是否被中断。
      return Thread.interrupted();
      }
      结合上面的shouldParkAfterFailedAcquire()方法分析,该if()做的事情如下:
      • 查看前驱的waitStatus是不是SIGNAL,只有前驱是SIGNAL状态,才可以调用parkAndCheckInterrupt()挂起(休息)。可以简单的想象为:只有当你家门锁了(前驱是SIGNAL状态),你才能安心的去休息,对吧。
      • 如果前驱已经已经退出了等待队列,则需要在等待队列中找到最近的节点(该节点还在等待队列中)。可以简单的想象为:你去排队买东西,然后排你前面的n个人有事不排了,你是不是应该往前走,直到找到最近一个还在排队的人,然后排在他后面等待。
      • 如果前驱的状态是<0的,也就是前驱的节点的状态是正常的,则需要把前驱的状态改成SIGNAL,以便下次循环时,当前节点能够挂起(休息)。
    • acquireQueued()方法已经分析完了,即下面这段代码我们已经搞定了。
      1
      2
      3
      4
      5
      public final void acquire(int arg) {
      if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      //中断当前线程
      selfInterrupt();
      }
    • 让我们再来总结下这段代码再干些什么吧。流程图如下:
  6. 到此,整个ReentrantLock.lock()方法算是分析完了。画了个流程图,再次总结下lock()的全过程

0x03 ReentrantLock.unLock(ReentrantLock释放锁)

  1. 上面已经说完了如何获取锁,下面我们将谈到如何释放锁。有获取就一定得释放,不然会造成死锁现象。
  2. 同样,我们从ReentrantLock.unlock()方法开始入手,其源码简单,如下:
    1
    2
    3
    public void unlock() {
    sync.release(1);
    }
    我们同样追踪进去,看看在Sync中做了什么,该方法实现是在AQS中。还记得我们这里sync取值是什么吗?我们分析的是NonfairSyncrelease()代码如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public final boolean release(int arg) {
    //调用子类的tryRelease()方法释放锁
    if (tryRelease(arg)) {
    //如果释放成功,拿到head节点
    Node h = head;
    //如果head不为空,并且waitStatus不为0
    if (h != null && h.waitStatus != 0)
    //唤醒下一个正常的节点
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
    释放锁逻辑比获取锁要简单些。所以分析起来没有上面那么绕。总的来说就是调用子类的方法试图去释放锁,如果释放成功,就唤醒下个正常的节点,告诉他:哥们,我已经用完了锁,下面该你了。然后下一个节点就会被唤醒,执行上面分析的acquireQueued()方法中的for循环自旋,直到获取锁成功。
  3. tryRelease()方法分析.这里分析的是ReentrantLock.Sync类的实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    protected final boolean tryRelease(int releases) {
    //releases的值为1,下面为什么要减呢?
    //还记得我们获取锁时,传的数字是多少么?是1.
    //所以如果不是重入锁这样一减,就把c置0了。即释放锁
    int c = getState() - releases;
    //释放锁,必须是锁的拥有者,这个很好理解
    if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
    boolean free = false;
    //如果c的值为0,那么说明释放锁成功了。
    //如果进行上面的操作后,c的值还不为0.则说明是重入锁。
    //即获取了多少次锁就得释放多少次
    if (c == 0) {
    free = true;
    //设置锁的拥有者为null。即释放锁
    setExclusiveOwnerThread(null);
    }
    //设置c的值
    setState(c);
    return free;
    }
    再来整理下逻辑。首先会用(state-releases)。然后判断调用该方法的线程是不是已经拿到锁的线程。这很好理解,你释放锁,首先得拿到锁吧?不然你释放啥?接下来就会判断c的值是不是为0.如果为0则说明锁已经释放成功了,然后把锁的拥有者置null。这里出现减去后,c的值还不为0的情况,只有当该锁是重入锁时。也就是说,你调用了多少次lock,相应的你也要调用多少次unlock。
  4. 接下来分析下它是如何唤醒下一个兄弟的。即unparkSuccessor()方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private void unparkSuccessor(Node node) {
    //获取head节点(即获取到锁的节点)的状态值
    int ws = node.waitStatus;
    //如果状态值<0.则把它置0
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);
    //获取下一个兄弟
    Node s = node.next;
    //如果下一个兄弟为空,或者他的状态>0。换句话说,就是下一个兄弟已经挂了。
    if (s == null || s.waitStatus > 0) {
    s = null;
    //从等待队列末尾寻找离head节点最近的正常节点。
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    //经过一系列操作,如果s的不为空,则唤醒该节点对应的线程
    if (s != null)
    LockSupport.unpark(s.thread);
    }
    首先获取到当前节点(得到锁的节点)的状态值,把它置0。然后获取它的下个兄弟,如果排在他后面的兄弟挂了,那么就得从后往前找到离head节点最近的正常节点(waitStatus<=0),如果该节点不为空,那么就唤醒该线程。之后被唤醒的线程就会调用上面分析的acquireQueued()方法中的for循环自旋,直到获取锁成功。即会调用if (p == head && tryAcquire(arg)) 去尝试获取锁。可能有时候首次调用p!=head,但是没有关系,经过一次shouldParkAfterFailedAcquire()方法调整后,当前唤醒的线程的节点会紧接着head,即p==head会成立。
  5. 到此,整个释放锁的过程已经分解完了。释放锁没什么难度。总得来说就是先尝试释放锁,如果释放成功,就通知下一个兄弟(节点)该你获取锁了。

0x04 总结

  1. 今天我们对AQS队列同步器以及ReentrantLock进行了分析和讲解。篇幅很长,内容也很多,能够看到最后和分析到最后真的不易。最后总结几点:
    • ReentrantLock是一个重入锁。所以释放锁时,有多少层就得unlock多少次,不然就不算是释放锁资源。
    • 非公平锁,非公平是指,获取锁时,可以先插队,如果插队获取不了,再加入到等待队列。
    • 恩。其他的上面的分析中都有,就不多说了。
  2. 推荐几篇不错的文章吧。都是关于锁的。
    1. 并发编程网的一篇文章:AQS的原理浅析
    2. 博客园的一篇文章:Java并发之AQS详解
    3. 美团技术团队的一篇文章:不可不说的Java“锁”事
  3. 本人才疏学浅,对知识的理解难免有误,还望大家能够不吝指出,我会及时更正。感谢大家观看!

评论