如何等待尚未提交到线程池的 Future?

How to wait for a Future who hasn't been submitted to a thread pool yet?

我想创建一个 "TaskPoolManager" 在 ThreadPoolExecutor 中启动 "Task"(自定义对象)并根据重要性级别、自提交以来的时间等对它们进行优先排序。(这些是任务的属性)

我的问题是当 ThreadPoolExecutor 已满时,提交到池中的其他任务将在 "FIFO" 中执行并且没有优先级。

这里是TaskPoolManager class:


class TaskPoolManager:
    def __init__(self, max_workers: int = None):
        self.max_workers = max_workers or (os.cpu_count() or 1) * 5
        self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers, 
                                                 thread_name_prefix="TaskPoolManager")
        self.pending_task: Dict[Task, Future] = {Task(func=None): Future()}
        self.running_workers = 0

    # Task are callable
    def submit(self, task: Task) -> Future:
        if self.running_workers == self.max_workers:
            return self._add_task_to_queue(task)
        else:
            return self._start_task(task)

    def _start_task(self, task: Task) -> Future:
        """Submit a task in the pool"""
        self.running_workers = self.running_workers + 1
        future = self._pool_executor.submit(task)
        future.add_done_callback(lambda x: self._completed_thread())
        return future

    def _add_task_to_queue(self, task: Task) -> Future:
        """Add task to the not started task queue"""
        not_started_future = Future()
        self.pending_task[task] = not_started_future
        return not_started_future

    def _completed_thread(self):
        """Call when a thread in the pool as terminated a task"""
        self.running_workers = self.running_workers - 1
        self._start_task_in_queue()  # By priority level

这里有一个如何使用它的例子:

manager = TaskPoolManager()

for i in range(0, 10000):
    manager.submit(Task(func=wait_random_time_task))

f = manager.submit(Task(func=wait_random_time_task))

# This isn't submitted to the thread pool yet, but need to be waitable like it is.
f.result()

有没有办法将 Future 实例化的客户端连接到 ThreadPoolExecutor.submit 在稍后执行时创建的 Future 实例?

如果没有,有没有办法 return 一个 Future 类似的对象,以后可以与未来相关联并仍然等待 .result()

换句话说:如何等待尚未提交到线程池的 Future?

终于,没那么复杂了:

def _start_task_in_queue(self):
    try:
        # Algorithm could be more complex than just the first one
        task, returned_future = next(iter(self.pending_task.items()))
    except StopIteration:
        return
    started_future = self._pool_executor.submit(task)
    started_future.add_done_callback(lambda result: returned_future.set_result(result))

我不知道这是否是解决我问题的最佳方法,因为 Future.__init__ 指定:

Initializes the future. Should not be called by clients

但它是这样工作的。