How can I share a queue among three or more processes?

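I have three processes. One process reads data from disk. The other two processes perform some calculations on the data read by the first process. The following code is my sketch: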

from multiprocessing import Manager, Process


def read(pathList, q):
    for path in pathList:
        q.put(readFunc(path))
    q.put(None)
    return


def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        else:
            des_q.put(calcFunc0(data))
    return


def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        else:
            des_q.put(calcFunc1(data))
    return

if __name__ == '__main__':
    with Manager() as m:
        # the manager method is Queue(), with a capital Q
        dataQueue = m.Queue()
        res0 = m.Queue()
        res1 = m.Queue()
        readProcess = Process(target=read, args=(readPathList, dataQueue))
        readProcess.start()
        calcProcess0 = Process(target=calc0, args=(dataQueue, res0))
        calcProcess0.start()
        calcProcess1 = Process(target=calc1, args=(dataQueue, res1))
        calcProcess1.start()
        readProcess.join()
        calcProcess0.join()
        calcProcess1.join()

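However, the code above has a serious problem: I cannot get the same item from the queue twice! So how can I share the data in the queue among three or more processes?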
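To make the problem concrete, here is a minimal sketch of why a single shared queue cannot feed both calc processes. It uses the standard-library queue.Queue so that it runs in one process (that substitution is an assumption made for brevity; multiprocessing.Queue and managed queues hand out items the same way), and the helper name drain is made up for this illustration. Each get removes the item, so a second consumer never sees it:

```python
import queue


def drain(q):
    """Take items until the queue is empty (hypothetical helper)."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


q = queue.Queue()
for item in ["a", "b", "c"]:
    q.put(item)

first_consumer = drain(q)   # takes every item
second_consumer = drain(q)  # nothing is left for a second reader

print(first_consumer)   # ['a', 'b', 'c']
print(second_consumer)  # []
```

With two consumers running at the same time, the items would instead be split between them in an unpredictable way, and the single None sentinel would stop only one of them.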

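The comment offered by HALF9000 (use multiprocessing.Queue) is an improvement over the managed queue, and there is a lot to be said for Mark Setchell's comment suggesting the Redis route if you will be doing a lot of this type of publish/subscribe work and you want something very robust. But for what may be a one-off case, it is a bit overkill.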

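I think the most performant solution is to use the underutilized multiprocessing.Pipe, on top of which multiprocessing.Queue is built. It is not as flexible as a Queue, since it really only supports a single producer and a single consumer, but that is all you need for your purposes, and it is far more performant.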

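When the function Pipe([duplex]) is called, it returns a pair (conn1, conn2) of multiprocessing.connection.Connection objects representing the ends of a pipe. If duplex is False, the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages. For this application you only need unidirectional connections. The idea is to pass to the function read, as its second argument, a list of connections on which it should broadcast the data it has read to the various processes that need to process it.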
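As a quick illustration of the unidirectional behavior described above, the following sketch creates one pipe and uses both ends inside a single process. The variable names are illustrative only, and the messages are kept small so the sends do not block:

```python
from multiprocessing import Pipe

# duplex=False returns (receive_end, send_end), in that order
recv_conn, send_conn = Pipe(duplex=False)

send_conn.send({"path": "Aa"})  # any picklable object can be sent
send_conn.send(None)            # sentinel, as used in the question
first = recv_conn.recv()
second = recv_conn.recv()

# Using an end in the wrong direction raises OSError
try:
    recv_conn.send("x")
    wrong_way_error = None
except OSError as exc:
    wrong_way_error = exc

print(first, second, type(wrong_way_error).__name__)
```

In the real program each end is handed to a different process, which is what the full example does.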

from multiprocessing import Process, Pipe
from threading import Thread
import time

def read(pathList, conn_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for conn in conn_list:
                conn.send(value)
    for conn in conn_list:
        conn.send(None)

def calc0(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc0(data))
    des_conn.send(None)

def calc1(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc1(data))
    des_conn.send(None)

# dummy functions for testing

def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, conn):
    while True:
        data = conn.recv()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']

    res0_recv, res0_send = Pipe(False)
    data0_recv, data0_send = Pipe(False)
    res1_recv, res1_send = Pipe(False)
    data1_recv, data1_send = Pipe(False)
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_recv))
    t1 = Thread(target=process_results, args=(results1, res1_recv))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_send, data1_send]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_recv, res0_send))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_recv, res1_send))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)

Prints:

5000 AA aa 0.34799909591674805

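Update

If the various connections make the code a bit hard to follow, we can hide the connection details in a class, Efficient_Queue, which may make the code easier to decipher: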

from multiprocessing import Process, Pipe
from threading import Thread
import time

class Efficient_Queue:
    def __init__(self):
        self._recv_conn, self._send_conn = Pipe(False)

    def put(self, obj):
        self._send_conn.send(obj)
        return self

    def get(self):
        return self._recv_conn.recv()

def read(pathList, q_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for q in q_list:
                q.put(value)
    for q in q_list:
        q.put(None)

def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc0(data))
    des_q.put(None)

def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc1(data))
    des_q.put(None)

# dummy functions for testing

def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, q):
    while True:
        data = q.get()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']

    res0_q = Efficient_Queue()
    res1_q = Efficient_Queue()
    data0_q = Efficient_Queue()
    data1_q = Efficient_Queue()
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_q))
    t1 = Thread(target=process_results, args=(results1, res1_q))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_q, data1_q]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_q, res0_q))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_q, res1_q))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)

Prints:

5000 AA aa 0.3409993648529053

When the Efficient_Queue instances are replaced with multiprocessing.Queue instances, we get:

5000 AA aa 0.576676607131958

When the multiprocessing.Queue instances are replaced with managed queue instances (i.e. m.Queue(), where m is Manager()), we get:

5000 AA aa 2.8409862518310547
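Since the question asks about three or more processes, the same idea can be generalized to any number of consumers. The sketch below is one possible way to do that, not part of the measured code above: the class name Broadcaster and its methods are hypothetical, and the demo reads all the receiving ends in one process only to show that every consumer gets every item.

```python
from multiprocessing import Pipe


class Broadcaster:
    """Hypothetical helper: one producer fanning out to N one-way pipes."""

    def __init__(self, n_consumers):
        pipes = [Pipe(duplex=False) for _ in range(n_consumers)]
        # One receiving end per consumer; the producer keeps the sending ends
        self.receivers = [recv for recv, _ in pipes]
        self._senders = [send for _, send in pipes]

    def put(self, obj):
        # Every consumer gets its own copy of obj
        for send in self._senders:
            send.send(obj)

    def close(self):
        # Same None sentinel convention as the code above
        self.put(None)


b = Broadcaster(3)
b.put("Aa")
b.close()
received = [(r.recv(), r.recv()) for r in b.receivers]
print(received)  # [('Aa', None), ('Aa', None), ('Aa', None)]
```

In an actual program each entry of b.receivers would be passed to its own Process, in the same way data0_q and data1_q are passed above.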

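As I suggested in the comments, I did the following using Redis PubSub.

Note that installing Redis is a fairly simple, lightweight process, and it can also easily be run using docker, mapping the container's port 6379 to port 6379 on the host: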

docker run --name Redis -p 6379:6379 redis:latest

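Note also that, because Redis is networked, you can run the publishers and subscribers on different machines.

Note also that you can run as many subscribers and publishers as you like without changing the code of either.

Here is the publisher: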

#!/usr/bin/env python3

import sys
import redis
import logging

if __name__ == '__main__':

    # Configurable settings
    host, port = 'localhost', 6379
    topic = 'stuff'
    nMsgs = 20

    logging.basicConfig(level=logging.DEBUG, format='[Redis:Publisher] %(levelname)s:%(message)s')

    # Redis connection
    logging.info(f'Running with host={host}, port={port}')
    r = redis.Redis(host=host, port=port, db=0)

    # Test Redis is running
    if not r.ping():
        logging.critical('Redis is not responding')
        sys.exit(1)

    logging.info('Redis is responding')

    # Publish a bunch of messages
    for i in range(nMsgs):
        message = f'Test message {i+1}/{nMsgs}'
        logging.info(f'Published: {message}')
        r.publish(topic, message)

    # Tell subscribers the show is over
    r.publish(topic, 'quit')

    logging.info('Done')

Here is the subscriber:

#!/usr/bin/env python3

import sys
import redis
import logging

if __name__ == '__main__':

    # Configurable settings
    host, port = 'localhost', 6379
    topic = 'stuff'

    logging.basicConfig(level=logging.DEBUG, format='[Redis:Subscriber] %(levelname)s:%(message)s')

    # Redis connection
    logging.info(f'Running with host={host}, port={port}')
    r = redis.Redis(host=host, port=port, db=0)

    # Test Redis is running
    if not r.ping():
        logging.critical('Redis is not responding')
        sys.exit(1)

    logging.info('Redis is responding')
    logging.info(f'Subscribing to topic: {topic}')
    sub = r.pubsub()
    sub.subscribe(topic)

    for message in sub.listen():
        if message['data'] == b'quit':
            logging.info('Teardown requested')
            break
        logging.info(f'Received: {message}')

    logging.info('Done')

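Note that there is also an asynchronous subscriber option, which lets you do other things (e.g. run a GUI) and simply calls you back whenever a message arrives.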
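A rough sketch of that callback style, assuming the redis-py client used above: subscribe accepts channel-to-handler keyword arguments, and run_in_thread polls for messages in a background thread. The function names decode_payload and start_listener, and the received list, are made up for this example, and a Redis server is assumed to be reachable on localhost:6379.

```python
def decode_payload(message):
    """Return the message body as str, or None for non-message events."""
    if message.get("type") != "message":
        return None
    data = message["data"]
    return data.decode() if isinstance(data, bytes) else data


def start_listener(topic="stuff", host="localhost", port=6379):
    """Subscribe with a callback; return the worker thread and a result list."""
    import redis  # imported here so decode_payload is usable without redis

    received = []

    def handler(message):
        # Called in the background thread for every published message
        received.append(decode_payload(message))

    r = redis.Redis(host=host, port=port, db=0)
    sub = r.pubsub()
    sub.subscribe(**{topic: handler})
    thread = sub.run_in_thread(sleep_time=0.01)
    return thread, received
```

Calling start_listener() returns immediately, so the main thread is free to do other work while received fills up; calling thread.stop() on the returned thread ends the listener.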

Sample output - publisher

[Redis:Publisher] INFO:Running with host=localhost, port=6379
[Redis:Publisher] INFO:Redis is responding
[Redis:Publisher] INFO:Published: Test message 1/20
[Redis:Publisher] INFO:Published: Test message 2/20
[Redis:Publisher] INFO:Published: Test message 3/20
[Redis:Publisher] INFO:Published: Test message 4/20
[Redis:Publisher] INFO:Published: Test message 5/20
[Redis:Publisher] INFO:Published: Test message 6/20
[Redis:Publisher] INFO:Published: Test message 7/20
[Redis:Publisher] INFO:Published: Test message 8/20
[Redis:Publisher] INFO:Published: Test message 9/20
[Redis:Publisher] INFO:Published: Test message 10/20
[Redis:Publisher] INFO:Published: Test message 11/20
[Redis:Publisher] INFO:Published: Test message 12/20
[Redis:Publisher] INFO:Published: Test message 13/20
[Redis:Publisher] INFO:Published: Test message 14/20
[Redis:Publisher] INFO:Published: Test message 15/20
[Redis:Publisher] INFO:Published: Test message 16/20
[Redis:Publisher] INFO:Published: Test message 17/20
[Redis:Publisher] INFO:Published: Test message 18/20
[Redis:Publisher] INFO:Published: Test message 19/20
[Redis:Publisher] INFO:Published: Test message 20/20
[Redis:Publisher] INFO:Done

Sample output - subscriber

[Redis:Subscriber] INFO:Running with host=localhost, port=6379
[Redis:Subscriber] INFO:Redis is responding
[Redis:Subscriber] INFO:Subscribing to topic: stuff
[Redis:Subscriber] INFO:Received: {'type': 'subscribe', 'pattern': None, 'channel': b'stuff', 'data': 1}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 1/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 2/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 3/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 4/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 5/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 6/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 7/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 8/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 9/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 10/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 11/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 12/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 13/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 14/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 15/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 16/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 17/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 18/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 19/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 20/20'}
[Redis:Subscriber] INFO:Teardown requested
[Redis:Subscriber] INFO:Done