ThreadPoolExecutor 神秘地拒绝可运行对象
ThreadPoolExecutor mysteriously rejecting runnables
鉴于以下单元测试,有人可以向我解释为什么 ThreadPoolExecutor 在某些时候拒绝任务吗?
@Test
public void testRejectionBehavior() throws Exception {
final AtomicLong count = new AtomicLong(0);
final AtomicInteger activeThreads = new AtomicInteger(0);
for (;;) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 20,
0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
int prestarted = pool.prestartAllCoreThreads();
pool.allowCoreThreadTimeOut(false);
System.out.println("Prestarted #" + prestarted);
for (int i = 0; i < 100; i++) {
final int thisTasksActive = activeThreads.incrementAndGet();
pool.execute(new Runnable() {
@Override
public void run() {
long value = count.incrementAndGet();
if (value % 50 == 0) {
System.out.println("Execution #" + value + " / active: " + thisTasksActive);
}
if (Thread.currentThread().getName().equals("main")) {
throw new IllegalStateException("Execution #" + value + " / active: " + thisTasksActive);
}
activeThreads.decrementAndGet();
}
});
Thread.sleep(5);
}
}
}
我的输出如下所示:
....
Execution #200 / active: 1
Prestarted #20
java.lang.IllegalStateException: Execution #201 / active: 1 / pool stats: java.util.concurrent.ThreadPoolExecutor@156643d4[Running, pool size = 20, active threads = 20, queued tasks = 0, completed tasks = 0]
如您所见,它执行了大约 200 次,然后突然拒绝了新迭代的第一个任务。
您没有为执行程序提供适当的队列来存储任务。SynchronousQueue
没有容量,甚至没有 1。您填充了线程池,然后您的下一个任务必须 运行 在主线程上,这是这种情况下的正常行为。
SynchronousQueue
是一个奇怪的野兽,我唯一一次看到它在 SO 上的代码中使用是在执行程序中。在 "why does my code act weird" 这样的问题中。您是怎么想出在这里使用 SynchronousQueue
的?
好的,在深入研究 ThreadPoolExecutor
之后发现,在创建 ThreadPoolExecutor
时使用给定的参数并不能立即执行任务。
即使您调用 pool.prestartAllCoreThreads();
,实际上也存在竞争条件。你看,prestartAllCoreThreads()
创建了实现 Runnable
接口的新 ThreadPoolExecutor.Worker
实例。在实例化它们时,它们将内部状态设置为 -1,使它们在 ThreadPoolExecutor
的 toString()
输出中显示为 "active threads"。现在还在它们的构造函数中,Worker 实例创建了一个新线程并将自己设置为该线程的 Runnable
。直到它们的 run()
方法被新启动的线程实际调用时,它们才将它们的状态设置为可用于执行任务并随后调用 workQueue.take()
方法。
简而言之,当您有一个带有同步队列的 ThreadPooExecutor
并预启动所有线程时,这些线程可能需要一段时间才能真正启动并阻塞在 queue.take()
状态。直到那时你才可以提交任务并且不会被拒绝执行。
鉴于以下单元测试,有人可以向我解释为什么 ThreadPoolExecutor 在某些时候拒绝任务吗?
@Test
public void testRejectionBehavior() throws Exception {
final AtomicLong count = new AtomicLong(0);
final AtomicInteger activeThreads = new AtomicInteger(0);
for (;;) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 20,
0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
int prestarted = pool.prestartAllCoreThreads();
pool.allowCoreThreadTimeOut(false);
System.out.println("Prestarted #" + prestarted);
for (int i = 0; i < 100; i++) {
final int thisTasksActive = activeThreads.incrementAndGet();
pool.execute(new Runnable() {
@Override
public void run() {
long value = count.incrementAndGet();
if (value % 50 == 0) {
System.out.println("Execution #" + value + " / active: " + thisTasksActive);
}
if (Thread.currentThread().getName().equals("main")) {
throw new IllegalStateException("Execution #" + value + " / active: " + thisTasksActive);
}
activeThreads.decrementAndGet();
}
});
Thread.sleep(5);
}
}
}
我的输出如下所示:
....
Execution #200 / active: 1
Prestarted #20
java.lang.IllegalStateException: Execution #201 / active: 1 / pool stats: java.util.concurrent.ThreadPoolExecutor@156643d4[Running, pool size = 20, active threads = 20, queued tasks = 0, completed tasks = 0]
如您所见,它执行了大约 200 次,然后突然拒绝了新迭代的第一个任务。
您没有为执行程序提供适当的队列来存储任务。SynchronousQueue
没有容量,甚至没有 1。您填充了线程池,然后您的下一个任务必须 运行 在主线程上,这是这种情况下的正常行为。
SynchronousQueue
是一个奇怪的野兽,我唯一一次看到它在 SO 上的代码中使用是在执行程序中。在 "why does my code act weird" 这样的问题中。您是怎么想出在这里使用 SynchronousQueue
的?
好的,在深入研究 ThreadPoolExecutor
之后发现,在创建 ThreadPoolExecutor
时使用给定的参数并不能立即执行任务。
即使您调用 pool.prestartAllCoreThreads();
,实际上也存在竞争条件。你看,prestartAllCoreThreads()
创建了实现 Runnable
接口的新 ThreadPoolExecutor.Worker
实例。在实例化它们时,它们将内部状态设置为 -1,使它们在 ThreadPoolExecutor
的 toString()
输出中显示为 "active threads"。现在还在它们的构造函数中,Worker 实例创建了一个新线程并将自己设置为该线程的 Runnable
。直到它们的 run()
方法被新启动的线程实际调用时,它们才将它们的状态设置为可用于执行任务并随后调用 workQueue.take()
方法。
简而言之,当您有一个带有同步队列的 ThreadPooExecutor
并预启动所有线程时,这些线程可能需要一段时间才能真正启动并阻塞在 queue.take()
状态。直到那时你才可以提交任务并且不会被拒绝执行。