线程池调用阻塞方法

Thread pool calls blocking methods

我实现了一个存储元素的队列,该队列类似于 BlockingQueue. On retrieval, the consumer may specify a Predicate 并针对队列元素进行了测试。队列将采用 FIFO 样式 return 个元素,但会跳过所有不满足此谓词的元素。所以 returned 元素可能不是队列的头部。如果 none 个队列元素满足给定的谓词,take()-线程会休眠一段时间并重新开始。

有几个线程向这个队列添加元素,现在我需要很多线程来使用该队列中的元素。

添加元素很简单。但是我如何 "connect" 这个队列到一个工作池(最好是一个 ThreadPoolExecutor 具有动态线程管理的)从这个队列中检索元素并做一些工作?

我的队列有两种方法:

boolean add(E e);
E take(); // blocks

队列实现基本上类似于Condition, except that it's unbounded and not backed by an array, but a LinkedHashSet中的示例,它不允许重复并保持插入顺序。

我想到了这个,但我不知道这是否可行。我真的需要这个额外的话题吗?

SynchronousQueue<Runnable> workQ = new SynchronousQueue<>();
ExecutorService threadPool = new ThreadPoolExecutor(10, 100, 30L, TimeUnit.SECONDS, workQ);
new Thread(() -> {
    try {
        while (true) {
            workQ.put(() -> process(queue.take()));
        }
    } catch (InterruptedException e) {
    }
}).start();

这个问题背后的问题是:如何使用 ThreadPoolExecutor in combination with a queue that contains elements that are not Runnable?示例:

BlockingQueue<String> strings = new LinkedBlockingQueue<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 30L, TimeUnit.SECONDS, strings);

(我知道在这个例子中线程池不知道如何处理排队的字符串,但我猜它更好地揭示了问题。假设队列包含要检索的 URL。)

回答: 这是不可能的,因为线程池执行程序需要一个可运行队列(没有上限通配符 - 只是普通的可运行)。但是可以将队列更改为工作队列(BlockingQueue<Runnable>),以便在向队列添加元素时,BooleanSupplier 将指示该元素是否可用。这样,排队的元素不需要是某种类型的,而是可以运行的。添加一个元素可能看起来像这样(E 是 Runnable):

public boolean add(E element, BooleanSupplier availability) {
    lock.lock();
    try {
        if (data.putIfAbsent(element, availability) == null) {
            notEmpty.signal();
            return true;
        }
        return false;
    } finally {
        lock.unlock();
    }
}

并且:

String url = "...";
queue.add(() -> wget(url), () -> unlockedStrings.contains(url));

对于 BlockingQueue 合约:

@Override
public boolean add(E element) {
    return add(element, () -> Boolean.TRUE);
}

线程池必须由一些工人启动:

ThreadPoolExecutor workerPool = new ThreadPoolExecutor(minWorkers, maxWorkers, 30L, TimeUnit.SECONDS, queue);
workerPool.prestartAllCoreThreads();