avatar

37.Java并发之wait、notify&interrupt&join简析

上篇我们说到Synchronized的原理,那么接下来我们谈谈并发相关的其他东西。在这一篇我们会说到如下内容:

  1.wait/notify的使用(生产者/消费者)以及原理
  2.为什么Thread类中的一些方法会抛出InterruptedException
  3.join的原理
话不多说,开整。

0x1 wait/notify

  1. wait/notify初认识(也就是J.U.C下的Condition类)
    • wait/notify是两个方法, 定义下Object类中。
    • 两个方法的调用都需要放在Synchronized同步代码块中,不然会抛出IllegalMonitorStateException
    • wait方法会释放锁,然后把当前线程放在等待队列中阻塞。直到notify/notifyAll方法执行后,才被唤醒
    • notify、notifyAll都是唤醒在等待队列中的线程。notify是唤醒一个,notifyAll是唤醒所有。
  2. wait/notify经典案例:生产者/消费者
    • Provider:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      public class Provider implements Runnable{
      /**
      * 容器
      */
      private Queue<String> queue;
      /**
      * 容器最大的容量
      */
      private int maxSize;

      public Provider(Queue<String> queue, int maxSize) {
      this.queue = queue;
      this.maxSize = maxSize;
      }

      @Override public void run() {
      //死循环往queue中生成数据
      while(true){
      //wait/notify需要加锁
      synchronized (queue){
      if(queue.size() == maxSize){
      try {
      queue.wait();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      //休息几秒
      try {
      TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      //往queue里面生产数据
      String data = UUID.randomUUID().toString();
      System.out.println("生产:"+data);
      queue.add(data);
      //因为生产了一个数据,所以需要唤醒下消费者线程去消费
      queue.notify();
      }
      }
      }
      }
    • Comsumer:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      public class Comsumer implements Runnable {
      /**
      * 容器
      */
      private Queue<String> queue;
      /**
      * 容器最大的容量
      */
      private int maxSize;

      public Comsumer(Queue<String> queue, int maxSize) {
      this.queue = queue;
      this.maxSize = maxSize;
      }

      @Override public void run() {
      //死循环,不断往容器中消费数据
      while(true){
      //wait/notify需要加锁
      synchronized(queue){
      if(queue.isEmpty()){
      try {
      queue.wait();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      //休息几秒
      try {
      TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      //这里说明queue里面不为空。则取数据
      String remove = queue.remove();
      System.out.println("消费:"+remove);
      //因为我们这里消费了一个,已经不是满的情况了,所以需要唤醒生产者去生产
      queue.notify();
      }
      }
      }
      }
    • Main:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public class Main {
      public static void main(String[] args) {
      Queue<String> queue = new LinkedList<>();
      int maxSize = 10;
      Thread provider = new Thread(new Provider(queue,maxSize));
      Thread comsumer = new Thread(new Comsumer(queue,maxSize));
      provider.start();
      comsumer.start();
      }
      }
  3. 下面我们来看看wait/notify底层的代码
    • wait
      • java层面
        1
        2
        3
        4
        public final void wait() throws InterruptedException {
        wait(0);
        }
        public final native void wait(long timeout) throws InterruptedException;
        简单粗暴,直接调JVM虚拟机里面的代码。下面我们去一探究竟。
      • JVM层面
        • 上篇我们说Synchronized锁升级时,有说过一个同步队列的定义,是在objectMonitor.hpp中。然后在讲Condition的时候,提到了两个链表,一个是等待队列链表,一个是同步队列。当时也说,Condition的实现原理和wait/notify是差不多的。既然这样,那他肯定也有一个等待队列,定义同样是在objectMonitor.cpp中。如下:
        • 继续往下说,我们的wait、notify方法的实现,也是在这个类中。
        • 我们挑着几个地方来说,毕竟C++我也不太会。。。
          • 为什么wait需要获取到锁?如果没有同步代码块包裹还会抛出异常?,这段在源代码层面有体现,如下:

          • 包装成Node对象,加入到等待队列中的代码:
          • 好了,后面的代码看不懂了。原谅我菜。。。
    • notify
      • java层面
        1
        public final native void notify();
      • JVM层面
        • 和wait一样,我们直入主题,稍微浏览一下,C++实在是看不懂,请见谅。
          • 开局也是直接检查是否获取锁
          • 然后从等待队列中获取node
          • 最后加入到同步队列(只是一部分代码)
  4. 总结一下
    1. 开局来张图
    2. 剩下全靠编
      • 拿上面那个生产者消费者模型作为例子来说吧。简化一下,Provider的线程叫P,Consumer的线程叫C.
      • 假设P先获取锁,于是进入Synchronized体,判断queue是否以满,不为空就往里面加东西。然后释放锁。假设线程P的运气好,一直是他抢先获取锁。于是就一直往里面add东西,直到到了maxSize的限制。于是会调用wait()方法,进行等待。在wait方法内部,就会把这个线程加入到queue对象的等待队列中(因为调用的是queue.wait()),线程挂起,释放锁,等待唤醒。
      • 这时候,就没线程来和线程C来竞争了。C顺理成章的会拿到锁,进入Synchronized体执行代码。线程C从queue里面拿出一个元素后,调用了queue.notify()注意,这里和Condition有点不一样,Condition把等待队列中的第一个节点移到queue对象的同步队列,排队获取锁,而notify是在等待队列中随机挑一个移到queue对象的同步队列,然后线程C执行完代码后,会释放锁。这时候线程P和线程C又各凭本事来竞争锁了(竞争失败还是会在同步队列中等待)。
    3. wait/notify和sleep的区别
      • wait/notify会释放锁,而sleep不会。这是他们最大的区别

0x02 interrupted()和interrupt()方法

  1. 先提2个问题,然后我们根据这2个问题来展开了解下。
    • interrupted()和interrupt()方法的区别与联系
    • InterruptedException是什么?为什么涉及等待的方法,如sleep、wait方法都会抛出这个异常。碰到这个异常应该怎么做?
  2. 首先来看第一个问题,interrupted()和interrupt()方法的区别与联系;
    1. 简介下interrupt吧。interrupt是一个标记,标记该线程是否在等待的过程中被中断过,也就是是否被打断过,和唤醒不同,唤醒是正常流程,而中断是异常流程。举个例子说,你睡觉睡到自然醒,恩这个是正常的,但是你睡到中途,被人叫醒,这就是中断。
    2. 那么来说一下interrupted()和interrupt()方法分别是什么吧。
      • interrupted()方法
        • 借鉴下jdk doc上面的说法Tests whether the current thread has been interrupted.,翻译过来就是调用这个方法,可以获取当前线程是否被中断,并且重置为未中断状态。那么它是怎么判断是否被中断呢?这个方法签名是这样的:
          1
          2
          3
          4
          5
          6
          public static boolean interrupted() {
          //如果被中断过,则返回true,如果没被中断过,则返回false
          //这里传了true。所以每次获取完后,都会重置interrupted为false
          return currentThread().isInterrupted(true);
          }
          private native boolean isInterrupted(boolean ClearInterrupted);
        • 于是我们又得去翻JVM源码了。源码是在thread.cpp中。代码如下:
          1
          2
          3
          4
          5
          6
          7
          bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
          trace("is_interrupted", thread);
          debug_only(check_for_dangling_thread_pointer(thread);)
          // Note: If clear_interrupted==false, this simply fetches and
          // returns the value of the field osthread()->interrupted().
          return os::is_interrupted(thread, clear_interrupted);
          }
        • 因为各个系统的差别,所以os::is_interrupted()有多种实现,我们就拿在Linux下面的实现来说事吧,代码是在os_linux.cpp中。
        • 从上面看,最终的值是指向osthread里面的interrupted,点过去看一下,代码如下:
          1
          volatile bool interrupted() const { return _interrupted != 0; }
        • 从这里就看出来了,最终获取到的是_interrupted这个属性的值。
        • 然后重置interrupted的值,也可以从os_linux.cpp中的代码看的出来。如果被中断过,并且clear_interrupted=true的话,会设置_interrupted为false
      • interrup()方法
        • 还是先从java层面看一眼源码,然后再去JVM的源码里瞅瞅吧。
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          public void interrupt() {
          if (this != Thread.currentThread())
          checkAccess();

          synchronized (blockerLock) {
          Interruptible b = blocker;
          if (b != null) {
          interrupt0(); //设置中断
          b.interrupt(this);
          return;
          }
          }
          interrupt0();
          }

          private native void interrupt0();
        • 现在我们去JVM源码里看看interrupt0()是怎么实现的吧。
          • 刚开始也是先从thread.cpp中看起,代码如下:
            1
            2
            3
            4
            5
            6
            void Thread::interrupt(Thread* thread) {
            trace("interrupt", thread);
            debug_only(check_for_dangling_thread_pointer(thread);)
            //调用各个系统的interrupt方法
            os::interrupt(thread);
            }
          • 接下来我们还是以os_linux.cpp为例,代码如下:
            1
            2
            3
            4
            5
            6
            7
            8
            9
            10
            11
            12
            13
            14
            15
            16
            17
            18
            19
            20
            21
            22
            23
            24
            25
            void os::interrupt(Thread* thread) {
            assert(Thread::current() == thread || Threads_lock->owned_by_self(),
            "possibility of dangling Thread pointer");

            OSThread* osthread = thread->osthread();
            //如果线程没有被中断,就执行中断
            if (!osthread->interrupted()) {
            //设置_interrputed标志位的值为1
            osthread->set_interrupted(true);
            OrderAccess::fence();
            //唤醒sleep的线程
            ParkEvent * const slp = thread->_SleepEvent ;
            if (slp != NULL) slp->unpark() ;
            }

            // For JSR166. Unpark even if interrupt status already was set
            //如果是java的线程,则唤醒同步代码块和wait()
            if (thread->is_Java_thread())
            ((JavaThread*)thread)->parker()->unpark();

            ParkEvent * ev = thread->_ParkEvent ;
            if (ev != NULL) ev->unpark() ;
            }
            //设置标识
            void set_interrupted(bool z){ _interrupted = z ? 1 : 0; }
      • 总结一下:
        1. interrupted()方法是获取该线程是否被中断过,获取后,会把中断状态置为非中断。
        2. interrupt()方法是置该线程的中断标识为中断。并且会唤醒所有处于阻塞状态的线程。
        3. 两者的关系显而易见了,可以简单的这样理解,一个是get,一个是set。
    3. InterruptedException简单介绍
      • InterruptedException是所有的涉及阻塞的方法都会抛出的一个异常。那么为什么会抛出这个异常呢?这个异常和上面所说的interrupted()以及interrupt()有什么联系呢?
      • 假设有两个线程,P1,P2。首先我们应该知道,如果在P2中调用P1的interrupt()方法,这时候就会把P1的_interrupted标识置为1,并且会唤醒P1线程中正在阻塞的方法。
      • 那么为什么呢?首先我们知道,这些抛异常的方法都是阻塞的,而阻塞方法的释放会取决于一些外部的事件,但是阻塞方法可能因为等不到外部的触发事件而导致无法终止,所以它允许一个线程请求自己来停止它正在做的事情。当一个方法抛出 InterruptedException 时,它是在告诉调用者如果执行该方法的线程被中断,它会尝试停止正在做的事情并且通过抛出 InterruptedException 表示提前返回。所以,这个异常的意思是表示一个阻塞被其他线程中断了。然后,由于线程调用interrupt()中断方法,那么Object.waitThread.sleep 等被阻塞的线程被唤醒以后会通过 is_interrupted 方法判断中断标识的状态变化,如果发现中断标识为 true,则先清除中断标识,然后抛出InterruptedException
      • 那么如何处理呢?这里需要注意,InterruptedException 异常的抛出并不意味着线程必须终止!所以,你可以做以下操作:
        • 直接捕获异常不做任何处理
        • 将异常往外抛出
        • 停止当前线程,并打印异常信息
      • 下面我们以Thread.sleep为例,看一下他是怎么抛出异常的。源码位置是在share\vm\prims\jvm.cpp
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
        JVMWrapper("JVM_Sleep");

        if (millis < 0) {
        THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
        }
        //这里判断是否被中断
        if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
        }
        ....
  3. 整个interrupt就分析到这里了。有错误的地方,还望不吝指教。

0x03 join简析

  1. join方法简介
    • 首先join方法是Thread类中的方法。
    • join方法的作用是等待该线程结束,再往下执行。这样说法可能有点不清晰,引用Java 7 Concurrency Cookbook上的定义:join() method suspends the execution of the calling thread until the object called finishes its execution.也就是说,join方法会暂停调用该方法的线程,直到被调用方执行完成后才会继续往下执行。这样说好像也有点抽象,下面我们以一个例子来说明。
    • 一个简单的例子:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      public class JoinTest {
      public static void main(String[] args) {
      //首先创建1个线程.
      Thread t1 = new Thread(new Runnable(){
      @Override public void run() {
      System.out.println("我是线程T1,接下来我会睡2秒钟,让出CPU的执行权");
      try {
      TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      System.out.println("我是线程T1,我已经睡醒了,继续往下执行");
      }
      });

      t1.start();
      System.out.println("我是主线程,我下面不知道要干啥事。");
      try {
      //调用t1的join方法。
      t1.join();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      System.out.println("我是主线程,我啥事也没干,一脸懵逼的走了。");

      }
      }
    • 结果如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      我是主线程,我下面不知道要干啥事。
      我是线程T1,接下来我会睡2秒钟,让出CPU的执行权
      我是线程T1,我已经睡醒了,继续往下执行
      我是主线程,我啥事也没干,一脸懵逼的走了。
      -----------------------下面是把join方法注释的结果----------------
      我是主线程,我下面不知道要干啥事。
      我是主线程,我啥事也没干,一脸懵逼的走了。
      我是线程T1,接下来我会睡2秒钟,让出CPU的执行权
      我是线程T1,我已经睡醒了,继续往下执行
    • 对比可见,线程T1睡觉的2秒钟内,是不会执行主线程的。因为在主线程中调用T1的join方法,是需要等待T1线程执行完后,才会去执行主线程。那么为什么会这样?下面就让我们一起来简单分析下。
  2. join方法实现简单分析
    • Thread.join()方法,我们还是拿上面的例子来说,源码如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      public final void join() throws InterruptedException {
      join(0);
      }
      //这里需要注意一下 synchronized
      public final synchronized void join(long millis) throws InterruptedException {
      long base = System.currentTimeMillis();
      long now = 0;

      if (millis < 0) {
      throw new IllegalArgumentException("timeout value is negative");
      }

      if (millis == 0) {
      //自旋判断当前线程T1是否还存活。如果存活就调用wait方法。
      while (isAlive()) {
      wait(0);
      }
      } else {
      while (isAlive()) {
      long delay = millis - now;
      if (delay <= 0) {
      break;
      }
      wait(delay);
      now = System.currentTimeMillis() - base;
      }
      }
      }
    • 下面我们来整理下需要注意的点:
      • 首先是join方法的签名,是有带synchronized的。我们知道Synchronized一定会有个对象保存锁状态,那么这里的synchronized的对象是谁?而加在方法上的,他的对象是this,那么这个this又是谁?恩,这个this就是T1,因为我们调用的t1.join()
      • 然后自旋判断是否存活,注意,这里判断的也是T1线程是否存活,然后调用wait()方法。此时需要注意一下,wait()方法是Object类中的方法。原理我们上面也分析了。总得来说,wait()方法会释放锁,并且把当前线程加入到等待队列里。这里又得注意了,当前线程指的不是T1哦,而是我们调用t1.join()方法的线程,也就是主线程!所以执行到wait(),我们的主线程自然而然的会加入到T1线程对象的等待队列中,也就是主线程阻塞了
      • 那么既然有wait(),肯定会在一个地方进行notify(),不然主线程就会一直阻塞在那里,就让我们找找notify()吧。其实这个notify是会在线程退出的时候进行调用的。不信,看源码,thread.cpp有如下方法:
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
        assert(this == JavaThread::current(), "thread consistency check");
        ....
        //就是在这里执行的notify()
        ensure_join(this);
        assert(!this->has_pending_exception(), "ensure_join should have cleared");
        .....
        }
        static void ensure_join(JavaThread* thread) {
        Handle threadObj(thread, thread->threadObj());
        assert(threadObj.not_null(), "java thread object must exist");
        ObjectLocker lock(threadObj, thread);
        thread->clear_pending_exception();
        java_lang_Thread::set_thread_status(threadObj(),java_lang_Thread::TERMINATED);
        java_lang_Thread::set_thread(threadObj(), NULL);
        //在这里调用notifyAll
        lock.notify_all(thread);
        thread->clear_pending_exception();
        }
      • 从上面可以知道,在T1线程退出的时候,会唤醒T1的等待队列中所有的node。从而在T1执行完后,主线程才能继续往下执行。

04.总结

  1. 今天这篇篇幅还是挺长的,但是总得来说,我们说了如下几点:
    • wait/notify的原理,可以说实现原理和Condition是差不多的。
    • interrupted、interrupt、interruptException的联系,通过分析我们知道了,线程中断是通过底层有个_interrupted的字段来标识的。
    • 简单分析了join的实现原理。

感谢大家观看,才疏学浅,不当之处,请不吝赐教。谢谢~


评论