Lock

Mr.ZhangJava大约 41 分钟

Lock

JUC Lock锁的特性及用法

提示

相对Java synchronized,JUC Lock有何优势?

尽管synchronized的底层原理比较复杂,但是使用起来却非常简单。从本节开始,我们来学习另外一种互斥锁:JUC并发包提供的Lock锁。相对于synchronized内置锁,JUC Lock锁提供了更加丰富的特性,比如支持公平锁、可中断锁、非阻塞锁、可超时锁等。本节,我们就详细介绍一下JUC Lock锁的各种特性及其用法。下一节,我们再结合AQS,对这些特性的实现原理做深入讲解。

1、JUC锁概述

JUC提供了几种不同的锁,继承和实现层次关系如下图所示。本节重点讲解Lock锁(也就是Lock接口及其实现类ReentrantLock),下下节讲解读写锁(也就是ReadWriteLock接口及其实现类ReetrantReadWriteLock)以及读写锁的升级版StampedLock。

img
img

我们先来看Lock接口,其接口定义如下所示。因为在平时的开发中,我们用到的锁都是可重入锁,所以,Lock接口只有 一个可重入的实现类ReentrantLock。

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

2、可重入锁

可重入锁指的是可以被同一个线程多次加锁的锁。注意,这里说的多次加锁,并不是说解锁之后再次加锁,而是在锁没有解锁前再次加锁。

如下代码所示,为了保证线程安全,getEvenSeq()函数和increment()函数中的代码都加了锁。getEvenSeq()函数调用increment()函数,导致getEvenSeq()函数在锁释放前再次加锁。如果JUC提供的锁不支持可重入特性,那么,getEvenSeq()中的第二次加锁需要等待锁释放,而锁释放又需要加锁之后才能执行,于是,getEvenSeq()就会出现死锁。

public class Demo {
  Lock lock = new ReentrantLock();
  private int seq = 0;

  public int getEvenSeq() {
    lock.lock();
    try {
      // ...省略其他操作...
      if (seq%2 == 1) {
        increment();
      }
      return seq;
    } finally {
      lock.unlock();
    }
  }

  public void increment() {
    lock.lock();
    seq++;
    lock.unlock();
  }
}

对于上述代码,我们稍微解释一下。之所以getEvenSeq()函数使用finally来释放锁,是为了避免代码抛出异常而导致锁无法正常释放。而之所以increment()函数没有使用finally来释放锁,是因为代码比较简单,我们可以确定代码不会抛出异常。

JUC提供的锁都是可重入锁。实际上,Java synchronized内置锁也是可重入锁。从侧面上,我们也可以得出,可重入是对锁的基本要求。为了实现可重入特性,可重入锁中需要有一个变量来记录重入的次数。每重入一次,变量就增一;每解锁一次(调用unlock()或退出synchronized代码块),变量就减一,直到变量值为0时,才会释放锁唤醒其他线程执行。

3、公平锁

对于公平锁来说,线程会按照请求锁的先后顺序来获得锁,也就是我们经常说的FIFO。对于非公平锁,多个线程请求锁时,非公平锁无法保证这些线程获取锁的先后顺序,有可能后申请锁的线程先获取到锁。

Java将synchronized设计为只支持非公平锁,而JUC提供的ReentrantLock既支持公平锁,也支持非公平锁。默认情况下,ReentrantLock为非公平锁。如果需要创建公平锁,那么我们只需要在创建ReentrantLock对象时,将构造函数的参数设置为true即可。如下代码所示。

Lock lock = new ReentrantLock(true); // 公平锁

接下来,我们再来看下,公平锁和非公平锁的实现原理。

在讲解synchronized的底层实现原理时,我们讲到,多个线程竞争锁,竞争到锁的线程就去执行任务了,没有竞争到锁的线程会放入Monitor锁的_cxq队列中等待锁,并且还需要调用park()函数阻塞自己。当持有锁的线程释放锁时,它会从_EntryList队列中取一个线程,取消阻塞状态,让它去重新竞争锁,而不是直接将锁给它。而此时如果有新来的线程也要竞争这个锁,新来的线程有可能不需要排队,就能直接获取锁,显然,这是一种“插队”的行为。

当然,我们也可以让synchronzied支持公平锁。实现起来也很简单。新来的线程在申请获取锁时,如果_EntryList队列和_cxq队列中有排队等待锁的线程,那么,不管此时锁有没有释放,新来的线程都直接放入_cxq队列中排队,按照先来后到的顺序等待锁,以避免新来线程的“插队”行为。这样实现的锁就是公平锁。

实际上,ReentrantLock实现公平锁和非公平锁的方法,跟上述synchronized的实现方法,其基本原理是一致的。区别在于,ReentrantLock使用AQS(抽象队列同步器)来存储排队等待锁的线程。关于AQS,我们在下一节中详细讲解。

既然实现公平锁并不复杂,而且从直觉上,公平锁比非公平锁更加合理,但是,synchronized为什么只支持非公平锁?主要原因有以下3个方面。

1)历史的原因:synchronized早期开发时没有考虑那么全面;

2)需求的原因:绝大部分业务场景都不需要严格规定线程的执行顺序,如果真的需要,我们可以通过条件变量(wait()、notify()等)等同步工具来实现;

3)性能的原因:非公平锁的性能比公平锁的性能更好。我们知道,加入等待队列并调用park()函数阻塞线程,涉及到用户态和内核态的切换,是比较耗时。对于非公平锁来说,新来的线程直接竞争锁,这样就有可能避免加入等待队列并调用费时的park()函数。

不过,非公平锁也有缺点,在极端情况下,线程竞争锁激烈,频繁有新来的线程插队,就有可能导致,排在等待队列中的线程,迟迟无法获取到锁。

4、可中断锁

对于synchronized锁来说,线程在阻塞等待synchronized锁时是无法响应中断的。而JUC Lock接口提供了lockInterruptibly()函数,支持可响应中断的方式来请求锁。示例代码如下所示。主线程先获取到了锁并一直持有,之后线程t1调用lockInterruptibly()请求锁,因为锁被主线程持有,所以,线程t1阻塞等待。主线程调用interrupt()函数向线程t1发起中断请求,线程t1响应中断请求,退出阻塞等待锁,并打印“I am interrupted”。

public class Demo {
  private static Lock lock = new ReentrantLock();
  
  public static void main(String[] args) {
    Thread t1 = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          lock.lockInterruptibly();
        } catch (InterruptedException e) {
          System.out.println("I am interrupted");
          return;
        }
        try {
          System.out.println("I got lock");
        } finally {
          lock.unlock();
        }
      }
    });
    lock.lock();
    t1.start();
    t1.interrupt();
    lock.unlock();
  }
}

可中断锁一般用于线程管理中,方便关闭正在执行的线程。比如,Nginx服务器采用多线程来执行请求。当我们调用stop命令关闭Nginx服务器时,Nginx服务器可以采用中断的方式,将阻塞等待锁的线程中止,然后,合理的释放资源和妥善处理未执行完成的请求,以实现服务器的优雅关闭。

5、非阻塞锁

对于synchronized锁来说,一个线程去请求一个synchronized锁时,如果锁已经被另一个线程获取,那么,这个线程就需要阻塞等待。JUC Lock接口提供了tryLock()函数,支持非阻塞的方式获取锁。如果锁已经被其他线程获取,那么,调用tryLock()函数会直接返回错误码而非阻塞等待。示例代码如下所示。非阻塞锁的实现原理非常简单。竞争锁失败的线程不放入队列排队即可实现非阻塞锁。

public class Demo {
  private Lock lock = new ReentrantLock();
  
  public void useTryLock() {
    if (lock.tryLock()) {
      try {
        // ...执行业务代码...
      } finally {
        lock.unlock();
      }
    } else {
      // ...没有获取锁,执行其他业务代码...
    }
  }
}

6、可超时锁

除了提供不带参数的tryLock()函数之外,JUC Lock接口还提供给了带时间参数的tryLock()函数,支持非阻塞获取锁的同时设置超时时间。也就是说,一个线程在请求锁时,如果这个锁被其他线程持有,那么这个线程会阻塞等待一段时间。如果超过了设定的超时时间,线程仍然没有获取到锁,那么tryLock()函数将会返回错误码而不再阻塞等待。示例代码如下所示。从示例代码中,我们还可以发现,tryLock()跟lockInterruptibly()一样,也可以被中断。这样是为了避免tryLock()阻塞过长时间。

public class Demo {
  private Lock lock = new ReentrantLock();
  
  public void useTryLockWithTimeout() {
    boolean locked = false;
    try {
      locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      System.out.println("I am interrupted");
    }
    if (locked) {
      try {
        // ...执行业务代码...
      } finally {
        lock.unlock();
      }
    } else {
      // ...没有获取锁,执行其他业务代码...
    }
  }
}

接下来,我们再来看看可超时锁的应用场景。

在很多对响应时间比较敏感的系统中,比如面向用户的系统,从用户体验上说,请求失败给与提示,要远好于响应超时而没有反应。我们拿Tomcat等Web服务器来举例。Tomcat采用线程池的方式多线程执行用户请求。如果某个特殊请求不能并发执行,并且请求执行时间比较长,那么,请求的处理代码就需要加锁。当多个线程同时执行多个特殊请求时,有些线程就有可能因为迟迟无法获取到锁而无法执行请求。一方面,这样会导致用户请求超时,给用户带来不好的体验,另一方面,线程一直等待锁,长期被占用,无法执行其他任务,剩余可以执行用户请求的线程变少,从而加重了系统负担,导致更多请求超时。

针对以上问题,我们就可以使用带超时时间的tryLock()函数来请求锁,如果在设定的超时时间内未获取到锁,那么,线程就中止执行用户请求,返回错误信息给用户。当然,这只是保护措施,毕竟,以上问题只有在无法并发执行的特殊请求集中大量到来时才会发生。

JUC Lock底层原理

提示

如何使用AQS(抽象队列同步器)实现JUC Lock?

上一节,我们讲解了JUC Lock的各种特性,比如支持重入锁、公平锁、可中断锁、非阻塞锁、可超时锁。本节,我们就来讲一下JUC Lock的底层实现原理。JUC Lock底层主要依赖AQS来实现。AQS也是JUC中非常重要的基础组件。JUC中很多锁(Lock、ReadWriteLock)和同步工具(Condition、Semaphore、CountDownLatch)都是基于AQS来实现的。因此,在讲解JUC Lock的底层实现原理时,我们会重点讲解AQS。

一、AQS简介

AQS是抽象类AbstractQueueSynchronizer的简称,中文翻译为抽象队列同步器。

前面讲到,在Hotspot JVM中,synchronized主要依赖ObjectMonitor类来实现。类中的_cxq、_EntryList、_WaitSet用来排队线程。其中,_cxq、_EntryList用来实现锁,也就是synchronized,_WaitSet用来实现条件变量,也就是wait()和notify()。实际上,在功能上,AQS跟ObjectMonitor非常类似,都实现了排队线程、阻塞线程、唤醒线程等功能。

class ObjectMonitor {
  void * volatile _object; //该Monitor锁所属的对象
  void * volatile _owner; //获取到该Monitor锁的线程
  ObjectWaiter * volatile _cxq; //没有获取到锁的线程暂时加入_cxq
  ObjectWaiter * volatile _EntryList; //存储等待被唤醒的线程
  ObjectWaiter * volatile _WaitSet; //存储调用了wait()的线程
}

不过,在实现思路上,AQS跟ObjectMonitor有所不同。首先,ObjectMonitor类是在JVM中基于C++来实现的,因为synchronized、wait()、notify()是Java语言提供的内置的语法和函数。AQS类是在JDK中基于Java语言实现的,因为JUC只是JDK中的一个并发工具包而已。其次,ObjectMonitor使用不同的队列来实现锁和同步工具,AQS使用同一个队列来实现锁和同步工具。

接下来,我们就详细讲解一下AQS的实现原理。

二、数据结构

AQS类中所包含的成员变量并不多,如下代码所示。这几个成员变量构成了AQS实现锁和同步工具所依赖的核心数据结构。

public abstract class AbstractQueuedSynchronizer
                      extends AbstractOwnableSynchronizer {
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
}

public abstract class AbstractOwnableSynchronizer {
    private transient Thread exclusiveOwnerThread;
}

如上代码所示,AQS继承自AbstractOwnableSynchronizer类。AbstractOwnableSynchronizer类只包含一个成员变量exclusiveOwnerThread。AQS连带继承来的一个成员变量,总共有4个成员变量。接下来,我们依次介绍下它们。

1)state

前面在讲到synchronized的底层实现原理时,我们讲到,当多个线程竞争锁时,它们会通过CAS操作来设置ObjectMonitor中的_owner字段。谁设置成功,谁就获取了这个锁。实际上,AQS中的state的作用就类似于ObjectMonitor中的_owner字段。只不过_owner字段是一个指针,存储的是获取锁的线程,而state是一个int类型的变量,存储0、1等整型值。其中,0表示锁没有被占用,1表示锁已经被占用,大于1的数表示重入的次数。当多个线程竞争锁时,它们会通过如下所示的CAS操作来更新state的值。这里CAS指的是先检查state的值是否为0,如果是的话,将state值设置为1。谁设置成功,谁就获取了这个锁。

protected final boolean compareAndSetState(int expect, int update) {
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

compareAndSetState()函数底层使用Unsafe类提供的native函数来实现。native函数是JVM中的C++函数。如果想要阅读native函数的代码实现,那么,我们需要查看JVM源码。实际上,compareAndSetState()函数经过层层调用,最底层仍然是依靠硬件提供的原子CAS指令来实现。

2)exclusiveOwnerThread

AQS中的exclusiveOwnerThread成员变量存储持有锁的线程,它配合state成员变量,可以实现锁的重入机制。关于重入机制的实现方式,我们稍后讲解。

3)head和tail

在ObjectMonitor中,_cxq、_EntryList用来存储等待锁的线程,_WaitSet用来存储调用了wait()函数(等待条件变量的函数)的线程。相比而言,AQS只有一个等待队列,既可以用来存储等待锁的线程,又可以用来存储等待条件变量的线程。在ObjectMonitor中,_cxq使用单链表来实现,_EntryList和_WaitSet使用双向链表来实现。在AQS中,等待队列使用双向链表来实现。双向链表的节点定义如下所示。AQS中的head和tail两个成员变量分别为双向链表的头指针和尾指针。

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    
    volatile Thread thread;
    volatile Node prev;
    volatile Node next;
    volatile int waitStatus;
    Node nextWaiter;
}

三、基本原理

AQS使用模板方法模式来实现。在《设计模式之美》一书中,我们讲到,模板方法模式包含两个主要的组件:模板方法和抽象方法。模板方法包含主功能逻辑,并且依赖抽象方法来实现部分逻辑的可定制化。当使用模板方法模式时,我们需要定义一个子类,让其继承模板类,并实现其中的抽象方法,然后再使用子类创建对象,调用对象的模板方法来做编程开发。AQS的代码结构和使用方法大致也是如此。

AQS定义了8个模板方法,如下所示**。**以下8个函数可以分为2组,分别用于AQS的两种工作模式:独占模式和共享模式。其中,前4个函数用于独占模式,后4个函数用于共享模式。Lock为排它锁,因此,Lock的底层实现只会用到AQS的独占模式。ReadWriteLock中的读锁为共享锁,写锁为排它锁,因此,ReadWriteLock的底层实现既会用到AQS的独占模式,又会用到AQS的共享模式。Semaphore、CountdownLatch这些同步工具只会用到AQS的共享模式。

public final void acquire(int arg) { ... }
public final void acquireInterruptibly(int arg)
                  throws InterruptedException { ... }
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                  throws InterruptedException { ... }
public final boolean release(int arg) { ... }

public final void acquireShared(int arg) { ... }
public final void acquireSharedInterruptibly(int arg)
                  throws InterruptedException { ... }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                  throws InterruptedException { ... }
public final boolean releaseShared(int arg) { ... }

**AQS提供了4个抽象方法,如下所示。**前两个抽象方法用于独占模式的4个模板方法,后两个抽象方法用于共享模式的4个模板方法。在标准的模板方法模式的代码实现中,抽象方法需要使用abstract关键字来定义,以强制子类去实现它。但以下抽象方法并没有使用abstract关键字来定义,而是给出了默认的实现,即抛出UnsupportOperationException异常。这样做是为了减少开发量,即我们不需要在子类中实现所有的抽象方法,用到哪个就实现哪个即可。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

**接下来,我们结合ReentrantLock来看下AQS如何使用。**ReentrantLock既支持非公平锁,又支持公平锁,其部分代码如下所示。ReentrantLock定义了两个继承自AQS的子类:NonfairSync和FairSync,分别用来实现非公平锁和公平锁。因为NonfairSync和FairSync的释放锁的逻辑是一样的,所以,NonfairSync和FairSync又抽象出了一个公共的父类Sync。注意,为了更清晰的展示原理,在不改变代码逻辑的情况下,我对本节中的代码均做了少许调整。

public class ReentrantLock implements Lock {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer { ... }
    static final class NonfairSync extends Sync { ... }
    static final class FairSync extends Sync { ... }
    
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    } 
     
    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
    //...省略其他方法...
}

ReentrantLock中的lock()函数调用AQS的acquire()模板方法来实现,unlock()函数调用AQS的release()模板方法来实现。接下来,我们就来看下acquire()和release()的底层实现原理。

1)acquire()模板方法

acquire()的代码实现如下所示。acquire()的代码实现看似非常简单,实际上,其包含的逻辑可不少。acquire()先调用tryAcquire()方法去竞争获取锁。如果tryAcquire()获取锁成功,acquire()就直接返回。如果tryAcquire()获取锁失败,那么就会执行addWaiter(),将线程包裹为Node节点放入等待队列的尾部,最后调用acquireQueued()阻塞当前线程。selfInterrupt()用来处理中断,如果在等待锁的过程中,线程被其他线程中断,那么,在获取锁之后,将线程的中断标记设置为true。这里的中断不是重点,简单了解即可。

public final void acquire(int arg) {
    // tryAcquire() -> addWaiter() -> acquireQueued()
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire()是抽象方法,在NonfairSync和FairSync中实现。代码如下所示。我对代码做了详细的注释,这里就不再重述其中的代码逻辑了。两个tryAcquire()方法的代码实现区别也不大,唯一的区别是在获取锁之前,FairSync会调用hasQueuedPredecessors()函数,查看等待队列中是否有线程在排队,如果有,那么tryAcquire()返回false,表示竞争锁失败,从而禁止“插队”获取锁的行为。

static final class NonfairSync extends Sync {
    // 尝试获取锁,成功返回true,失败返回false。AQS用于实现锁时,acquires=1
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); //获取state值
        if (c == 0) { //1、锁没有被其他线程占用
            if (compareAndSetState(0, acquires)) { //CAS设置state值为1
                setExclusiveOwnerThread(current); //设置exclusiveOwnerThread
                return true; //获取锁成功
            }
        } else if (current == getExclusiveOwnerThread()) { //2、锁可重入
            int nextc = c + acquires; // state+1
            if (nextc < 0) //重入次数太多,超过了int最大值,溢出为负数,此情况罕见
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // state=state+1,state记录重入的次数,解锁的时候用
            return true; //获取锁成功
        }
        return false; //3、锁被其他线程占用
    }    
}

static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { //1、锁没有被占用
            if (!hasQueuedPredecessors() &&  //等待队列中没有线程时才获取锁
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) { //2、锁可重入
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

addWaiter()函数的代码实现如下所示。在多线程环境下,往链表尾部添加节点会存在线程安全问题,因此,下面的代码采用自旋+CAS操作的方式来解决这个问题,这种方式在AtomicInteger等原子类中被大量使用,我们在讲解原子类时再详细讲解。除此之外,addWaiter()函数还需要特殊处理链表为空的情况,同样也存在线程安全问题,也同样是采用自旋+CAS操作解决的。注意,为了方便操作,AQS中的双向链表带有虚拟头节点。关于虚拟头节点,你可以阅读我的《数据结构与算法之美》这本书来了解。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 自旋执行CAS操作,直到成功为止
    for (;;) {
        Node t = tail;
        if (t == null) { //链表为空,添加虚拟头节点
            //CAS操作解决添加虚拟头节点的线程安全问题
            if (compareAndSetHead(null, new Node()))
                tail = head;
        } else { //链表不为空
            node.prev = t;
            //CAS操作解决了同时往链表尾部添加节点时的线程安全问题
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return node;
            }
        }
    }
}

acquireQueued()的代码实现如下所示,主要包含两部分逻辑:使用tryAcquire()函数来竞争锁和使用park()函数来阻塞线程,并且采用for循环来交替执行这两个逻辑。之所以这样做,是因为线程在被唤醒(取消阻塞)之后,并不是直接获取锁,而是需要重新竞争锁,如果竞争失败,那么就需要再次被阻塞。关于代码中涉及的中断的处理逻辑,我们在本节中的中断机制小结中讲解。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋(竞争锁+阻塞),因为被唤醒之后不一定能竞争到锁,所以要自旋
        for (;;) {
            final Node p = node.predecessor();
            // 如果线程是被中断唤醒的,那么p就不一定等于head,也就不能去竞争锁
            if (p == head && tryAcquire(arg)) {
                setHead(node); //把node设置成虚拟头节点,也就相当于将它删除
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 调用park()函数来阻塞线程,线程被唤醒有以下两种情况:
            // 1、其他线程调用unpark()函数唤醒,此时,节点位于虚拟头节点的下一个,p==head
            // 2、被中断唤醒,此时,节点不一定是虚拟头节点的下一个,p不一定等于head
            if (parkAndCheckInterrupt()) interrupted = true;
        }
    } finally { //以上过程只要抛出异常,都要将这个节点标记为CANCELLED,等待被删除
        if (failed) cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
    //底层调用JVM提供的native park()函数来实现,跟synchronized使用的park()函数相同
    LockSupport.park(this);
    return Thread.interrupted();
}

2)release()模板方法

release()模板方法的代码实现比较简单,如下所示,主要包含两部分逻辑:使用tryRelease()函数释放锁和调用unpark()函数唤醒链首节点(除虚拟头节点之外)对应的线程。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //内部调用unpark()函数
        return true;
    }
    return false;
}

tryRelease()是抽象方法。不管是公平锁还是非公平锁,tryRelease()释放锁的逻辑相同,如下所示。代码中有详细的注释,这里就不再赘述代码逻辑了。

static final class Sync extends AbstractQueuedSynchronizer {
    // 释放锁,成功返回true,失败返回false。AQS用于实现锁时,releases=1
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // state-1
        //不持有锁的线程去释放锁,这不是瞎胡闹嘛,抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        if (c == 0) { //state-1之后为0,解锁
            setExclusiveOwnerThread(null);
            return true;
        }
        setState(c); //state-1之后不为0,说明锁被重入多次,还不能解锁。
        return false;
    }
}

从上述分析,我们可以发现,模板方法acquire()包含加锁的所有逻辑,比如竞争锁、竞争失败之后的排队、阻塞,而竞争锁这部分逻辑由抽象方法tryAcquire()来实现,因此,我们可以在子类中定制如何竞争锁,比如是否支持重入锁、是否支持公平锁等。模板方法release()包含解锁的所有逻辑,比如释放锁、唤醒等待线程,而释放锁这部分逻辑由抽象方法tryRelease()来实现,因此,我们也可以在子类中定制如何释放锁。

四、中断机制

在独占模式下,AQS中对应的模板方法有4个。前面讲到了两个:acquire()和release(),分别用来实现ReentrantLock中的lock()和unlock()函数。接下来,我们再来看下另外两个:aquireInterruptibly()和tryAquireNanos(),它们分别用来实现ReentrantLock中的lockInterruptibly()函数和带超时时间的tryLock()函数。具体如下代码所示。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
                               throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

acquireInterruptibly()模板方法对应的代码实现如下所示。代码实现也非常简单,如果线程被中断,则抛出InterruptedException异常,否则,调用tryAcquire()竞争获取锁,如果获取锁成功,则直接返回,否则,调用doAcquireInterruptible()函数。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
}

doAcquireInterruptibly()函数的代码实现如下所示,跟之前讲的acquireQueued()函数的代码实现非常相似。唯一的区别是对中断的响应处理不同。parkAndCheckInterrupt()函数返回有两种情况,一种是其他线程调用了unpark()函数取消阻塞,另一种是被其他线程中断。对于第二种情况,acquireQueued()函数不对中断做任何处理,继续等待锁。doAcquireInterruptibly()函数则是将中断包裹为InterruptedException异常抛出,终止等待锁。因此,调用acquire()实现的lock()函数,在阻塞等待锁时,不会被中断。调用acquireInterruptibly()实现的lockInterruptibly()函数,在阻塞等待锁时,可以被中断。

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (parkAndCheckInterrupt())
                throw new InterruptedException(); //区别:抛出异常!
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

五、超时机制

tryAquireNanos()模板方法的代码实现如下所示。代码实现也非常简单,如果线程被中断,则直接抛出InterruptedException异常,否则,调用tryAcquire()竞争获取锁,如果获取锁成功,则直接返回,否则,调用doAcquireNanos()函数。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos()函数的代码实现如下所示。在doAcquireInterruptibly()函数的代码实现的基础之上,doAcquireNanos()函数又添加了对超时的处理机制。因此,使用tryAcquireNanos()实现的ReentrantLock的tryLock()函数,既支持中断,又支持设置超时时间。

private boolean doAcquireNanos(int arg, long nanosTimeout)
                                    throws InterruptedException {
    if (nanosTimeout <= 0L) return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) return false;
            if (nanosTimeout > spinForTimeoutThreshold) //不着急阻塞,先自旋一下
                LockSupport.parkNanos(this, nanosTimeout); //超时阻塞
            if (Thread.interrupted()) throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

为了支持超时阻塞,在阻塞线程时,doAcquireNanos()函数调用parkNanos()函数。parkNanos()函数的实现方式跟park()函数差不多。在讲解synchronized的时候,我们提到,park()函数的代码实现大致如下所示。parkNanos()只需要将其中的pthread_cond_wait()函数替换成了pthread_cond_timewait()函数便可以实现超时等待。

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
boolean ready = false;
void park() {
  ...
  pthread_mutex_lock(&mutex);
  while (!ready) {
      pthread_cond_wait(&cond, &mutex);
  }
  ready = false;
  pthread_mutex_unlock(&mutex);
  ...
}

读写锁和StampedLock

提示

ReadWriteLock的锁升级与锁降级及其底层实现原理

在上上节中,我们讲到,JUC提供的锁有三类:普通互斥锁(Lock和ReentrantLock)、读写锁(ReadWriteLock和ReentrantReadWriteLock)、StampedLock、上两节,我们介绍了JUC中的Lock,并且讲解了其底层实现原理,特别是AQS。本节,我们讲解读写锁和StampedLock。

一、读写锁的基本用法

为了提高多线程环境下代码执行的并发度,两个读操作是可以并发执行的,但是,读操作和写操作不能并发执行,同理,写操作和写操作也不能并发执行。为了满足这样特殊的加锁需求,JUC提供了读写锁(ReadWriteLock接口和ReentrantReadWriteLock类)。

ReadWriteLock接口的定义,如下所示。跟Lock和ReentrantLock的关系类似,ReadWriteLock也只有一个可重入的实现类ReentrantReadWriteLock。

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReadWriteLock接口中只包含两个函数,其中,readLock()函数返回读锁。读锁用来给读操作加锁。writeLock()函数返回写锁。写锁用来给写操作加锁。读锁是一种共享锁,读锁可以被多个线程同时获取。写锁是排它锁。写锁同时只能被一个线程获取。除此之外,读锁和写锁之间也是排它的。因此,读写锁一般用于读多写少的场景。读写锁的使用示例代码如下所示。两个线程允许并发执行get()函数。

public class Demo {
  private List<String> list = new LinkedList<>();
  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
  private Lock rLock = rwLock.readLock(); //读锁
  private Lock wLock = rwLock.writeLock(); //写锁
  
  public void add(int idx, String elem) {
    wLock.lock(); //加写锁
    try {
      list.add(idx, elem);
    } finally {
      wLock.unlock(); //释放写锁
    }
  }
  
  public String get(int idx) {
    rLock.lock(); //加读锁
    try {
      return list.get(idx);
    } finally {
      rLock.unlock(); //释放读锁
    }
  }
}

ReentrantReadWriteLock既支持公平锁又支持非公平锁。跟ReentrantLock的公平锁和非公平锁的构建方法一样,ReentrantReadWriteLock默认为非公平锁。如果要成创建公平锁,我们只需要在创建ReentrantReadWriteLock对象时,将构造函数的参数设置为true即可,示例代码如下所示。

ReadWriteLock rwLock = new ReentrantReadWriteLock(true); //公平锁
ReadWriteLock rwLock = new ReentrantReadWriteLock(false); //非公平锁
ReadWriteLock rwLock = new ReentrantReadWriteLock(); //默认为非公平锁

二、锁升级和锁降级

前面讲到,绝大部分锁都是可重入锁,读写锁也不例外。一个线程获取读锁之后,在读锁释放前,还可以再次获取读锁。同理,一个线程获取写锁之后,在写锁释放前,还可以再次获取写锁。但是,一个线程在获取读锁之后,在读锁释放前,是否还能再获取写锁?还有,一个线程在获取写锁之后,在写锁释放前,是否还能再获取读锁呢?

读写锁不支持锁升级,也就是,一个线程获取读锁之后,在读锁释放前,不可以再获取写锁。这是因为在一个线程获取读锁时,有可能同时还有其他线程也获取了读锁,如果将一个线程的读锁升级为写锁,那么就有可能违背了读写锁中读锁和写锁互斥的要求。示例代码如下所示。

img
img

读写锁支持锁降级,也就是,一个线程在获取写锁之后,在写锁释放前,可以再获取读锁。当写锁释放之后,线程持有的锁从写锁降级为读锁,示例代码如下所示。

img
img

当临界区中既有写操作又有读操作时,如果我们用写锁来给整个临界区加锁,那么代码的并行度就不高。如果我们先加写锁,写操作完成之后释放写锁,再加读锁执行读操作。如下图所示,这样做就有可能存在多线程安全问题,我们无法保证写操作和读操作的组合起来的原子性。写操作完成之后,切换到其他线程执行,更新了共享变量的值,读操作变无法读取之前写操作之后的值了。而使用上图中的锁降级,我们便既可以保证临界区线程安全,又能提到代码的并行度。

img

三、读写锁的实现原理

前面讲到读写锁跟上一节讲到的普通锁(JUC Lock)一样,既支持公平锁,也支持非公平锁。ReentrantReadWriteLock的代码结构如下所示。

public class ReentrantReadWriteLock implements ReadWriteLock {
    private final ReadLock readerLock;
    private final WriteLock writerLock;
    final Sync sync;
    public ReentrantReadWriteLock() { this(false);}
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    public WriteLock writeLock() { return writerLock; }
    public ReadLock readLock()  { return readerLock; }
    
    // AQS的子类NonfairSync和FairSync的公共父类:Sync
    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract boolean readerShouldBlock(); //用来区分公平锁和非公平锁
        abstract boolean writerShouldBlock(); //用来区分公平锁和非公平锁
        //以下为AQS模板方法的抽象方法的代码实现
        protected final boolean tryAcquire(int acquires) { ... }
        protected final boolean tryRelease(int releases) { ... }
        protected final int tryAcquireShared(int unused) { ... }
        protected final boolean tryReleaseShared(int unused) { ... }
        //..省略其他方法...
        final boolean tryWriteLock() { ... }
        final boolean tryReadLock() { ... }
    }
    
    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() { return false; }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    static final class FairSync extends Sync {
        final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
        final boolean readerShouldBlock() { return hasQueuedPredecessors(); }
    }
}

上述代码结构跟ReentrantLock的代码结构类似,NonfairSync和FairSync具体化抽象的模板类AQS,并且实现了其中的抽象方法。NonfairSync是非公平锁,FairSync是公平锁。ReentrantReadWriteLock使用NonfairSync或FairSync来编程实现读锁(ReadLock)和写锁(WriteLock)。ReadLock和WriteLock均实现了Lock接口,使用相同的AQS,实现了Lock接口中的所有加锁和解锁函数。ReadLock和WriteLock的代码实现如下所示。

//写锁中的加锁和解锁方法使用AQS的独占模式下的几个模板方法来实现
public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;
    protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
    public void lock() { sync.acquire(1); }
    public void unlock() { sync.release(1); }
    public boolean tryLock( ) { return sync.tryWriteLock(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}
//读锁中的加锁和解锁方法使用AQS的共享模式下的几个模板方法来实现
public static class ReadLock implements Lock {
    private final Sync sync;
    protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
    public void lock() { sync.acquireShared(1); }
    public void unlock() { sync.releaseShared(1); }
    public boolean tryLock() { return sync.tryReadLock(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
}

从上述代码,我们可以发现,读锁和写锁共用一个AQS。在上一节中,我们讲到,对于JUC Lock,我们使用AQS中state变量来表示加锁情况,0表示没有加锁,1表示已经加锁,大于1的值表示重入次数。对于读写锁来说,我们不仅需要知道有没有加锁、重入次数,还需要知道加的是读锁还是写锁,但是AQS中只有一个表示加锁情况的int类型的state变量。为了让state变量表达更多的信息,我们用state变量中的低16位表示写锁的使用情况,高16位表示读锁的使用情况。

对于低16位所表示的数,值为0表示没有加写锁,值为1表示已加写锁,值大于1表示写锁的重入次数。对于高16位所表示的数,值为0表示没有加读锁,值为1表示已加读锁,不过,值大于1并不表示读锁的重入次数,而是表示读锁总共被获取了多少次(每个线程对读锁重入的次数相加)。那么,读锁的重入次数在哪里记录呢?毕竟重入次数是有用信息,只有重入次数大于0时,才可以继续重入。

当多个线程同时持有读锁时,每个线程都可以对读锁重复加锁,也就就是说,重入次数是跟每个线程相关的数据,我们可以使用ThreadLocal变量来存储,对于ThreadLocal,我们在后面的章节中信息讲解,在这里,你就简单将它看做线程的一个属性或者局部变量即可。

接下来,我们详细看下,读锁和写锁的实现原理。

1)写锁的实现原理

WriteLock写锁是排它,因此,它的实现原理跟上一节讲到的ReentrantLock的实现原理类似。WriteLock实现了Lock接口,因此,它也支持各种不同的加锁方式,比如可中断加锁、非阻塞加锁、可超时加锁。接下来,我们重点讲解WriteLock中的lock()函数和unlock()函数的实现原理。对于WriteLock中的tryLock()、带超时时间的tryLock()、lockInterruptibly()这三个加锁函数,你可以参考ReentrantLock中这三个函数的实现原理,以及结合源码,自行研究。

**我们先来看WriteLock中的lock()函数。**实现比较简单,直接调用了AQS中的acquire()模板方法。

public void lock() {
    sync.acquire(1);
}

AQS中的acquire()模板方法如下所示,在上一节中已经讲解,使用tryAcquire()竞争锁,如果竞争锁成功,则直接返回,如果竞争锁失败,则调用addWaiter()将线程放入等待队列的尾部,然后调用acquireQueued()阻塞线程等待被唤醒。

public final void acquire(int arg) {
    // tryAcquire() -> addWaiter() -> acquireQeuued()
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上一节,我们已经详细讲解了addWaiter()函数和acquiredQueued()函数,这里就不再赘述。我们重点看下tryAcquire()竞争锁的逻辑,它是AQS中的抽象方法,在NonfairSync和FairSync的公共父类Sync类中实现。代码如下所示。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState(); //c为state的值
    int w = exclusiveCount(c); //低16位的值,也就是写锁的加锁情况
    
    // 1、已经加读锁或写锁(state!=0)
    if (c != 0) {
        //已加读锁(w==0)或者当前加写锁的线程不是自己
        if (w == 0 || current != getExclusiveOwnerThread())
            return false; //去排队
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires); //更新写锁的重入次数
        return true; //获取到了锁
    }
    
    // 2、没有加锁(state==0)
    if (writerShouldBlock()) return false; //去排队
    if (!compareAndSetState(c, c + acquires)) return false; //去排队
    setExclusiveOwnerThread(current);
    return true; //获取到了锁
}

我们重点看下writerShouldBlock()这个函数,这个函数控制着锁是否为公平锁。在state=0,也就是没有加读锁和写锁的情况下,如果writerShouldBlock()函数返回值为true,那么,线程不尝试竞争锁,而是直接去排队。如果writerShouldBlock()函数返回值为false,那么,线程先尝试竞争锁,不行再去排队。对于非公平锁,writerShouldBlock()总是返回false。对于公平锁,如果等待队列中有线程,那么writerShouldBlock()返回true。如果等待队列中没有线程,那么writerShouldBlock()返回false。

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() { return false; }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
}

static final class FairSync extends Sync {
    final boolean writerShouldBlock() { return hasQueuedPredecessors(); }
    final boolean readerShouldBlock() { return hasQueuedPredecessors(); }
}

在注释中,我对代码逻辑做了详细的介绍。这里就不再赘述。我将tryAcquire()的执行逻辑梳理并绘制成了一张流程图,如下所示,你可以对比着流程和注释来理解tryAcquire()的代码逻辑。

img
img

**我们再来看WriteLock的unlock()函数。**代码实现也比较简单,直接调用了AQS的release()模板方法。

public void unlock() { 
    sync.release(1);
}

AQS中的release()模板方法如下所示,在上一节中已经讲解,使用tryRelease()释放锁,然后唤醒等待队列中位于队首的线程。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

我们重点看下在写锁中tryRelease()抽象方法的代码实现,代码如下所示。tryRelease()的代码实现比较简单,在代码中,我详细作了注释,你可以参看注释了解其代码逻辑。这里就不再赘述了。

protected final boolean tryRelease(int releases) {
    // tryRelease()是AQS工作在独占模式下的函数,只能用于排它锁,也就是写锁
    if (!isHeldExclusively()) throw new IllegalMonitorStateException();
    // 更新state值,写锁的重入次数-releases,对于锁来说,releases总是等于1
    int nextc = getState() - releases;
    // 只有更新之后的state值为0时,才可以将写锁释放
    boolean free = exclusiveCount(nextc) == 0;
    if (free) setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

2)读锁的实现原理

刚刚我们讲了读写锁中的写锁的实现原理,现在,我们再来看下读锁的实现原理。写锁是排它锁,实现原理比较简单,而读锁是共享锁,实现原理相对来说更加复杂。跟WriteLock相同,ReadLock也实现了Lock接口。同样,支持各种不同的加锁方式(lock()、tryLock()、带超时时间的tryLock()、lockInterruptibly())。接下来,我们还是重点讲解lock()和unlock()这两个函数。对于其他加锁方式,你可以参看上一节的内容和源码,自行研究。

**我们先来看ReadLock中的lock()函数。**前面讲到,WriteLock中的lock()函数调用了AQS中的acquire()模板方法,这里ReadLock的lock()函数调用的是AQS中的acquireShared()模板方法。acquire()模板方法用于独占模式,acquireShared()模板方法用于共享模式。

public void lock() {
    sync.acquireShared(1);
}

我们再来看下AQS中acquireShared()的代码实现,如下所示。对比acquire()的代码实现,acquireShared()的代码实现同样也比较简单,调用tryAquireShared()去竞争锁,如果竞争成功,则直接返回,如果竞争失败,则调用doAcquireShared()去排队等待唤醒。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 竞争读锁
        doAcquireShared(arg); // 竞争失败去排队
}

tryAcquiredShared()为AQS的抽象方法,其在AQS的子类Sync中实现,具体代码如下所示。tryAcquireShared()为了提高性能做了很多代码层面的优化,导致代码量很大。为了聚焦在基本实现原理上,在不改变基本实现原理的情况下,我对tryAcquireShared()中的代码做了简化。如果你想了解完成的代码,请自行查看源码。

// 返回-1表示竞争锁失败,返回1表示竞争锁成功
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 如果state没加锁或者是加了读锁,那么线程会通过CAS操作改变state值来竞争锁;
    // 如果其他线程也在竞争读锁,并且竞争成功,那么此线程就会竞争失败;
    // 于是,此线程就要自旋(for循环)再次尝试去竞争读锁。
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) { // 已加写锁
            // 如果加写锁的线程不是此线程,那么读锁也加不成,直接返回-1
            // 否则,读写锁支持锁降级,加了写锁的线程可以再加读锁
            if (getExclusiveOwnerThread() != current)
                return -1;
        } 
        
        //理论上讲,如果没有加写锁,不管有没有加读锁,都可以去竞争读锁了,毕竟读锁是共享锁。
        //但是,存在两个特殊情况:
        //1、对于公平锁来说,如果等待队列不为空,并且当前线程没有持有读锁(重入加锁),
        //   那么,线程就要去排队。
        //2、对于非公平锁来说,如果等待队列中队首线程(接下来要被唤醒的)是写线程,
        //   那么,线程就要去排队。这样做是为了避免请求写锁的线程迟迟获取不到写锁。
        if (sharedCount(c) != 0) { // 已加读锁
            if (readerShouldBlock()) { //上述1、2两种情况对应此函数的返回值为true
                if (readHolds.get().count == 0) // 此线程没有持有读锁,不能重入
                    return -1;
            }
        }
        // 以下是对上述代码中readHolds的解释:
        // readHolds是ThreadLocal变量,保存跟这个线程的读锁重入次数。
        // 如果重入次数为0,表示没有加读锁,返回-1去排队。
        // 如果重入次数大于等于0,表示已加读锁,可以继续重入,不用排队。
        
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS竞争读锁,此时有可能还有其他线程在竞争读锁或写锁
        if (compareAndSetState(c, c + SHARED_UNIT)) { //SHARED_UNIT=1<<16
            // 竞争读锁成功
            readHolds.get().count++; //更新线程重入次数
            return 1; //成功获取读锁
        }
    }
}

从上述代码,我们可以发现,相对于tryAcquire()抽象方法,tryAcquireShared()要复杂很多。在注释中,我对代码逻辑做了详细的介绍。这里就不再赘述。我将tryAcquireShared()的执行逻辑梳理并绘制成了一张流程图,如下所示,你可以对比着流程和注释来理解tryAcquireShared()的代码逻辑。

img
img

接下来,我们再来看下doAcquireShared()函数,此函数负责排队和等待唤醒,代码如下所示。doAcquireShared()函数跟上一节讲到的acquireQueued()函数非常类似。区别主要有两点,如下注释所示。区别一是等待读锁的线程标记为SHARED,区别二是线程获取到读锁之后,如果下一个节点对应的线程也在等待读锁,那么也会被唤醒。下一个节点对应的线程获取到读锁之后,又会去唤醒下下个节点对应的线程(如果下下个节点对应的线程也在等待读锁的话)。唤醒操作一直传播下去,直到遇到等待写锁的线程为止。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//区别一:标记此线程等待的是共享锁(读锁)
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //区别二:如果下一个节点对应的线程也在等待读锁,那么顺道唤醒它
                    setHeadAndPropagate(node, r); 
                    p.next = null; // help GC
                    if (interrupted) selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (parkAndCheckInterrupt()) interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

**我们先来看ReadLock中的unlock()函数。**代码实现也比较简单,直接调用了AQS的releaseShared()模板方法。

public void unlock() {
    sync.releaseShared(1);
}

AQS中的releaseShared()模板方法如下所示,调用tryReleaseShared()释放读锁,只有当所有的读锁都释放之后,state变为0,才会调用doReleaseShared()唤醒等待队列中位于队首的线程。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()的代码实现比较简单,我们重点看下tryReleaseShared()。tryReleaseShared()是AQS中的抽象方法,在Sync中实现,代码如下所示。

//当所有的读锁都释放之后(state变成0)才会返回true
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    readHolds.get().count--; //更新本线程对读锁的重入次数
    for (;;) { //因为有可能多个线程同时释放读锁,同时CAS更新state,因此要自旋+CAS
        int c = getState();
        // c-SHARED_UNIT:相当于将读锁的加锁次数-1
        int nextc = c - SHARED_UNIT; //SHARED_UNIT=1<<16
        if (compareAndSetState(c, nextc))
            return nextc == 0; //state变为0才会返回true,才会去唤醒等待队列中的线程
    }
}

四、读写锁的升级版

StampedLock是对ReadWriteLock的进一步优化,在读锁和写锁的基础之上,又提供了乐观读锁。实际上,乐观读锁并没有加任何锁。在读多写少的应用场景中,大部分读操作都不会被写操作干扰,因此,我们甚至可以将读锁也省略掉。只有验证读操作真正有被写操作干扰的情况下,线程再加读锁重复执行读操作。我们举一个例子解释一下。代码如下所示。

public class Demo {
  private StampedLock slock = new StampedLock();
  private List<String> list = new LinkedList<>();

  public void add(int idx, String elem) {
    long stamp = slock.writeLock(); //加写锁
    try {
      list.add(idx, elem);
    } finally {
      slock.unlockWrite(stamp); //释放写锁
    }
  }

  public String get(int idx) {
    long stamp = slock.tryOptimisticRead(); //加乐观读锁
    String res = list.get(idx);
    if (slock.validate(stamp)) { //没写操作干扰
      return res;
    }
    
    // 有写操作干扰,重新使用读锁,重新执行读操作
    stamp = slock.readLock(); //加读锁
    try {
      return list.get(idx);
    } finally {
      slock.unlockRead(stamp); //释放读锁
    }
  }
}

在上述代码中,tryOptimisticRead()获取的是乐观读锁,返回一个时间戳stamp。因为乐观读锁并非真正加锁,所以,乐观读锁并不需要解锁。在执行完读操作之后,我们只需要验证stamp是否有被更改,如果有被更改,说明执行读操作期间,writeLock()函数有被执行,也就说明有对共享资源的写操作发生(也就是执行了add()函数),此时,之前得到的结果需要作废,使用读锁来重新获取数据。

思考题

在本节的示例代码中,我们把lock()、tryLock()、lockInterruptibly()函数的调用,都放置于try-finally代码块之外,这是为什么?是否可以移到try-finally代码块之内呢?

本节中,我们讲到,ReentrantLock中的lock()函数使用AQS中的acquire()模板方法来实现,unlock()函数使用AQS中的release()模板方法来实现,lockInterruptibly()函数使用acquireInterruptibly()模板方法来实现,带超时时间的tryLock()函数使用AQS中的tryAcquireNanos()模板方法来实现,那么,ReentrantLock中的tryLock()函数是如何实现的呢?

如果一个线程在获取读锁之后,在读锁释放前,再次请求写锁,将会发生什么事情?

Loading...