概述

线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

在执行一个异步任务或并发任务时,往往是通过直接new Thread()方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池,线程池的优势很明显

  1. 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
  2. 提高系统响应速度,当有任务到达时,无需等待新线程的创建便能立即执行;
  3. 方便线程并发数的管控,线程若是无限制的创建,不仅会额外消耗大量系统资源,更是占用过多资源而阻塞系统或oom等状况,从而降低系统的稳定性。线程池能有效管控线程,统一分配、调优,提供资源使用率;
  4. 更强大的功能,线程池提供了定时、定期以及可控线程数等功能的线程池,使用方便简单

    用法

    Executors类提供了4种不同的线程池:newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor
  5. newCachedThreadPool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public void cachedThreadPoolDemo(){
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
    final int index = i;
    cachedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName()+", index="+index);
    }
    });
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    //运行结果
    pool-1-thread-1, index=0
    pool-1-thread-1, index=1
    pool-1-thread-1, index=2
    pool-1-thread-1, index=3
    pool-1-thread-1, index=4
  6. newFixedThreadPool
    创建一个固定大小的线程池,该方法可指定线程池的固定大小,对于超出的线程会在LinkedBlockingQueue队列中等待

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public void fixedThreadPoolDemo(){
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 6; i++) {
    final int index = i;
    fixedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName()+", index="+index);
    }
    });
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    // 运行结果
    pool-1-thread-1, index=0
    pool-1-thread-2, index=1
    pool-1-thread-3, index=2
    pool-1-thread-1, index=3
    pool-1-thread-2, index=4
    pool-1-thread-3, index=5
  7. newSingleThreadExecutor
    创建一个只有线程的线程池,该方法无参数,所有任务都保存队列LinkedBlockingQueue中,等待唯一的单线程来执行任务,并保证所有任务按照指定顺序(FIFO或优先级)执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public void singleThreadExecutorDemo(){
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 3; i++) {
    final int index = i;
    singleThreadExecutor.execute(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName()+", index="+index);
    }
    });
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    // 运行结果
    pool-1-thread-1, index=0
    pool-1-thread-1, index=1
    pool-1-thread-1, index=2
  8. newScheduledThreadPool
    创建一个可定时执行或周期执行任务的线程池,该方法可指定线程池的核心线程个数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public void scheduledThreadPoolDemo(){
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
    //定时执行一次的任务,延迟1s后执行
    scheduledThreadPool.schedule(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName()+", delay 1s");
    }
    }, 1, TimeUnit.SECONDS);
    //周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName()+", every 3s");
    }
    }, 2, 3, TimeUnit.SECONDS);
    }
    //运行结果
    pool-1-thread-1, delay 1s
    pool-1-thread-1, every 3s
    pool-1-thread-2, every 3s
    pool-1-thread-2, every 3s
  • schedule(Runnable command, long delay, TimeUnit unit),延迟一定时间后执行Runnable任务;
  • schedule(Callable callable, long delay, TimeUnit unit),延迟一定时间后执行Callable任务;
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),延迟一定时间后,以间隔period时间的频率周期性地执行任务;
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit),与scheduleAtFixedRate()方法很类似,但是不同的是scheduleWithFixedDelay()方法的周期时间间隔是以上一个任务执行结束到下一个任务开始执行的间隔,而scheduleAtFixedRate()方法的周期时间间隔是以上一个任务开始执行到下一个任务开始执行的间隔,也就是这一些任务系列的触发时间都是可预知的。

    四种方法的参数对比

    原理

  • ThreadPoolExecutor
    1
    2
    3
    4
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
    }

创建线程池,在构造一个新的线程池时,必须满足下面的条件:

  1. corePoolSize(线程池基本大小)必须大于或等于0;
  2. maximumPoolSize(线程池最大大小)必须大于或等于1;
  3. maximumPoolSize必须大于或等于corePoolSize;
  4. keepAliveTime(线程存活保持时间)必须大于或等于0;
  5. workQueue(任务队列)不能为空;
  6. threadFactory(线程工厂)不能为空,默认为DefaultThreadFactory类
  7. handler(线程饱和策略)不能为空,默认策略为ThreadPoolExecutor.AbortPolicy。
  • 参数详解
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    }

参数说明

  1. corePoolSize(线程池基本大小):当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时,才会根据是否存在空闲线程,来决定是否需要创建新的线程。除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的基本线程。

  2. maximumPoolSize(线程池最大大小):线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。

  3. keepAliveTime(线程存活保持时间):默认情况下,当线程池的线程个数多于corePoolSize时,线程的空闲时间超过keepAliveTime则会终止。但只要keepAliveTime大于0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。另外,也可以使用setKeepAliveTime()动态地更改参数。

  4. unit(存活时间的单位):时间单位,分为7类,从细到粗顺序:NANOSECONDS(纳秒),MICROSECONDS(微妙),MILLISECONDS(毫秒),SECONDS(秒),MINUTES(分),HOURS(小时),DAYS(天);

  5. workQueue(任务队列):用于传输和保存等待执行任务的阻塞队列。可以使用此队列与线程池进行交互:
    如果运行的线程数少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
    如果运行的线程数等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

  6. threadFactory(线程工厂):用于创建新线程。由同一个threadFactory创建的线程,属于同一个ThreadGroup,创建的线程优先级都为Thread.NORM_PRIORITY,以及是非守护进程状态。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号);

  7. handler(线程饱和策略):当线程池和队列都满了,则表明该线程池已达饱和状态。
    ThreadPoolExecutor.AbortPolicy:处理程序遭到拒绝,则直接抛出运行时异常 RejectedExecutionException。(默认策略)
    ThreadPoolExecutor.CallerRunsPolicy:调用者所在线程来运行该任务,此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    ThreadPoolExecutor.DiscardPolicy:无法执行的任务将被删除。
    ThreadPoolExecutor.DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重新尝试执行任务(如果再次失败,则重复此过程)。

  • 排队有三种通用策略
  1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

  2. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。