Python 运行 多个程序并行
Python run multiple programs in parallel
假设我有数千个传感器读数被添加到 Redis 或 Apache Kafka 等队列中,并且 foreach sensor_id 让其自己的工作人员根据历史读数进行 运行s 计算。每个 sensor_id 都会发布到一个新主题,然后开始流式阅读。
每个工作人员都将从读取数据库开始,以实例化它的一些变量阈值。除了 sensor_id 和使用 sensor_id 作为查找键检索的变量阈值外,每个工作人员的代码都是相同的。一旦 sensor_id 添加到队列中,它就会被分配一个永远 运行 的工作人员。
这样做会不会更有效率:
- multiprocessing.Pool 无限期地启动成千上万的 运行 工人?
- 或“python script.py param1 param2 &” 数千次,只是改变参数来实例化? (我会以编程方式生成一个 bash 脚本以包含所有这些逻辑 fyi)
我希望所有工作人员尽快运行并行,这些是CPU绑定任务,而不是I/O绑定。启动所有工作程序的最佳方式是什么?
澄清:传感器读数将每秒生成一次 sensor_id 所以对于 3000 个传感器,每秒生成 3000 个事件,3000 个主题在队列。有效负载示例是 JSON {sensor_id: "hash", temperature: 85, rpm: 1200, ...}。我理想的设置是每个工作人员在内存中维护最后 200 个左右的读数以进行 运行 计算。另一种方法是一个中央队列,循环工作人员必须首先建立数据库连接才能读取 sensor_id 它从队列中弹出的 200 个读数,但这需要时间。
我的理解是Redis
列表比Publish/Subscribe更可靠,更适合单条消息需要被多个消费者消费的情况。如果所有传感器都写入同一个列表,那么您的应用程序也会大大简化,然后您可以拥有一个由相同工作人员组成的处理池,循环读取该列表。该消息自然会识别涉及哪个传感器,并且当工作人员读取消息以获取之前看到的新传感器 ID 时,它必须通过从数据库中读取相关信息来对该传感器进行“首次”初始化,然后将其保存在由传感器 ID 键入的字典中。最终,这本词典最终可能有 3,000 个条目。 由此得出的结论是,池应该使用包含所有 3,000 个条目的字典进行一次初始化,所有工作人员甚至在开始阅读消息之前都可以访问这些条目。
但是,如果出于某种原因所有 3,000 个传感器必须写入 3,000 个不同的 Redis
列表(如果您 是 使用 Redis
列表开始)。然后,我们的想法是找到一种方法,以某种方式从 3,000 个列表中“同时”读取,在消息可用时检索消息,并将它们写入工作人员可以从中读取的单个队列,从而简化工作人员逻辑。此代码基于 12.13。 Python 食谱第 3 版 中的轮询多线程队列 可用 here。这已经适用于轮询 3,000 Redis
个列表并将读取的项目发送到单个 multiprocessing.Queue
实例。此代码还包括一个由 producer
工作人员组成的处理池,这些工作人员模拟传感器读数的创建,这些读数被添加到 3,000 个列表中的一个列表中,您在此处的实际代码中不会有这些读数。但是 3,000 PollableList
个实例必须可供负责获取传感器读数的任何代码访问。我们这里也只有一个 Process
从消息队列(对象)中读取并打印它们,以保持打印“有序”。实际上,正如我所提到的,您会有一个进程池。
不幸的是,Cookbook 中描述的技术似乎对传递给 select
的列表的大小有限制,因此被迫将列表大小限制为 500 以确保安全这意味着我必须将 3,000 个 Redis
列表分成 6 组,每组 500 个列表。
import redis
import socket
import os
import json
import select
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from functools import partial
class PollableList():
r = redis.Redis()
def __init__(self, idx):
self.idx = idx
self.list_name = f'sensor_{idx}'
# Create a pair of connected sockets
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# Compatibility on non-POSIX systems
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
return self._getsocket.fileno()
def put(self, item):
PollableList.r.rpush(self.list_name, json.dumps(item))
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return json.loads(PollableList.r.lpop(self.list_name).decode())
def producer(lists):
""" emulate the 3000 sensors """
# Feed data to the lists (we won't run indefinitely)
for _ in range(3):
for lst in lists:
lst.put({'id': lst.idx, 'value': lst.idx * 100 + 1})
def consumer(q, list_names):
'''
Consumer that reads data on multiple lists simultaneously
'''
while True:
can_read, _, _ = select.select(list_names, [], [])
for r in can_read:
item = r.get()
q.put(item)
# in actual use case, there would be a pool of workers:
def worker(q):
message_number = 0
while True:
item = q.get()
message_number += 1
print(message_number, item)
def main():
lists = [PollableList(i) for i in range(0, 3000)]
# select cannot handle all 3000 sockets at once:
lists1 = lists[0:500]
lists2 = lists[500:1000]
lists3 = lists[1000:1500]
lists4 = lists[1500:2000]
lists5 = lists[2000:2500]
lists6 = lists[2500:3000]
p0 = Process(target=producer, args=(lists,))
p0.daemon = True
p0.start()
q = Queue()
thread_pool = ThreadPool(6)
thread_pool.map_async(partial(consumer, q), [lists1, lists2, lists3, lists4, lists5, lists6])
# This would in reality be a process pool of workers reading from q:
p1 = Process(target=worker, args=(q,))
p1.daemon = True
p1.start()
# wait for all 9000 messages to be displayed by worker:
input('Hit enter to terminate...\n')
# required for Windows:
if __name__ == '__main__':
main()
假设我有数千个传感器读数被添加到 Redis 或 Apache Kafka 等队列中,并且 foreach sensor_id 让其自己的工作人员根据历史读数进行 运行s 计算。每个 sensor_id 都会发布到一个新主题,然后开始流式阅读。
每个工作人员都将从读取数据库开始,以实例化它的一些变量阈值。除了 sensor_id 和使用 sensor_id 作为查找键检索的变量阈值外,每个工作人员的代码都是相同的。一旦 sensor_id 添加到队列中,它就会被分配一个永远 运行 的工作人员。
这样做会不会更有效率:
- multiprocessing.Pool 无限期地启动成千上万的 运行 工人?
- 或“python script.py param1 param2 &” 数千次,只是改变参数来实例化? (我会以编程方式生成一个 bash 脚本以包含所有这些逻辑 fyi)
我希望所有工作人员尽快运行并行,这些是CPU绑定任务,而不是I/O绑定。启动所有工作程序的最佳方式是什么?
澄清:传感器读数将每秒生成一次 sensor_id 所以对于 3000 个传感器,每秒生成 3000 个事件,3000 个主题在队列。有效负载示例是 JSON {sensor_id: "hash", temperature: 85, rpm: 1200, ...}。我理想的设置是每个工作人员在内存中维护最后 200 个左右的读数以进行 运行 计算。另一种方法是一个中央队列,循环工作人员必须首先建立数据库连接才能读取 sensor_id 它从队列中弹出的 200 个读数,但这需要时间。
我的理解是Redis
列表比Publish/Subscribe更可靠,更适合单条消息需要被多个消费者消费的情况。如果所有传感器都写入同一个列表,那么您的应用程序也会大大简化,然后您可以拥有一个由相同工作人员组成的处理池,循环读取该列表。该消息自然会识别涉及哪个传感器,并且当工作人员读取消息以获取之前看到的新传感器 ID 时,它必须通过从数据库中读取相关信息来对该传感器进行“首次”初始化,然后将其保存在由传感器 ID 键入的字典中。最终,这本词典最终可能有 3,000 个条目。 由此得出的结论是,池应该使用包含所有 3,000 个条目的字典进行一次初始化,所有工作人员甚至在开始阅读消息之前都可以访问这些条目。
但是,如果出于某种原因所有 3,000 个传感器必须写入 3,000 个不同的 Redis
列表(如果您 是 使用 Redis
列表开始)。然后,我们的想法是找到一种方法,以某种方式从 3,000 个列表中“同时”读取,在消息可用时检索消息,并将它们写入工作人员可以从中读取的单个队列,从而简化工作人员逻辑。此代码基于 12.13。 Python 食谱第 3 版 中的轮询多线程队列 可用 here。这已经适用于轮询 3,000 Redis
个列表并将读取的项目发送到单个 multiprocessing.Queue
实例。此代码还包括一个由 producer
工作人员组成的处理池,这些工作人员模拟传感器读数的创建,这些读数被添加到 3,000 个列表中的一个列表中,您在此处的实际代码中不会有这些读数。但是 3,000 PollableList
个实例必须可供负责获取传感器读数的任何代码访问。我们这里也只有一个 Process
从消息队列(对象)中读取并打印它们,以保持打印“有序”。实际上,正如我所提到的,您会有一个进程池。
不幸的是,Cookbook 中描述的技术似乎对传递给 select
的列表的大小有限制,因此被迫将列表大小限制为 500 以确保安全这意味着我必须将 3,000 个 Redis
列表分成 6 组,每组 500 个列表。
import redis
import socket
import os
import json
import select
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from functools import partial
class PollableList():
r = redis.Redis()
def __init__(self, idx):
self.idx = idx
self.list_name = f'sensor_{idx}'
# Create a pair of connected sockets
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# Compatibility on non-POSIX systems
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
return self._getsocket.fileno()
def put(self, item):
PollableList.r.rpush(self.list_name, json.dumps(item))
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return json.loads(PollableList.r.lpop(self.list_name).decode())
def producer(lists):
""" emulate the 3000 sensors """
# Feed data to the lists (we won't run indefinitely)
for _ in range(3):
for lst in lists:
lst.put({'id': lst.idx, 'value': lst.idx * 100 + 1})
def consumer(q, list_names):
'''
Consumer that reads data on multiple lists simultaneously
'''
while True:
can_read, _, _ = select.select(list_names, [], [])
for r in can_read:
item = r.get()
q.put(item)
# in actual use case, there would be a pool of workers:
def worker(q):
message_number = 0
while True:
item = q.get()
message_number += 1
print(message_number, item)
def main():
lists = [PollableList(i) for i in range(0, 3000)]
# select cannot handle all 3000 sockets at once:
lists1 = lists[0:500]
lists2 = lists[500:1000]
lists3 = lists[1000:1500]
lists4 = lists[1500:2000]
lists5 = lists[2000:2500]
lists6 = lists[2500:3000]
p0 = Process(target=producer, args=(lists,))
p0.daemon = True
p0.start()
q = Queue()
thread_pool = ThreadPool(6)
thread_pool.map_async(partial(consumer, q), [lists1, lists2, lists3, lists4, lists5, lists6])
# This would in reality be a process pool of workers reading from q:
p1 = Process(target=worker, args=(q,))
p1.daemon = True
p1.start()
# wait for all 9000 messages to be displayed by worker:
input('Hit enter to terminate...\n')
# required for Windows:
if __name__ == '__main__':
main()