Python 运行 多个程序并行

Python run multiple programs in parallel

假设我有数千个传感器读数被添加到 Redis 或 Apache Kafka 等队列中,并且 foreach sensor_id 让其自己的工作人员根据历史读数进行 运行s 计算。每个 sensor_id 都会发布到一个新主题,然后开始流式阅读。

每个工作人员都将从读取数据库开始,以实例化它的一些变量阈值。除了 sensor_id 和使用 sensor_id 作为查找键检索的变量阈值外,每个工作人员的代码都是相同的。一旦 sensor_id 添加到队列中,它就会被分配一个永远 运行 的工作人员。

这样做会不会更有效率:

  1. multiprocessing.Pool 无限期地启动成千上万的 运行 工人?
  2. 或“python script.py param1 param2 &” 数千次,只是改变参数来实例化? (我会以编程方式生成一个 bash 脚本以包含所有这些逻辑 fyi)

我希望所有工作人员尽快运行并行,这些是CPU绑定任务,而不是I/O绑定。启动所有工作程序的最佳方式是什么?

澄清:传感器读数将每秒生成一次 sensor_id 所以对于 3000 个传感器,每秒生成 3000 个事件,3000 个主题在队列。有效负载示例是 JSON {sensor_id: "hash", temperature: 85, rpm: 1200, ...}。我理想的设置是每个工作人员在内存中维护最后 200 个左右的读数以进行 运行 计算。另一种方法是一个中央队列,循环工作人员必须首先建立数据库连接才能读取 sensor_id 它从队列中弹出的 200 个读数,但这需要时间。

我的理解是Redis列表比Publish/Subscribe更可靠,更适合单条消息需要被多个消费者消费的情况。如果所有传感器都写入同一个列表,那么您的应用程序也会大大简化,然后您可以拥有一个由相同工作人员组成的处理池,循环读取该列表。该消息自然会识别涉及哪个传感器,并且当工作人员读取消息以获取之前看到的新传感器 ID 时,它必须通过从数据库中读取相关信息来对该传感器进行“首次”初始化,然后将其保存在由传感器 ID 键入的字典中。最终,这本词典最终可能有 3,000 个条目。 由此得出的结论是,池应该使用包含所有 3,000 个条目的字典进行一次初始化,所有工作人员甚至在开始阅读消息之前都可以访问这些条目。

但是,如果出于某种原因所有 3,000 个传感器必须写入 3,000 个不同的 Redis 列表(如果您 使用 Redis 列表开始)。然后,我们的想法是找到一种方法,以某种方式从 3,000 个列表中“同时”读取,在消息可用时检索消息,并将它们写入工作人员可以从中读取的单个队列,从而简化工作人员逻辑。此代码基于 12.13。 Python 食谱第 3 版 中的轮询多线程队列 可用 here。这已经适用于轮询 3,000 Redis 个列表并将读取的项目发送到单个 multiprocessing.Queue 实例。此代码还包括一个由 producer 工作人员组成的处理池,这些工作人员模拟传感器读数的创建,这些读数被添加到 3,000 个列表中的一个列表中,您在此处的实际代码中不会有这些读数。但是 3,000 PollableList 个实例必须可供负责获取传感器读数的任何代码访问。我们这里也只有一个 Process 从消息队列(对象)中读取并打印它们,以保持打印“有序”。实际上,正如我所提到的,您会有一个进程池。

不幸的是,Cookbook 中描述的技术似乎对传递给 select 的列表的大小有限制,因此被迫将列表大小限制为 500 以确保安全这意味着我必须将 3,000 个 Redis 列表分成 6 组,每组 500 个列表。

import redis
import socket
import os
import json
import select
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from functools import partial

class PollableList():
    r = redis.Redis()

    def __init__(self, idx):
        self.idx = idx
        self.list_name = f'sensor_{idx}'
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        PollableList.r.rpush(self.list_name, json.dumps(item))
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return json.loads(PollableList.r.lpop(self.list_name).decode())

def producer(lists):
    """ emulate the 3000 sensors """
    # Feed data to the lists (we won't run indefinitely)
    for _ in range(3):
        for lst in lists:
            lst.put({'id': lst.idx, 'value': lst.idx * 100 + 1})

def consumer(q, list_names):
    '''
    Consumer that reads data on multiple lists simultaneously
    '''
    while True:
        can_read, _, _ = select.select(list_names, [], [])
        for r in can_read:
            item = r.get()
            q.put(item)

# in actual use case, there would be a pool of workers:
def worker(q):
    message_number = 0
    while True:
        item = q.get()
        message_number += 1
        print(message_number, item)


def main():
    lists = [PollableList(i) for i in range(0, 3000)]
    # select cannot handle all 3000 sockets at once:
    lists1 = lists[0:500]
    lists2 = lists[500:1000]
    lists3 = lists[1000:1500]
    lists4 = lists[1500:2000]
    lists5 = lists[2000:2500]
    lists6 = lists[2500:3000]

    p0 = Process(target=producer, args=(lists,))
    p0.daemon = True
    p0.start()

    q = Queue()
    thread_pool = ThreadPool(6)
    thread_pool.map_async(partial(consumer, q), [lists1, lists2, lists3, lists4, lists5, lists6])

    # This would in reality be a process pool of workers reading from q:
    p1 = Process(target=worker, args=(q,))
    p1.daemon = True
    p1.start()

    # wait for all 9000 messages to be displayed by worker:
    input('Hit enter to terminate...\n')

# required for Windows:
if __name__ == '__main__':
    main()