在多个不同的工作人员之间共享排队的工作负载
Share queued workload over multiple distinct workers
关于
我有一个 class DataRetriever
需要用 API 凭据实例化。我有五组不同的 API 凭据,因此我想实例化 DataRetriever
的五个实例。 DataRetriever
只有一个 public 方法 retrieve
,顾名思义,它将根据传递给该方法的 id
使用 subprocess
检索一些数据。
- 给定的 API 凭据不能同时打开多个流(具有任何 ID)
- a
DataRetriever
最多只能有一个到 API 的连接,因此不能在仍在检索流的 DataRetriever
实例上调用 DataRetriever#retrieve(id)
数据
- 数据量各不相同,因此子进程退出的时间可以是几秒到几分钟之间的任何时间
当前方法
我正在使用 queue
,如示例片段所示。我用需要检索的所有 id
数据流填充队列。
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
问题
我总是可以使用观察者模式,但我想知道是否有 Python 方法来做这样的事情?
- 如何确保上面代码片段中的
worker
将排队的工作负载分配给仅空闲的 DataRetriever
s,同时无缝地使用 DataRetriever
的所有五个实例?
- 在研究过程中,我发现
ProcessPoolExecutor
无法使这些示例适应我的场景。这可能是解决方案吗?
您可以执行以下操作:
def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())
这假设 dr.retrieve(stream_id)
正在阻塞,或者您有某种方式知道由 dr.retrieve(stream_id)
启动的子进程尚未完成,因此您的工作人员将阻塞直到它完成(否则实现DataRetriever
的必须更改)。
q.get()
默认情况下是阻塞的,因此您的 worker
进程将与其他进程一起等待对象来接收它。 Queue()
对象也是 FIFO,因此您可以确定工作将在 worker
个进程之间平均分配。
关于
我有一个 class DataRetriever
需要用 API 凭据实例化。我有五组不同的 API 凭据,因此我想实例化 DataRetriever
的五个实例。 DataRetriever
只有一个 public 方法 retrieve
,顾名思义,它将根据传递给该方法的 id
使用 subprocess
检索一些数据。
- 给定的 API 凭据不能同时打开多个流(具有任何 ID)
- a
DataRetriever
最多只能有一个到 API 的连接,因此不能在仍在检索流的DataRetriever
实例上调用DataRetriever#retrieve(id)
数据 - 数据量各不相同,因此子进程退出的时间可以是几秒到几分钟之间的任何时间
当前方法
我正在使用 queue
,如示例片段所示。我用需要检索的所有 id
数据流填充队列。
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
问题
我总是可以使用观察者模式,但我想知道是否有 Python 方法来做这样的事情?
- 如何确保上面代码片段中的
worker
将排队的工作负载分配给仅空闲的DataRetriever
s,同时无缝地使用DataRetriever
的所有五个实例? - 在研究过程中,我发现
ProcessPoolExecutor
无法使这些示例适应我的场景。这可能是解决方案吗?
您可以执行以下操作:
def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())
这假设 dr.retrieve(stream_id)
正在阻塞,或者您有某种方式知道由 dr.retrieve(stream_id)
启动的子进程尚未完成,因此您的工作人员将阻塞直到它完成(否则实现DataRetriever
的必须更改)。
q.get()
默认情况下是阻塞的,因此您的 worker
进程将与其他进程一起等待对象来接收它。 Queue()
对象也是 FIFO,因此您可以确定工作将在 worker
个进程之间平均分配。