java线程池实现

线程池 Thread Pool 是一种基于池化思想管理线程的工具,线程池维护多个线程,等待管理者分配可并发执行的任务:

  • 一方面避免了处理任务时创建销毁线程开销的代价,
  • 另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

合理地使用线程池有如下优点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  • 提高响应速度:当任务达到时,任务可以不需要等到线程创建就能立即执行;
  • 提高线程的可管理性:线程资源是稀缺的,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

ThreadPoolExecutor基础与使用

在java中 ThreadPoolExecutor 是核心的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){}

线程池核心参数

  • corePoolSize:核心线程数。线程池在创建后,会一直保持这个数量的线程。当有任务提交时,如果当前运行的线程数小于corePoolSize,线程池会创建一个新的核心线程来执行任务。
  • maximumPoolSize:最大线程数。线程池中允许创建的最大线程数。当工作队列已满,且当前运行的线程数小于maximumPoolSize时,线程池会创建新的非核心线程来执行任务。
  • keepAliveTime:非核心线程空闲时间。当线程池中的线程数量超过corePoolSize时,如果某个线程空闲时间超过keepAliveTime,它就会被终止,直到线程数量回到corePoolSize。
  • TimeUnit unit:keepAliveTime的时间单位。
  • BlockingQueue workQueue:任务队列。当核心线程都在忙碌时,新提交的任务会被放入这个队列中等待执行。
  • ThreadFactory threadFactory:线程工厂。用于创建新线程。
  • RejectedExecutionHandler handler:拒绝策略。当线程池和工作队列都已满时,用于处理新任务的策略。

BlockingQueue阻塞队列

Java线程池(ThreadPoolExecutor)常用的几种阻塞队列类型如下:

  • ArrayBlockingQueue

    • 实现:基于数组实现有界的阻塞队列,遵循 FIFO(先进先出)原则。
    • 队列满时: 当队列已满,提交任务的线程会被阻塞,直到队列有可用空间。
    • 公平性: 构造函数可选择是否启用公平策略。公平模式下,等待时间最长的线程优先获得队列访问权。
    • 适用场景: 任务提交和处理速度都可控,需要限制等待任务数量以控制系统资源的场景。
  • LinkedBlockingQueue

    • 实现:基于链表实现的可选有界(默认无界)的阻塞队列。
    • 无界模式: 如果不指定容量,LinkedBlockingQueue 的最大容量为 Integer.MAX_VALUE。当核心线程数已满时,所有新任务都会进入此队列。由于队列无限大,永远不会触发最大线程数的创建,也不会执行拒绝策略。这可以防止任务被拒绝,但如果任务处理速度跟不上,可能导致内存溢出。
    • 有界模式: 在构造时可指定容量,例如 new LinkedBlockingQueue<>(100)。此时,它类似于 ArrayBlockingQueue,在队列满时会阻塞生产者。
    • 适用场景: 任务提交速度相对稳定,且任务量不会瞬间暴增,或者希望任务队列尽可能容纳所有任务的场景。
  • SynchronousQueue:一个没有容量的阻塞队列,每次插入操作都必须等待另一个线程的移除操作。

    • 直接交付: 任务提交后不会被存储在队列中,而是直接交付给工作线程。如果没有空闲线程,会立即创建一个新线程(不超过 maximumPoolSize)来执行任务。
    • 即时执行: 这种“手递手”的模式确保了任务提交后能被立即处理,但可能导致线程数快速增长。
    • 适用场景: 高响应性要求,需要快速处理突发性任务,且 maximumPoolSize 足够大以避免拒绝任务的场景。newFixedThreadPool 和 newCachedThreadPool 在内部就使用了 SynchronousQueue。
  • LinkedTransferQueue:TransferQueue 接口的一个无界、非阻塞、线程安全的实现。

    • 融合了 SynchronousQueue 的即时传递特性和 LinkedBlockingQueue 的链表队列特性,并且使用 无锁(CAS操作) 的方式来最大化并发性能。
    • 无界: 逻辑上无容量限制。
    • 高效传输: 结合了 SynchronousQueue 的即时传输能力和 LinkedBlockingQueue 的队列能力,性能极高。transfer() 方法,生产者可以阻塞等待消费者接收,实现零缓冲区的同步交付。
    • 适用场景:适用于需要低延迟、高并发消息传递的场景。
  • PriorityBlockingQueue:一个无界的阻塞队列,其中元素根据其自然排序或指定的 Comparator 进行排序。

    • 优先级: 队列中的任务不是按照 FIFO 顺序执行,而是按照优先级顺序执行。
    • 无界: 类似 LinkedBlockingQueue 的无界模式,maximumPoolSize 无效,且任务不会被拒绝,但可能导致内存问题。
    • 适用场景: 需要根据任务的优先级来安排执行顺序,而不是简单的 FIFO 顺序的场景。
  • DelayQueue(少用于线程池,但也是阻塞队列的一种)

    • 一个无界阻塞队列,只在元素的延迟时间到期时才允许取出元素。
    • 延迟执行: 放入 DelayQueue 的元素必须实现 Delayed 接口,该接口定义了任务的延迟时间。只有当队列中的任务到期时,消费者才能取出并执行该任务。
    • 适用场景: 需要延时执行任务的场景,例如定时任务调度、缓存过期清理等。

线程池最常用的是前三种:ArrayBlockingQueue、LinkedBlockingQueue 和 SynchronousQueue。可以通过ThreadPoolExecutor的构造函数自定义队列类型,实现不同的任务调度策略。

基于LinkedTransferQueue实现ThreadPoolExecutor自定义队列:

LinkedTransferQueue 实现了一个基于链接节点的、线程安全的 TransferQueue 接口,该队列中的元素可以在生产者线程和消费者线程之间高效传输,LinkedTransferQueue 通常用于需要高效、线程安全的数据传输的场景,尤其是当生产和消费速率不一致时。

相较于SynchronousQueue,区别在于通用性、内部缓冲能力、并发实现方式:

  • SynchronousQueue 是一个纯粹的握手(handoff)机制。它就像一个接力棒交换点,生产者扔出任务的同时,必须有消费者准备接住,否则双方都会阻塞。它永远不会在内部存储元素。如果需要一个简单的、高效的、零容量的线程间直接数据交换点时,例如 Executors.newCachedThreadPool() 的默认队列。
  • LinkedTransferQueue则更灵活。如果调用 put() 或 offer() 方法,且当前没有等待的消费者,它会将元素像普通队列一样存储在内部链表中。只有在使用 transfer() 方法时,才会强制执行即时传递的阻塞行为。如果需要一个更通用的消息传递工具,它可以根据情况动态选择是即时传递(低延迟)还是排队缓冲(高吞吐量),并且需要极致的并发性能时。

和LinkedBlockingQueue相比,LinkedTransferQueue有更好的性能。LinkedTransferQueue采用 无锁(Lock-Free) 算法,完全依赖于底层的 CAS (Compare-And-Swap) 原子操作来更新链表节点和指针。这消除了锁带来的开销和竞争,尤其是在多核处理器环境下,性能通常远超 LinkedBlockingQueue。而LinkedBlockingQueue使用了两把独立的 ReentrantLock 锁 (takeLock 和 putLock) 来保护队列的头部和尾部。生产者和消费者可以在不同的锁下并行操作,例如,一个线程在添加元素到队尾,另一个线程在从队头移除元素,但在高并发生产者之间或高并发消费者之间仍然存在锁竞争。

  • LinkedBlockingQueue: 适合传统的生产者-消费者模型,需要一个稳定的缓冲区来平衡不同生产/消费速率的场景。它提供了容量控制(有界队列可以防止内存耗尽)。
  • LinkedTransferQueue: 适用于高性能的消息传递、事件驱动架构或需要低延迟同步通信的场景。它的无界特性意味着需要自行管理任务堆积导致的内存使用。下面是一个基于LinkedTransferQueue容量限制的队列实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class WorkerQueue extends LinkedTransferQueue<Runnable> {

private DefaultThreadPoolExecutor executor;

public void setExecutor(DefaultThreadPoolExecutor executor) {
this.executor = executor;
}

public boolean force(Runnable o) {
if (!executor.isShutdown()) {
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue");
}
return super.offer(o);
}

// 仿tomcat TaskQueue
@Override
public boolean offer(@Nonnull Runnable runnable) {
int poolSize = executor.getPoolSize();

// 当线程数达到最大线程数时,新提交任务入队
if (poolSize == executor.getMaximumPoolSize()) {
return super.offer(runnable);
}

// 当提交的任务数小于线程池中已有的线程数时,即有空闲线程,任务入队即可
if (executor.getSubmittedTasksCount().get() <= poolSize) {
return super.offer(runnable);
}

// 如果当前线程数量未达到最大线程数,直接返回false,让线程池创建新线程
if (poolSize < executor.getMaximumPoolSize()) {
return false;
}
// 最后的兜底,放入队列
return super.offer(runnable);
}
}

ThreadFactory

ThreadFactory 表示线程工厂,用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。在jdk中,ThreadFactory 是一个接口,在使用时需要自己实现,下面是一个自定义ThreadFactory实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class DefaultThreadFactory implements ThreadFactory {
private final AtomicInteger poolId = new AtomicInteger(0);
private final AtomicInteger threadId = new AtomicInteger(0);

private final String prefix;
private final boolean daemon;
private final int priority;
protected final ThreadGroup group;

public DefaultThreadFactory(String name) {
this(name, false);
}

public DefaultThreadFactory(String name, boolean daemon) {
this(name, daemon, Thread.NORM_PRIORITY, null);
}

public DefaultThreadFactory(String name, boolean daemon, ThreadGroup group) {
this(name, daemon, Thread.NORM_PRIORITY, group);
}

public DefaultThreadFactory(String name, boolean daemon, int priority, ThreadGroup group) {
this.prefix = name + "-" + poolId.incrementAndGet() + "-thread-";
this.daemon = daemon;
this.priority = priority;
this.group = group;
}

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(group, runnable, prefix + threadId.incrementAndGet(), 0);
if (thread.isDaemon() != daemon) {
thread.setDaemon(daemon);
}

if (thread.getPriority() != priority) {
thread.setPriority(priority);
}
return thread;
}
}

通过实现ThreadFactory接口,我们可以完全控制线程的创建过程,使其更适合特定的应用场景:

  • 需要为线程池中的线程提供有意义的名称时。
  • 需要统一配置线程属性时。
  • 需要跟踪线程创建情况时。
  • 需要为线程添加特殊的异常处理时。

RejectedExecutionHandler

Java 线程池 ThreadPoolExecutor 在无法处理新提交的任务时(即线程池已满,且工作队列也已满),会使用 RejectedExecutionHandler 定义的拒绝策略进行处理。Java提供了四种内置的拒绝策略,都在ThreadPoolExecutor.AbortPolicy等静态类中:

  • AbortPolicy(中止策略):

    • 默认策略。直接抛出 RejectedExecutionException 运行时异常,调用者线程可以捕获这个异常,并进行相应处理。
    • 适用场景:
      • 关键任务场景: 当系统负载已满,且新任务非常重要,不允许被静默丢弃时。
      • 快速失败: 需要立即感知到系统容量已满,并及时向上游反馈或记录日志。
  • CallerRunsPolicy:由提交任务的线程(调用者)自己来执行这个任务。

    • 不抛弃任务,而是将任务回退到调用该线程池的线程中执行。例如,在 main 线程提交任务,如果线程池满了,该任务就在 main 线程中执行。
    • 适用场景:
      • 减缓生产速度: 当上游任务的生产者线程开始执行自己的任务时,它会被阻塞或减慢提交速度,从而实现了天然的“反压”(Backpressure)机制,平衡了系统的负载。
      • 保障任务不丢失: 确保每一个任务都会被执行,但会增加调用者线程的负担。
  • DiscardPolicy(丢弃策略):

    • 默默地丢弃新提交的任务,不做任何处理,也不抛出异常。
    • 适用场景:
      • 非关键任务:适用于那些不重要、丢失了也无妨的日志记录、统计信息收集等任务。
      • 允许数据丢失的监控系统:在高并发场景下,少量数据丢失可以接受,以保障核心系统的稳定性。
  • DiscardOldestPolicy(丢弃最老策略)

    • 丢弃工作队列中等待时间最长的那个任务(队列头部的任务),然后尝试重新提交当前新任务。如果重新提交失败(例如队列还是满的或线程池已关闭),则继续尝试或失败。
    • 适用场景:
      • 时效性任务: 适用于需要处理最新数据、而旧数据价值不大的场景(例如实时行情数据、传感器最新读数)。
      • 保持队列最新: 确保队列中保存的任务具有较高的时效性。

如果需要自定义丢弃策略可以实现 RejectedExecutionHandler 接口,创建自定义的拒绝逻辑:

1
2
3
4
5
6
7
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 在这里实现自定义逻辑,例如:记录日志、将任务保存到数据库或消息队列,稍后重试或发送邮件或告警通知
System.out.println("Task Rejected: " + r.toString());
}
}

注意:在选择拒绝策略时,需要权衡任务的重要性、系统的稳定性和性能。

自定义 ThreadPoolExecutor 实现

  • 自定义WorkQueue - 前面基于 LinkedTransferQueue 实现有界队列
  • 自定义ThreadFactory - 前面实现的 DefaultThreadFactory
  • 自定义RejectedExecutionHandler - 前面的示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class DefaultThreadPoolExecutor extends ThreadPoolExecutor {

private static final int MAXIMUM_POOL_SIZE = 200;
private static final int KEEP_ALIVE_TIME = 60 * 1000;
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 1;

// 正在处理的任务数
@Getter
protected final AtomicInteger submittedTasksCount = new AtomicInteger(0);

// 最大允许同时处理的任务数
@Getter
protected final int maxTask;

// 其他构造方法省略

public DefaultThreadPoolExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
((WorkerQueue) getQueue()).setExecutor(this);
this.maxTask = maximumPoolSize + queueCapacity;
}

@Override
public void execute(@Nonnull Runnable command) {
int count = submittedTasksCount.incrementAndGet();
// 超过最大的并发任务限制,进行 reject
// 依赖的LinkedTransferQueue没有长度限制,因此这里进行控制
if (count > maxTask) {
submittedTasksCount.decrementAndGet();
getRejectedExecutionHandler().rejectedExecution(command, this);
}

try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (!((WorkerQueue) getQueue()).force(command)) {
submittedTasksCount.decrementAndGet();
getRejectedExecutionHandler().rejectedExecution(command, this);
}
}
}

protected void afterExecute(Runnable r, Throwable t) {
submittedTasksCount.decrementAndGet();
}
}

使用:

1
2
3
4
5
6
7
8
9
10
11
DefaultThreadPoolExecutor executor = new DefaultThreadPoolExecutor(
10,
30,
60,
60,
TimeUnit.SECONDS,
new WorkerQueue(),
new DefaultThreadFactory("test"),
new CustomRejectedExecutionHandler());
executor.execute(()-> System.out.println("test DefaultThreadPoolExecutor execute"));
executor.submit(()-> System.out.println("test DefaultThreadPoolExecutor submit"));

ThreadPoolExecutor 运行时原理

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。

  • 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
    • 直接申请线程执行该任务;
    • 缓冲到队列中等待线程执行;
    • 拒绝该任务。
  • 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

线程池生命周期

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行由内部维护。线程池内部使用一个 32 位的整数维护两个值:运行状态 runState 和线程数量 workerCount 两个参数维护在一起,其中高 3 位用于存放线程池状态,低 29 位表示线程数 CAPACITY 。用一个 AtomicInteger 变量存储来个值,可避免数据不一致问题,避免了锁的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ThreadPoolExecutor extends AbstractExecutorService {
// 状态 RUNNING 线程数 = 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大容量(2^29 - 1 = 536870911)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3)
/**
* RUNNING : 接受新任务并且处理已经进入阻塞队列的任务
* SHUTDOWN : 不接受新任务,但是处理已经进入阻塞队列的任务
* STOP : 不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
* TIDYING : 所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
* TERMINATED: terminated钩子函数已经运行完成
**/
//111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将CAPACITY取非后和c进行取与运算,可以得到高3位的值,即线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }

// 将c和CAPACITY取与运算,可以得到低29位的值,即线程池的个数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}

由源码可以看到,Java 线程池(ThreadPoolExecutor)有五种状态,由一个名为 ctl 的 AtomicInteger 变量进行管理:
RUNNING(运行中)

  • 状态描述: 线程池的初始状态,也是正常工作状态。
  • 行为: 接受新提交的任务,并处理任务队列中的任务。
  • 状态切换: 线程池创建后即处于 RUNNING 状态。

SHUTDOWN(关闭)

  • 状态描述: 线程池进入温和的关闭状态。
  • 行为: 不再接受新的任务,但会继续处理任务队列中已有的任务,直到所有任务都执行完毕。
  • 状态切换: 调用 shutdown() 方法会使线程池进入 SHUTDOWN 状态。

STOP(停止)

  • 状态描述: 线程池进入强制关闭状态。
  • 行为: 不再接受新任务,不处理任务队列中剩余的任务,并且会中断所有正在执行的任务。
  • 状态切换: 调用 shutdownNow() 方法会使线程池进入 STOP 状态。

TIDYING(整理)

  • 状态描述: 线程池处于资源清理阶段,所有任务都已终止,工作线程数也已变为0。
  • 行为: 在该状态下,会执行 terminated() 钩子方法,进行最后的清理工作。
  • 状态切换:
    • 从 SHUTDOWN 状态进入:当任务队列为空且线程池中的工作线程数为0时。
    • 从 STOP 状态进入:当线程池中的工作线程数为0时。

TERMINATED(终止)

  • 状态描述: 线程池已彻底终止。
  • 行为: 线程池中的所有任务都已处理完毕,所有工作线程都已退出,terminated() 方法也已执行完成。
  • 状态切换: 从 TIDYING 状态进入。

线程池提交任务流程

上图中,当一个任务通过execute()或submit()方法提交给ThreadPoolExecutor时,它会遵循以下步骤:

  • 检查核心线程数:如果当前运行的线程数少于corePoolSize,线程池会立刻创建一个新的核心线程来执行任务。
  • 进入任务队列:如果当前运行的线程数已达到corePoolSize,任务会被放入workQueue中排队等待。
  • 创建非核心线程:如果队列已满,但当前运行的线程数小于maximumPoolSize,线程池会创建新的非核心线程来处理队列中的任务。
  • 执行拒绝策略:如果线程数已达到maximumPoolSize且队列已满,线程池会根据预设的handler(拒绝策略)来处理这个新任务。
  • 线程复用:当一个工作线程执行完任务后,它不会被销毁,而是会从任务队列中循环获取下一个任务并执行。如果这个线程是非核心线程,并且空闲时间超过keepAliveTime,它才会被终止。

线程池调优

配置线程池的线程数没有一成不变的规则,它主要取决于任务类型(是CPU密集型还是I/O密集型)和服务器的硬件资源(如CPU核心数)
核心考量因素

  • 任务类型:CPU密集型 vs I/O密集型
    • CPU密集型任务:这类任务需要大量CPU计算,比如复杂的数学运算、图像处理等。它们会持续占用CPU,如果线程数远大于CPU核心数,会造成频繁的线程上下文切换,反而降低性能。
    • I/O密集型任务:这类任务需要频繁地进行磁盘读写、网络通信等I/O操作。当一个线程在等待I/O时,它会释放CPU,让其他线程有机会执行。因此,可以配置较多的线程数以充分利用CPU的空闲时间。
  • 服务器硬件资源:CPU核心数可以通过 Runtime.getRuntime().availableProcessors() 方法获取当前服务器的CPU核心数。这是配置线程池大小的重要参考值。

线程池大小配置策略
CPU密集型任务:对于CPU密集型任务,线程池的最佳大小应该等于或略大于CPU核心数。

  • 推荐大小:CPU核心数 + 1
  • 原因:多出的一个线程可以防止因偶尔的页缺失或其它系统中断而导致的CPU空闲,确保CPU始终处于忙碌状态。

I/O密集型任务:对于I/O密集型任务,线程池可以配置得更大一些,因为线程在等待I/O操作时不会消耗CPU。

  • 推荐公式:CPU核心数 * (1 + 任务等待时间 / 任务计算时间)
  • 计算等待时间与计算时间比率(Blocking Coefficient):这个比率需要通过性能测试和监控来估算。例如,如果任务的等待时间是计算时间的5倍,那么比率就是5,如果难以获取精确比率,一个经验值是 2 * CPU核心数
  • 注意事项:过多的线程数也会导致内存消耗和上下文切换增加,因此需要进行基准测试和监控来找到最佳平衡点。

混合型任务:如果线程池中包含CPU密集型和I/O密集型两种任务,最佳实践是:

  • 拆分线程池:为不同类型的任务创建独立的线程池,然后根据各自的特性进行配置。
  • 优点:
    • 避免互相影响:I/O密集型任务的阻塞不会影响CPU密集型任务的执行。
    • 便于调优:可以针对性地调整不同线程池的参数。

如果需要动态调整ThreadPoolExecutor参数,可以通过对应参数的 set 方法进行操作,set相关的方法是线程安全的,如setMaximumPoolSize(int maximumPoolSize)。但需要注意的是 LinkedBlockingQueue 的容量通常无法动态修改,并且动态参数调整可能会影响正在执行的任务。

Tomcat线程池

Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:

  • 使用 ProtocolHandler 接口来封装I/O模型和应用层协议的差异,其中I/O模型可以选择非阻塞I/O、异步I/O或APR,应用层协议可以选择HTTP、HTTPS或AJP。ProtocolHandler将I/O模型和应用层协议进行组合,让EndPoint只负责字节流的收发,Processor负责将字节流解析为Tomcat Request/Response对象,实现功能模块的高内聚和低耦合,ProtocolHandler接口继承关系如下图示。
  • 通过适配器 Adapter 将Tomcat Request对象转换为标准的ServletRequest对象。

在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。

1
2
3
4
5
6
7
8
9
10
11
12
public void createExecutor() {
internalExecutor = true;
if (getUseVirtualThreads()) {
executor = new VirtualThreadExecutor(getName() + "-virt-");
} else {
TaskQueue taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), getThreadsMaxIdleTime(),
TimeUnit.MILLISECONDS, taskqueue, tf);
taskqueue.setParent((ThreadPoolExecutor) executor);
}
}

Tomcat自定义线程池继承于java.util.concurrent. ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;

prestartAllCoreThreads();
}

Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void execute(Runnable command) {
submittedCount.incrementAndGet();
try {
executeInternal(command);
} catch (RejectedExecutionException rx) {
if (getQueue() instanceof TaskQueue) {
// If the Executor is close to maximum pool size, concurrent
// calls to execute() may result (due to Tomcat's use of
// TaskQueue) in some tasks being rejected rather than queued.
// If this happens, add them to the queue.
final TaskQueue queue = (TaskQueue) getQueue();
if (!queue.force(command)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}

在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200,为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

private transient volatile ThreadPoolExecutor parent = null;
public TaskQueue() {
super();
}

public TaskQueue(int capacity) {
super(capacity);
}

public TaskQueue(Collection<? extends Runnable> c) {
super(c);
}

public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}

/**
* Used to add a task to the queue if the task has been rejected by the Executor.
*
* @param o The task to add to the queue
*
* @return {@code true} if the task was added to the queue,
* otherwise {@code false}
*/
public boolean force(Runnable o) {
if (parent == null || parent.isShutdown()) {
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
}
//forces the item onto the queue, to be used if the task is rejected
return super.offer(o);
}

@Override
public boolean offer(Runnable o) {
//we can't do any checks
//forces the item onto the queue, to be used if the task is rejected
// 1. parent为线程池,Tomcat中为自定义线程池实例
if (parent==null) {
return super.offer(o);
}
// 2. 当线程数达到最大线程数时,新提交任务入队
//we are maxed out on threads, simply queue the object
if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
// 3. 当提交的任务数小于线程池中已有的线程数时,即有空闲线程,任务入队即可
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {
return super.offer(o);
}
// 4. 【关键点】如果当前线程数量未达到最大线程数,直接返回false,让线程池创建新线程
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {
return false;
}
// 5. 最后的兜底,放入队列
//if we reached here, we need to add it to the queue
return super.offer(o);
}

@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {
// the poll timed out, it gives an opportunity to stop the current
// thread if needed to avoid memory leaks.
parent.stopCurrentThreadIfNeeded();
}
return runnable;
}

@Override
public Runnable take() throws InterruptedException {
if (parent != null && parent.currentThreadShouldBeStopped()) {
return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
return super.take();
}
}

自定义设计实现一个线程池

一个线程池的核心是一个生产者-消费者模型:

  • 生产者(任务提交者):向线程池提交任务。
  • 缓冲区(任务队列):存储待执行的任务,使用阻塞队列实现线程安全。
  • 消费者(工作线程):不断从任务队列中取出任务并执行。

实现思路

  • 生产者-消费者模型:execute 方法是生产者,将 Runnable 任务放入 BlockingQueue(任务队列)。WorkerThread 是消费者,从队列中取出任务并执行。

  • BlockingQueue 的选择:使用 BlockingQueue(如 LinkedBlockingQueue)来处理线程安全问题。它天生支持阻塞式地“放入”(put)和“取出”(take/poll)。

  • Worker 线程的生命周期:WorkerThread 的 run() 方法是一个核心循环。

    • 通过调用 getTask() 来获取任务。
    • getTask() 必须使用带超时的 poll(timeout),而不是 take()。如果使用 take(),当调用 shutdown()(优雅关闭)时,如果队列为空,工作线程将永远阻塞在 take(),导致线程池无法终止。使用 poll(timeout) 允许线程在超时后醒来,重新检查 state 变量,从而在 state == SHUTDOWN 且队列为空时安全退出。
  • volatile 状态:使用 volatile int state 来管理线程池的状态(RUNNING, SHUTDOWN, STOP)。volatile 保证了当一个线程(例如调用 shutdown() 的主线程)修改 state 时,所有其他工作线程都能立即看到这个变化。

  • shutdown() vs shutdownNow():

    • shutdown():将状态设为 SHUTDOWN。getTask() 方法检测到此状态后,会继续处理队列中剩余的任务,但一旦队列为空,getTask() 将返回 null,导致 WorkerThread 退出。
    • shutdownNow():将状态设为 STOP,并调用所有 worker.interrupt()。正在 poll() 或 sleep() 的线程会抛出 InterruptedException。getTask() 捕获此异常后返回 null,导致线程立即退出。

一个简单的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Task implements Runnable {
private LinkedBlockingQueue taskQueue = new LinkedBlockingQueue();
private AtomicBoolean running = new AtomicBoolean(true);

public void submitTask(Object task) throws InterruptedException {
taskQueue.put(task);
}

@Override
public void run() {
while(running.get()) {
try {
// 如果没有任务,会使线程阻塞,一旦有任务,会被唤醒
Object task = taskQueue.take();
doSomething(task);
} catch (Throwable e) {
e.printStackTrace();
}
}
}

public void shutdown() {
if(running.compareAndSet(true, false)) {
System.out.println(Thread.currentThread() + " is stoped");
}
}

private void doSomething(Object task) {
}
}

java线程池实现
http://example.com/2025/07/13/java-juc-线程池原理/
作者
ares
发布于
2025年7月13日
许可协议