如何等待尚未提交到线程池的 Future?
How to wait for a Future who hasn't been submitted to a thread pool yet?
我想创建一个 "TaskPoolManager" 在 ThreadPoolExecutor
中启动 "Task"(自定义对象)并根据重要性级别、自提交以来的时间等对它们进行优先排序。(这些是任务的属性)
我的问题是当 ThreadPoolExecutor
已满时,提交到池中的其他任务将在 "FIFO" 中执行并且没有优先级。
这里是TaskPoolManager
class:
class TaskPoolManager:
def __init__(self, max_workers: int = None):
self.max_workers = max_workers or (os.cpu_count() or 1) * 5
self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers,
thread_name_prefix="TaskPoolManager")
self.pending_task: Dict[Task, Future] = {Task(func=None): Future()}
self.running_workers = 0
# Task are callable
def submit(self, task: Task) -> Future:
if self.running_workers == self.max_workers:
return self._add_task_to_queue(task)
else:
return self._start_task(task)
def _start_task(self, task: Task) -> Future:
"""Submit a task in the pool"""
self.running_workers = self.running_workers + 1
future = self._pool_executor.submit(task)
future.add_done_callback(lambda x: self._completed_thread())
return future
def _add_task_to_queue(self, task: Task) -> Future:
"""Add task to the not started task queue"""
not_started_future = Future()
self.pending_task[task] = not_started_future
return not_started_future
def _completed_thread(self):
"""Call when a thread in the pool as terminated a task"""
self.running_workers = self.running_workers - 1
self._start_task_in_queue() # By priority level
这里有一个如何使用它的例子:
manager = TaskPoolManager()
for i in range(0, 10000):
manager.submit(Task(func=wait_random_time_task))
f = manager.submit(Task(func=wait_random_time_task))
# This isn't submitted to the thread pool yet, but need to be waitable like it is.
f.result()
有没有办法将 Future
实例化的客户端连接到 ThreadPoolExecutor.submit
在稍后执行时创建的 Future
实例?
如果没有,有没有办法 return 一个 Future
类似的对象,以后可以与未来相关联并仍然等待 .result()
?
换句话说:如何等待尚未提交到线程池的 Future?
终于,没那么复杂了:
def _start_task_in_queue(self):
try:
# Algorithm could be more complex than just the first one
task, returned_future = next(iter(self.pending_task.items()))
except StopIteration:
return
started_future = self._pool_executor.submit(task)
started_future.add_done_callback(lambda result: returned_future.set_result(result))
我不知道这是否是解决我问题的最佳方法,因为 Future.__init__
指定:
Initializes the future. Should not be called by clients
但它是这样工作的。
我想创建一个 "TaskPoolManager" 在 ThreadPoolExecutor
中启动 "Task"(自定义对象)并根据重要性级别、自提交以来的时间等对它们进行优先排序。(这些是任务的属性)
我的问题是当 ThreadPoolExecutor
已满时,提交到池中的其他任务将在 "FIFO" 中执行并且没有优先级。
这里是TaskPoolManager
class:
class TaskPoolManager:
def __init__(self, max_workers: int = None):
self.max_workers = max_workers or (os.cpu_count() or 1) * 5
self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers,
thread_name_prefix="TaskPoolManager")
self.pending_task: Dict[Task, Future] = {Task(func=None): Future()}
self.running_workers = 0
# Task are callable
def submit(self, task: Task) -> Future:
if self.running_workers == self.max_workers:
return self._add_task_to_queue(task)
else:
return self._start_task(task)
def _start_task(self, task: Task) -> Future:
"""Submit a task in the pool"""
self.running_workers = self.running_workers + 1
future = self._pool_executor.submit(task)
future.add_done_callback(lambda x: self._completed_thread())
return future
def _add_task_to_queue(self, task: Task) -> Future:
"""Add task to the not started task queue"""
not_started_future = Future()
self.pending_task[task] = not_started_future
return not_started_future
def _completed_thread(self):
"""Call when a thread in the pool as terminated a task"""
self.running_workers = self.running_workers - 1
self._start_task_in_queue() # By priority level
这里有一个如何使用它的例子:
manager = TaskPoolManager()
for i in range(0, 10000):
manager.submit(Task(func=wait_random_time_task))
f = manager.submit(Task(func=wait_random_time_task))
# This isn't submitted to the thread pool yet, but need to be waitable like it is.
f.result()
有没有办法将 Future
实例化的客户端连接到 ThreadPoolExecutor.submit
在稍后执行时创建的 Future
实例?
如果没有,有没有办法 return 一个 Future
类似的对象,以后可以与未来相关联并仍然等待 .result()
?
换句话说:如何等待尚未提交到线程池的 Future?
终于,没那么复杂了:
def _start_task_in_queue(self):
try:
# Algorithm could be more complex than just the first one
task, returned_future = next(iter(self.pending_task.items()))
except StopIteration:
return
started_future = self._pool_executor.submit(task)
started_future.add_done_callback(lambda result: returned_future.set_result(result))
我不知道这是否是解决我问题的最佳方法,因为 Future.__init__
指定:
Initializes the future. Should not be called by clients
但它是这样工作的。