在多个不同的工作人员之间共享排队的工作负载

Share queued workload over multiple distinct workers

关于

我有一个 class DataRetriever 需要用 API 凭据实例化。我有五组不同的 API 凭据,因此我想实例化 DataRetriever 的五个实例。 DataRetriever 只有一个 public 方法 retrieve,顾名思义,它将根据传递给该方法的 id 使用 subprocess 检索一些数据。

当前方法

我正在使用 queue,如示例片段所示。我用需要检索的所有 id 数据流填充队列。

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

问题

我总是可以使用观察者模式,但我想知道是否有 Python 方法来做这样的事情?

您可以执行以下操作:

def worker(q_request, q_response, api_cred):
    dr = DataRetriever(api_cred)
    while True:
        stream_id = q_request.get() # that's blocking unless q.get(False)
        if stream_id == "stop":
            sys.exit(0)
        dr.retrieve(stream_id) # that can take some time (assume blocking)
        q_response.put(stream_id) # signal job has ended to parent process

api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
    t.start()
    threads.append(t)

for item in source():
    q_request.put(item)
    print("Stream ID %s was successfully retrieved." %q_response.get())

这假设 dr.retrieve(stream_id) 正在阻塞,或者您有某种方式知道由 dr.retrieve(stream_id) 启动的子进程尚未完成,因此您的工作人员将阻塞直到它完成(否则实现DataRetriever 的必须更改)。

q.get() 默认情况下是阻塞的,因此您的 worker 进程将与其他进程一起等待对象来接收它。 Queue() 对象也是 FIFO,因此您可以确定工作将在 worker 个进程之间平均分配。