How can I share a queue among three or more processes?
I have three processes. One process reads data from disk; the other two perform calculations on the data read by the first. The following code is my sketch:
from multiprocessing import Process, Manager

def read(pathList, q):
    for path in pathList:
        q.put(readFunc(path))
    q.put(None)
    return

def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        else:
            des_q.put(calcFunc0(data))
    return

def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        else:
            des_q.put(calcFunc1(data))
    return

if __name__ == '__main__':
    with Manager() as m:
        dataQueue = m.Queue()
        res0 = m.Queue()
        res1 = m.Queue()
        readProcess = Process(target=read, args=(readPathList, dataQueue))
        readProcess.start()
        calcProcess0 = Process(target=calc0, args=(dataQueue, res0))
        calcProcess0.start()
        calcProcess1 = Process(target=calc1, args=(dataQueue, res1))
        calcProcess1.start()
        readProcess.join()
        calcProcess0.join()
        calcProcess1.join()
However, the code above has a serious problem: I cannot get each piece of data from the queue twice! So how can I share the data in a queue among three or more processes?
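To make the failure concrete, here is a minimal single-process sketch (using threads and the stdlib queue.Queue for brevity; multiprocessing queues distribute items the same way). A queue distributes items, it does not broadcast them, so each item is delivered to exactly one consumer:

```python
import queue
import threading

def consumer(q, seen):
    # each get() removes the item from the queue, so no other
    # consumer will ever see that item
    while True:
        item = q.get()
        if item is None:
            break
        seen.append(item)

q = queue.Queue()
seen0, seen1 = [], []
t0 = threading.Thread(target=consumer, args=(q, seen0))
t1 = threading.Thread(target=consumer, args=(q, seen1))
t0.start()
t1.start()

for i in range(10):
    q.put(i)
q.put(None)  # one sentinel per consumer
q.put(None)

t0.join()
t1.join()

# The ten items were split between the two consumers -- neither
# consumer received the full stream.
print(sorted(seen0 + seen1))    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(len(seen0) + len(seen1))  # 10, not 20
```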
The comment offered by HALF9000 (to use multiprocessing.Queue) is an improvement over a managed queue, and there is much to be said for Mark Setchell's comment suggesting the Redis route if you will be doing a lot of this type of publish/subscribe work and want something very robust. But for what may be a one-off situation, that is a bit of overkill. I believe the best-performing solution is to build a queue out of the under-utilized multiprocessing.Pipe. It is not as flexible as a multiprocessing.Queue because it really only supports a single producer and a single consumer, but that is all you need for your purpose, and it is far more performant.
When the function Pipe([duplex]) is called, it returns a pair (conn1, conn2) of multiprocessing.connection.Connection objects representing the two ends of the pipe. If duplex is False, the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages. For this application you only need unidirectional connections. The idea is to pass read a list of connections as its second argument, over which it should broadcast the data it has read to the various processes that need to process it.
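A minimal single-process sketch of the unidirectional-pipe behaviour described above (the dict being sent is just an arbitrary illustrative value):

```python
from multiprocessing import Pipe

# duplex=False gives a one-way pipe: the first value returned is the
# receive end, the second is the send end
recv_conn, send_conn = Pipe(False)

send_conn.send({'path': 'a.dat', 'rows': 3})  # any picklable object
msg = recv_conn.recv()
print(msg)  # {'path': 'a.dat', 'rows': 3}

# the ends really are one-way: sending on the receive end raises OSError
try:
    recv_conn.send('nope')
    one_way = False
except OSError:
    one_way = True
print(one_way)  # True
```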
from multiprocessing import Process, Pipe
from threading import Thread
import time

def read(pathList, conn_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for conn in conn_list:
                conn.send(value)
    for conn in conn_list:
        conn.send(None)

def calc0(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc0(data))
    des_conn.send(None)

def calc1(src_conn, des_conn):
    while True:
        data = src_conn.recv()
        if data is None:
            break
        des_conn.send(calcFunc1(data))
    des_conn.send(None)

# dummy functions for testing
def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, conn):
    while True:
        data = conn.recv()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']
    res0_recv, res0_send = Pipe(False)
    data0_recv, data0_send = Pipe(False)
    res1_recv, res1_send = Pipe(False)
    data1_recv, data1_send = Pipe(False)
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_recv))
    t1 = Thread(target=process_results, args=(results1, res1_recv))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_send, data1_send]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_recv, res0_send))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_recv, res1_send))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)
Prints:
5000 AA aa 0.34799909591674805
Update
If the various connections make the code a bit hard to follow, we can hide the connection details in a class, Efficient_Queue, which may make the code easier to decipher:
from multiprocessing import Process, Pipe
from threading import Thread
import time

class Efficient_Queue:
    def __init__(self):
        self._recv_conn, self._send_conn = Pipe(False)

    def put(self, obj):
        self._send_conn.send(obj)
        return self

    def get(self):
        return self._recv_conn.recv()

def read(pathList, q_list):
    for path in pathList:
        value = readFunc(path)
        # simulate lots of data:
        for _ in range(1_000):
            for q in q_list:
                q.put(value)
    for q in q_list:
        q.put(None)

def calc0(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc0(data))
    des_q.put(None)

def calc1(src_q, des_q):
    while True:
        data = src_q.get()
        if data is None:
            break
        des_q.put(calcFunc1(data))
    des_q.put(None)

# dummy functions for testing
def readFunc(path):
    return path

def calcFunc0(data):
    return data.upper()

def calcFunc1(data):
    return data.lower()

def process_results(results, q):
    while True:
        data = q.get()
        if data is None:
            break
        results.append(data)

if __name__ == '__main__':
    t = time.time()
    readPathList = ['Aa', 'Bb', 'Cc', 'Dd', 'Ee']
    res0_q = Efficient_Queue()
    res1_q = Efficient_Queue()
    data0_q = Efficient_Queue()
    data1_q = Efficient_Queue()
    results0 = []
    results1 = []
    # start threads to process results
    t0 = Thread(target=process_results, args=(results0, res0_q))
    t1 = Thread(target=process_results, args=(results1, res1_q))
    t0.start()
    t1.start()
    readProcess = Process(target=read, args=(readPathList, [data0_q, data1_q]))
    readProcess.start()
    calcProcess0 = Process(target=calc0, args=(data0_q, res0_q))
    calcProcess0.start()
    calcProcess1 = Process(target=calc1, args=(data1_q, res1_q))
    calcProcess1.start()
    readProcess.join()
    calcProcess0.join()
    calcProcess1.join()
    t0.join()
    t1.join()
    elapsed = time.time() - t
    print(len(results0), results0[0], results1[0], elapsed)
Prints:
5000 AA aa 0.3409993648529053
When the Efficient_Queue instances are replaced by multiprocessing.Queue instances, we get:
5000 AA aa 0.576676607131958
When the multiprocessing.Queue instances are replaced by managed queues (i.e. m.Queue() where m is a Manager()), we get:
5000 AA aa 2.8409862518310547
As I suggested in the comments, I did the following using Redis PubSub.
Note that installing Redis is a fairly simple, lightweight process, and it can also easily be run with docker, mapping the container's port 6379 to the host's port 6379:
docker run --name Redis -p 6379:6379 redis:latest
Also note that, because Redis is networked, you can run the publishers and subscribers on different machines.
Also note that you can run as many subscribers and publishers as you like without changing the code of either.
Here is the publisher:
#!/usr/bin/env python3
import sys
import redis
import logging

if __name__ == '__main__':
    # Configurable settings
    host, port = 'localhost', 6379
    topic = 'stuff'
    nMsgs = 20
    logging.basicConfig(level=logging.DEBUG, format='[Redis:Publisher] %(levelname)s:%(message)s')
    # Redis connection
    logging.info(f'Running with host={host}, port={port}')
    r = redis.Redis(host=host, port=port, db=0)
    # Test Redis is running
    if not r.ping():
        logging.critical('Redis is not responding')
        sys.exit(1)
    logging.info('Redis is responding')
    # Publish a bunch of messages
    for i in range(nMsgs):
        message = f'Test message {i+1}/{nMsgs}'
        logging.info(f'Published: {message}')
        r.publish(topic, message)
    # Tell subscribers the show is over
    r.publish(topic, 'quit')
    logging.info('Done')
And here is the subscriber:
#!/usr/bin/env python3
import sys
import redis
import logging

if __name__ == '__main__':
    # Configurable settings
    host, port = 'localhost', 6379
    topic = 'stuff'
    logging.basicConfig(level=logging.DEBUG, format='[Redis:Subscriber] %(levelname)s:%(message)s')
    # Redis connection
    logging.info(f'Running with host={host}, port={port}')
    r = redis.Redis(host=host, port=port, db=0)
    # Test Redis is running
    if not r.ping():
        logging.critical('Redis is not responding')
        sys.exit(1)
    logging.info('Redis is responding')
    logging.info(f'Subscribing to topic: {topic}')
    sub = r.pubsub()
    sub.subscribe(topic)
    for message in sub.listen():
        if message['data'] == b'quit':
            logging.info('Teardown requested')
            break
        logging.info(f'Received: {message}')
    logging.info('Done')
Note that there is also an asynchronous subscriber option that lets you get on with other things (such as running a GUI) and calls you back whenever a message is received.
Sample output - publisher
[Redis:Publisher] INFO:Running with host=localhost, port=6379
[Redis:Publisher] INFO:Redis is responding
[Redis:Publisher] INFO:Published: Test message 1/20
[Redis:Publisher] INFO:Published: Test message 2/20
[Redis:Publisher] INFO:Published: Test message 3/20
[Redis:Publisher] INFO:Published: Test message 4/20
[Redis:Publisher] INFO:Published: Test message 5/20
[Redis:Publisher] INFO:Published: Test message 6/20
[Redis:Publisher] INFO:Published: Test message 7/20
[Redis:Publisher] INFO:Published: Test message 8/20
[Redis:Publisher] INFO:Published: Test message 9/20
[Redis:Publisher] INFO:Published: Test message 10/20
[Redis:Publisher] INFO:Published: Test message 11/20
[Redis:Publisher] INFO:Published: Test message 12/20
[Redis:Publisher] INFO:Published: Test message 13/20
[Redis:Publisher] INFO:Published: Test message 14/20
[Redis:Publisher] INFO:Published: Test message 15/20
[Redis:Publisher] INFO:Published: Test message 16/20
[Redis:Publisher] INFO:Published: Test message 17/20
[Redis:Publisher] INFO:Published: Test message 18/20
[Redis:Publisher] INFO:Published: Test message 19/20
[Redis:Publisher] INFO:Published: Test message 20/20
[Redis:Publisher] INFO:Done
Sample output - subscriber
[Redis:Subscriber] INFO:Running with host=localhost, port=6379
[Redis:Subscriber] INFO:Redis is responding
[Redis:Subscriber] INFO:Subscribing to topic: stuff
[Redis:Subscriber] INFO:Received: {'type': 'subscribe', 'pattern': None, 'channel': b'stuff', 'data': 1}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 1/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 2/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 3/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 4/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 5/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 6/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 7/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 8/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 9/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 10/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 11/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 12/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 13/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 14/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 15/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 16/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 17/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 18/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 19/20'}
[Redis:Subscriber] INFO:Received: {'type': 'message', 'pattern': None, 'channel': b'stuff', 'data': b'Test message 20/20'}
[Redis:Subscriber] INFO:Teardown requested
[Redis:Subscriber] INFO:Done