小强的进阶之路 · 2019年08月21日

Executor线程池只看这一篇就够了

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

线程实现方式

Thread、Runnable、Callable

//实现Runnable接口的类将被Thread执行,表示一个基本任务
public interface Runnable {
    //run方法就是它所有内容,就是实际执行的任务
    public abstract void run();
}
//Callable同样是任务,与Runnable接口的区别在于它接口泛型,同时它执行任务候带有返回值;
//Callable的使用通过外层封装成Future来使用
public interface Callable<V> {
    //相对于run方法,call方法带有返回值
    V call() throws Exception;
}

注意:启动Thread线程只能用start(JNI方法)来启动,start方法通知虚拟机,虚拟机通过调用器映射到底层操作系统,通过操作系统来创建线程来执行当前任务的run方法

Executor框架

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
从图中可以看出Exectuor下有一个重要的子接口ExecutorService,其中定义了线程池的具体行为:

  • execute(Runnable runnable):执行Runnable类型的任务
  • submit(task):用来提交Callable或者Runnable任务,并返回代表此任务的Future对象
  • shutdown():在完成已经提交的任务后封闭办事,不在接管新的任务
  • shutdownNow():停止所有正在履行的任务并封闭办事
  • isTerminated():是一个钩子函数,测试是否所有任务都履行完毕了
  • isShutdown():是一个钩子函数,测试是否该ExecutorService是否被关闭

Executor框架

ExecutorService中的重点属性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl:对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount)。

这里可以看到,使用Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY 就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ctl相关方法:

//获取运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取活动线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
//获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态:

RUNNING = ­1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

线程池运行状态扭转

1、RUNNING

  • 状态说明:线程池处于RUNNING状态,能够接收新任务,以及对已添加的任务进行处理。
  • 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。

2、SHUTDOWN

  • 状态说明:线程池处于SHUTDOWN状态,不接收新任务,能够处理已经添加的任务。
  • 状态切换:调用shutdown()方法时,线程池由RUNNING -> SHUTDOWN。

3、STOP

  • 状态说明:线程池处于STOP状态,不接收新任务,不处理已提交的任务,并且会中断正在处理的任务。
  • 状态切换:调用线程池中的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN) -> STOP。

4、TIDYING

  • 状态说明:当所有的任务已经停止,ctl记录“任务数量”为0,线程池会变为TIDYING状态。当线程池处于TIDYING状态时,会执行钩子函数 terminated()。 terminated()在ThreadPoolExecutor类中是空, 的,若用户想在线程池变为TIDYING时,进行相应处理,可以通过重载 terminated()函数来实现。
  • 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行任务也为空时,就会由SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP-> TIDYING。

5、TERMINATED

  • 状态说明:线程池线程池彻底停止,线程池处于TERMINATED状态,
  • 状态切换:线程池处于TIDYING状态时,执行完terminated()之后, 就会由TIDYING->TERMINATED。

线程池使用

RunTask类:

public class RunTask implements Runnable {
    public void run() {
        System.out.println("Thread name:"+Thread.currentThread().getName());
    }
}

ExecutorSample类:

public class ExecutorSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i=0;i<20;i++){
            //提交任务无返回值
            executor.execute(new RunTask());
            //任务执行完成后有返回值
            Future<Object> future = executor.submit(new RunTask());
        }
    }
}

线程池的具体使用:

  • ThreadPoolExecutor 默认线程池
  • ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

线程池的创建:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
  • corePoolSize:线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
  • maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时候,如果这时候没有新的任务提交,核心线程外的线程不会立即被销毁,而是会等待,直到等待的时间超过了keepAliveTime

unit:keepAliveTime的单位时间

  • workQueue:用于保存等待被执行的任务的阻塞队列,且任务必须实现Runnable接口,在JDK中提供了如下阻塞队列:

ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。
LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue。
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue。

  • PriorityBlockingQueue:具有优先级的无界阻塞队列。
  • threadFactory:ThreadFactory 类型的变量,用来创建新线程。默认使用ThreadFactory.defaultThreadFactory来创建线程, 会使新创建线程具有相同的NORM_PRIORITY优先级并且都是非守护线程,同时也设置了线程名称。
  • handler:线程池的饱和策略。当阻塞队列满了,且没有空闲的工作队列,如果继续提交任务,必须采用一种策略处理该任务.

线程池的监控:

public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

线程池的原理

线程池执行示意图

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意这一个步骤需要获取全局锁)。
  • 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意这一个步骤需要获取全局锁)。
  • 如果创建的新线程将使当前运行的线程超出maximumPoolSize,任务将被执行饱和策略。

ThreadPoolExecutor 采用上述的设计思路,是为执行execute()方法时,尽可能避免获取全局锁(一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后,几乎所有的execute()方法调用都是在执行步骤2,而步骤2不需要获取全局锁。

还没关注我的公众号?
  • 扫文末二维码关注公众号【小强的进阶之路】可领取如下:
  • 学习资料: 1T视频教程:涵盖Javaweb前后端教学视频、机器学习/人工智能教学视频、Linux系统教程视频、雅思考试视频教程;
  • 100多本书:包含C/C++、Java、Python三门编程语言的经典必看图书、LeetCode题解大全;
  • 软件工具:几乎包括你在编程道路上的可能会用到的大部分软件;
  • 项目源码:20个JavaWeb项目源码。

小强的进阶之路二维码

推荐阅读
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息