线程池调用阻塞方法
Thread pool calls blocking methods
我实现了一个存储元素的队列,该队列类似于 BlockingQueue. On retrieval, the consumer may specify a Predicate 并针对队列元素进行了测试。队列将采用 FIFO 样式 return 个元素,但会跳过所有不满足此谓词的元素。所以 returned 元素可能不是队列的头部。如果 none 个队列元素满足给定的谓词,take()
-线程会休眠一段时间并重新开始。
有几个线程向这个队列添加元素,现在我需要很多线程来使用该队列中的元素。
添加元素很简单。但是我如何 "connect" 这个队列到一个工作池(最好是一个 ThreadPoolExecutor 具有动态线程管理的)从这个队列中检索元素并做一些工作?
我的队列有两种方法:
boolean add(E e);
E take(); // blocks
队列实现基本上类似于Condition, except that it's unbounded and not backed by an array, but a LinkedHashSet中的示例,它不允许重复并保持插入顺序。
我想到了这个,但我不知道这是否可行。我真的需要这个额外的话题吗?
SynchronousQueue<Runnable> workQ = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(10, 100, 30L, TimeUnit.SECONDS, workQ);
new Thread(() -> {
try {
while (true) {
workQ.put(() -> process(queue.take()));
}
} catch (InterruptedException e) {
}
}).start();
这个问题背后的问题是:如何使用 ThreadPoolExecutor in combination with a queue that contains elements that are not Runnable?示例:
BlockingQueue<String> strings = new LinkedBlockingQueue<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 30L, TimeUnit.SECONDS, strings);
(我知道在这个例子中线程池不知道如何处理排队的字符串,但我猜它更好地揭示了问题。假设队列包含要检索的 URL。)
回答:
这是不可能的,因为线程池执行程序需要一个可运行队列(没有上限通配符 - 只是普通的可运行)。但是可以将队列更改为工作队列(BlockingQueue<Runnable>
),以便在向队列添加元素时,BooleanSupplier
将指示该元素是否可用。这样,排队的元素不需要是某种类型的,而是可以运行的。添加一个元素可能看起来像这样(E 是 Runnable):
public boolean add(E element, BooleanSupplier availability) {
lock.lock();
try {
if (data.putIfAbsent(element, availability) == null) {
notEmpty.signal();
return true;
}
return false;
} finally {
lock.unlock();
}
}
并且:
String url = "...";
queue.add(() -> wget(url), () -> unlockedStrings.contains(url));
对于 BlockingQueue 合约:
@Override
public boolean add(E element) {
return add(element, () -> Boolean.TRUE);
}
线程池必须由一些工人启动:
ThreadPoolExecutor workerPool = new ThreadPoolExecutor(minWorkers, maxWorkers, 30L, TimeUnit.SECONDS, queue);
workerPool.prestartAllCoreThreads();
我实现了一个存储元素的队列,该队列类似于 BlockingQueue. On retrieval, the consumer may specify a Predicate 并针对队列元素进行了测试。队列将采用 FIFO 样式 return 个元素,但会跳过所有不满足此谓词的元素。所以 returned 元素可能不是队列的头部。如果 none 个队列元素满足给定的谓词,take()
-线程会休眠一段时间并重新开始。
有几个线程向这个队列添加元素,现在我需要很多线程来使用该队列中的元素。
添加元素很简单。但是我如何 "connect" 这个队列到一个工作池(最好是一个 ThreadPoolExecutor 具有动态线程管理的)从这个队列中检索元素并做一些工作?
我的队列有两种方法:
boolean add(E e);
E take(); // blocks
队列实现基本上类似于Condition, except that it's unbounded and not backed by an array, but a LinkedHashSet中的示例,它不允许重复并保持插入顺序。
我想到了这个,但我不知道这是否可行。我真的需要这个额外的话题吗?
SynchronousQueue<Runnable> workQ = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(10, 100, 30L, TimeUnit.SECONDS, workQ);
new Thread(() -> {
try {
while (true) {
workQ.put(() -> process(queue.take()));
}
} catch (InterruptedException e) {
}
}).start();
这个问题背后的问题是:如何使用 ThreadPoolExecutor in combination with a queue that contains elements that are not Runnable?示例:
BlockingQueue<String> strings = new LinkedBlockingQueue<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 30L, TimeUnit.SECONDS, strings);
(我知道在这个例子中线程池不知道如何处理排队的字符串,但我猜它更好地揭示了问题。假设队列包含要检索的 URL。)
回答:
这是不可能的,因为线程池执行程序需要一个可运行队列(没有上限通配符 - 只是普通的可运行)。但是可以将队列更改为工作队列(BlockingQueue<Runnable>
),以便在向队列添加元素时,BooleanSupplier
将指示该元素是否可用。这样,排队的元素不需要是某种类型的,而是可以运行的。添加一个元素可能看起来像这样(E 是 Runnable):
public boolean add(E element, BooleanSupplier availability) {
lock.lock();
try {
if (data.putIfAbsent(element, availability) == null) {
notEmpty.signal();
return true;
}
return false;
} finally {
lock.unlock();
}
}
并且:
String url = "...";
queue.add(() -> wget(url), () -> unlockedStrings.contains(url));
对于 BlockingQueue 合约:
@Override
public boolean add(E element) {
return add(element, () -> Boolean.TRUE);
}
线程池必须由一些工人启动:
ThreadPoolExecutor workerPool = new ThreadPoolExecutor(minWorkers, maxWorkers, 30L, TimeUnit.SECONDS, queue);
workerPool.prestartAllCoreThreads();