锁存器和栅栏

Mr.ZhangJava多线程同步工具类大约 9 分钟

锁存器和栅栏

提示

如何编程测试接口在不同并发下的性能?

在平时开发中,有时候,我们需要自己编写代码,测试某个接口在不同并发下的性能,比如测试在N个(N可以是100、200等任意数)接口请求并发执行时接口的响应时间、QPS等。为了模拟接口的并发请求,我们需要用到多线程。除此之外,线程之间的协调执行,还需要用到本节要讲的CountDownLatch和CyclicBarrier。接下来,我们就结合这个需求的开发,详细讲解一下CountDownLatch和CyclicBarrier的用法以及实现原理。

一、CountDownLatch的用法

CountDownLatch中文名称叫作锁存器,其提供的常用方法有以下几个。

//构造函数,传入count值
public CountDownLatch(int count);

//阻塞等待count值变为0
public void await() throws InterruptedException;
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  
//将count值减一  
public void countDown();

CountDownLatch的作用有点类似Thread类中的join()函数,用于一个线程等待其他多个线程的事件发生。对于join()函数来说,这里的事件指的是线程结束。对于CountDownLatch来说,这里的事件可以根据业务逻辑来定义。除此之外,使用join()需要知道被等待的线程是谁,而使用CountDownLatch则不需要。因此,CountDownLatch相对于join()函数来说,更加通用。

我们举个例子解释一下,代码如下所示。在DemoJoin中,主线程(执行main()函数的线程)调用join()函数阻塞等待线程t1和t2的结束。在DemoLatch中,CountDownLatch中的count值初始化为2,主线程调用await()函数阻塞等待count值变为0。另外两个线程在执行完部分逻辑之后,调用countDown()函数将count值减一。当两个线程均执行countDown()函数之后,count值变为0。阻塞在await()函数上的主线程被唤醒,继续执行后续逻辑。

public class DemoJoin {
  public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(new RunnableForJoin());
    Thread t2 = new Thread(new RunnableForJoin());
    t1.start();
    t2.start();
    t1.join(); //join()只用来等待线程执行结束,并且必须知道被等待线程是谁,
    t2.join();
  }

  public static class RunnableForJoin implements Runnable {
    @Override
    public void run() {
      //...do something...
    }
  }
}

public class DemoLatch {
  private static final CountDownLatch latch = new CountDownLatch(2);
  public static void main(String[] args) throws InterruptedException {
    new Thread(new RunnableForLatch()).start();
    new Thread(new RunnableForLatch()).start();
    latch.await(); //等待something执行完成而非等待线程结束,并且不需要知道在等谁
    //...执行后续逻辑...
  }

  public static class RunnableForLatch implements Runnable {
    @Override
    public void run() {
      //...do something...
      latch.countDown();
      //...do other thing...
    }
  }
}

二、CountDownLatch的实现原理

CountDownLatch的用法非常简单,其实现原理也不难,底层依赖AQS来实现。CountDownLatch的部分源码如下所示。AQS模板方法的使用方法是比较固定的,因此,CountDownLatch的代码结构跟之前讲过的ReentrantLock、ReentrantReadWriteLock、Semaphore的代码结构是类似的。代码结构大致为:具体化抽象模板方法类AQS;在具体类Sync中实现AQS中的抽象方法;使用具体类Sync中的模板方法来编程。

public class CountDownLatch {
    //具体化抽象模板方法类AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); } // 将count值存储在AQS的state中

        protected int tryAcquireShared(int acquires) { //实现AQS的抽象方法
            return (getState() == 0) ? 1 : -1; //检查count是不是为0了
        }

        protected boolean tryReleaseShared(int releases) { //实现AQS的抽象方法
            for (;;) { //执行count--
                int c = getState();
                if (c == 0) return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) return nextc == 0;
            }
        }
    }

    private final Sync sync;
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); //使用Sync的模板方法编程
    }

    public boolean await(long timeout, TimeUnit unit) 
        throws InterruptedException {
         //使用Sync的模板方法编程
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    public void countDown() {
        sync.releaseShared(1); //使用Sync的模板方法编程
    }
}

我们重点看下await()和countDown()这两个函数的实现原理。

从上述代码,我们可以发现,await()函数直接调用AQS的acquireSharedInterruptibly()函数,acquireSharedInterruptibly()函数的源码如下所示。CountDownLatch的count值存储在AQS的state中,acquireSharedInterruptibly()函数调用tryAcquireShared()函数查看state是否为0。如果state为0,则直接返回,如果state不为0,则调用doAcquireSharedInterruptibly()函数阻塞等待state变为0。tryAcquireShared()函数在上述CountDownLatch的Sync内部类中已经给出。对于doAcquireSharedInterruptibly()函数,其跟我们之前在讲解ReentrantReadWriteLock实现原理时讲到的doAcquireShared()函数类似,读者可以自行查阅源码了解,这里就不再赘述了。

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    if (tryAcquireShared(arg) < 0) //查看state是否为0,否则就放入等待队列等待state为0
        doAcquireSharedInterruptibly(arg); //阻塞等待state为0
}

我们再来看下CountDownLatch中的countDown()函数,countDown()函数直接调用AQS的releaseShared()函数来实现。releaseShared()函数的源码如下所示。releaseShared()函数调用tryReleaseShared()函数将state减一,如果此时state变为0,则执行doReleaseShared()函数唤醒等待队列中的线程,也就是唤醒调用了await()函数的线程。tryReleaseShared()函数在上述CountDownLatch的Sync内部类中已经给出。对于doReleaseShared()函数,其在我们讲解ReentrantReadWriteLock实现原理时已经详细讲解,这里就不再赘述了。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//state--,如果state变为0,则执行doReleaseShared()
        doReleaseShared(); //唤醒等待队列中的线程
        return true;
    }
    return false;
}

三、CyclicBarrier的用法

接下来,我们再来看下CyclicBarrier。CyclicBarrier包含的常用方法有如下所示。

//构造函数,传入parties
public CyclicBarrier(int parties);

//调用await()函数的线程会将parties减一,如果不为0,则阻塞,直到为0为止
public int await() throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit)
    throws InterruptedException, BrokenBarrierException, TimeoutException;

CyclicBarrier的中文名为栅栏,非常形象地解释了CyclicBarrier的作用,用于多个线程互相等待,互相等待的线程都就位之后,再同时开始执行。我们举个例子解释一下,如下代码所示。我们创建了一个parties为10的CyclicBarrier对象,用于10个线程之间互相等待。尽管这10个线程启动(执行start()函数)的时间不同,但每个线程启动之后,都会调用await()函数,将parties减一,然后检查parties是否为0。如果parties不为0,则当前线程阻塞等待。如果parties为0,则当前线程唤醒所有调用了await()函数的线程。

public class Demo {
  private static final CyclicBarrier barrier = new CyclicBarrier(10);
  public static void main(String[] args) {
    for (int i = 0; i < 10; ++i) {
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            barrier.await();
          } catch (InterruptedException e) { //当前线程被中断
            e.printStackTrace();
          } catch (BrokenBarrierException e) { //其他线程调用await()期间被中断
            e.printStackTrace();
          }
          //执行业务逻辑
        }
      }).start();
    }
    //主线程需要等待以上10个线程执行结束,方法有以下3种:
    //1) Sleep()  2) join()  3) CountDownLatch
  }
}

对于CountDownLatch和CyclicBarrier,前者是用于一个线程阻塞等待其他线程,后者是用于多个线程互相等待。实际上,使用CountDownLatch,我们也可以实现CyclicBarrier所能实现的功能。如下代码所示。我们创建一个count值为1的CountDownLatch对象,10个线程均调用await()函数阻塞等待count为0。主线程调用countDown()函数将count值减一,变为0,然后唤醒调用了await()函数的这10个线程,以此达到让这10个线程同步执行的目的。

public class Demo {
  private static final CountDownLatch latch = new CountDownLatch(1);
  public static void main(String[] args) {
    for (int i = 0; i < 10; ++i) {
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            latch.await();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          //执行业务逻辑
        }
      }).start();
    }
    latch.countDown();
    //主线程需要等待以上10个线程执行结束,方法有以下3种:
    //1) Sleep()  2) join()  3) CountDownLatch
  }
}

四、CyclicBarrier的实现原理

CyclicBarrier的使用方法比较简单,接下来,我们来看下它的实现原理。跟CountDownLatch不同,CyclicBarrier并未使用AQS来实现,而是使用之前讲到条件变量来实现的。CyclicBarrier类的源码如下所示。为了更清晰地展示其核心实现原理,我对CyclicBarrier中的源码做了简化。调用await()函数先将parties减一,然后检查parties是否为0,如果不为0,则调用Condition上的await()函数让当前线程阻塞等待,如果为0,则调用Condition上的signalAll()函数唤醒所有调用了await()函数的线程。

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    
    public CyclicBarrier(int parties) {
        this.parties = parties;
    }
    
    //函数返回值线程调用await()函数之后的parties剩余值
    //注意:以下代码逻辑省略掉了对InterruptedException和BrokenBarrierException的处理
    public int await() throws InterruptedException, BrokenBarrierException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int index = --parties;
            if (index == 0) {  //最后一个调用await()的线程,唤醒其他线程
                trip.signalAll();
                return 0;
            }
            trip.await();
            return index;
        } finally {
            lock.unlock();
        }
    }
}

五、在接口性能测试中的应用

对CountDownLatch和CyclicBarrier的用法和实现原理有了了解之后,接下来,我们使用CountDownLatch和CyclicBarrier来编写开头的接口并发性能测试代码。具体代码如下所示。如果要测试N个接口请求并发执行时接口的性能,我们需要创建N个测试线程,让每个测试线程循环执行接口请求,并且记录每个接口请求的响应时间。主线程通过CountDownLatch来等待其他测试线程执行完成,然后,再通过记录的运行数据统计接口的性能,比如平均响应时间、QPS等。除此之外,在以下代码实现中,我们还使用了CyclicBarrier,让各测试线程更加精确地同时开始执行,以便更加准确地测试指定并发下的接口性能。

public class ApiBenchmark {
  private static int numThread = 20; //并发度为20
  private static int numReqPerThread = 1000; //每个线程请求1000次接口
  
  private static CountDownLatch latch = new CountDownLatch(numThread);
  private static CyclicBarrier barrier = new CyclicBarrier(numThread);
  
  public static void main(String[] args) throws InterruptedException {
    // 创建线程
    Thread[] threads = new Thread[numThread];
    TestRunnable[] runnables =new TestRunnable[numThread];
    for (int i = 0; i < numThread; ++i) {
      runnables[i] = new TestRunnable();
      threads[i] = new Thread(runnables[i]);
    }
    // 启动线程
    long startTime = System.nanoTime();
    for (int i = 0; i < numThread; ++i) {
      threads[i].start();
    }
    // 等待测试线程结束
    latch.await();
    long endTime = System.nanoTime();

    // 统计接口性能
    long qps = (numThread*numReqPerThread*1000)/((endTime-startTime)/1000000);
    float avgRespTime = 0.0f;
    for (int i = 0; i < numThread; ++i) {
      for (Long respTime : runnables[i].respTimes) {
        avgRespTime += respTime;
      }
    }
    avgRespTime /= (numThread*numReqPerThread);
  }

  public static class TestRunnable implements Runnable {
    public List<Long> respTimes = new ArrayList<>();
    @Override
    public void run() {
      try {
        barrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
      for (int i = 0; i < numReqPerThread; ++i) {
        long reqStartTime = System.nanoTime();
        // ...调用接口
        long reqEndTime = System.nanoTime();
        respTimes.add(reqEndTime-reqStartTime);
      }
      latch.countDown();
    }
  }
}

六、思考题

在本节中,我们讲到,CountDownLatch底层使用AQS来实现,CyclicBarrier底层使用条件变量来实现,那么,本节中提到的join()函数是怎么实现的呢?

Loading...