Best way to wait for queue population python multiprocessing
First time seriously playing with parallel computing.
I am using the multiprocessing module in Python and I am running into this problem: a queue consumer runs in a different process than the queue producer, and the former should wait for the latter to finish its job before it stops iterating over the queue. Sometimes the consumer is faster than the producer and the queue stays empty.
If I don't set any condition, the program won't stop.
In the sample code I use the wildcard PRODUCER_IS_OVER to exemplify what I need.
The following code sketches the problem:
def save_data(save_que, file_):
    ### Coroutine instantiation
    PRODUCER_IS_OVER = False
    empty = False
    ### Queue consumer
    while not(empty and PRODUCER_IS_OVER):
        try:
            data = save_que.get()
            print("saving data", data)
        except:
            empty = save_que.empty()
            print(empty)
            pass
        #PRODUCER_IS_OVER = get_condition()
    print("All data saved")
    return

def get_condition():
    ### NameError: global name 'PRODUCER_IS_OVER' is not defined
    if PRODUCER_IS_OVER:
        return True
    else:
        return False

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)

### Main function here
import random
import time
from multiprocessing import Queue, Manager, Process

manager = Manager()
save_que = manager.Queue()
file_ = "file"
save_p = Process(target=save_data, args=(save_que, file_))
save_p.start()
PRODUCER_IS_OVER = False
produce_data(save_que)
PRODUCER_IS_OVER = True
save_p.join()
produce_data takes a variable amount of time, and I want the save_p process to start before the queue is filled, so that it consumes the queue while it is being populated.
I think there are workarounds to communicate when to stop iterating, but I would like to know whether a proper way to do this exists.
I tried both multiprocessing.Pipe and .Lock, but I don't know how to implement them correctly and efficiently.
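One way to express a shared PRODUCER_IS_OVER flag across processes is a multiprocessing.Event instead of a module-level global (which, as the NameError above shows, is not visible to the other process). The following is only a rough sketch of that idea; the names are illustrative and not taken from the code above:

import random
import time
from queue import Empty
from multiprocessing import Event, Manager, Process

def save_data_sketch(save_que, producer_done):
    # Consume until the producer has signalled completion AND the queue is drained.
    while not (producer_done.is_set() and save_que.empty()):
        try:
            data = save_que.get(timeout=1)  # don't block forever once production stops
        except Empty:
            continue
        print("saving data", data)
    print("All data saved")

if __name__ == '__main__':
    manager = Manager()
    save_que = manager.Queue()
    producer_done = Event()  # cross-process flag playing the role of PRODUCER_IS_OVER
    save_p = Process(target=save_data_sketch, args=(save_que, producer_done))
    save_p.start()
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        save_que.put(random.randint(1, 10))
    producer_done.set()  # signal the consumer that production is over
    save_p.join()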
SOLVED: is this the best way?
The following code implements a STOPMESSAGE in the queue and works fine. I could refine it with a class, QMsg, in case the language only supported static typing.
def save_data(save_que, file_):
    # Coroutine instantiation
    PRODUCER_IS_OVER = False
    empty = False
    # Queue consumer
    while not(empty and PRODUCER_IS_OVER):
        data = save_que.get()
        empty = save_que.empty()
        print("saving data", data)
        if data == "STOP":
            PRODUCER_IS_OVER = True
    print("All data saved")
    return

def get_condition():
    # NameError: global name 'PRODUCER_IS_OVER' is not defined
    if PRODUCER_IS_OVER:
        return True
    else:
        return False

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)
    save_que.put("STOP")

# Main function here
import random
import time
from multiprocessing import Queue, Manager, Process

manager = Manager()
save_que = manager.Queue()
file_ = "file"
save_p = Process(target=save_data, args=(save_que, file_))
save_p.start()
PRODUCER_IS_OVER = False
produce_data(save_que)
PRODUCER_IS_OVER = True
save_p.join()
But this does not work if the queue is produced by several separate processes: who is going to send the ALT message in that case?
Another solution is to store the processes in a list and run:
def some_alive():
    for p in processes:
        if p.is_alive():
            return True
    return False
But multiprocessing supports the .is_alive method only in the parent process, which is limiting in my case.
What you are asking for is the default behaviour of queue.get: it waits (blocks) until an item is available in the queue. Sending a sentinel value is indeed the preferred way to end a child process.
Your scenario can be simplified to something like this:
import random
import time
from multiprocessing import Manager, Process

def save_data(save_que, file_):
    for data in iter(save_que.get, 'STOP'):
        print("saving data", data)
    print("All data saved")
    return

def produce_data(save_que):
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        print("sending data", data)
        save_que.put(data)
    save_que.put("STOP")

if __name__ == '__main__':
    manager = Manager()
    save_que = manager.Queue()
    file_ = "file"
    save_p = Process(target=save_data, args=(save_que, file_))
    save_p.start()
    produce_data(save_que)
    save_p.join()
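For reference, the two-argument form iter(save_que.get, 'STOP') calls save_que.get() repeatedly until it returns the sentinel 'STOP'. Written out as an explicit loop, the consumer above is equivalent to this sketch:

def save_data(save_que, file_):
    while True:
        data = save_que.get()   # blocks until an item is available
        if data == 'STOP':      # sentinel marks the end of production
            break
        print("saving data", data)
    print("All data saved")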
Edit to answer a question from the comments:
How should I implement the stop message in case the cue is accessed by several different agents and each one has a randomized time for finishing its task?
It's not much different: you have to put as many sentinel values into the queue as you have consuming processes.
A utility function that returns a stream logger, so you can see where the action takes place:
import logging

def get_stream_logger(level=logging.DEBUG):
    """Return logger with configured StreamHandler."""
    stream_logger = logging.getLogger('stream_logger')
    stream_logger.handlers = []
    stream_logger.setLevel(level)
    sh = logging.StreamHandler()
    sh.setLevel(level)
    fmt = '[%(asctime)s %(levelname)-8s %(processName)s] --- %(message)s'
    formatter = logging.Formatter(fmt)
    sh.setFormatter(formatter)
    stream_logger.addHandler(sh)
    return stream_logger
Code with multiple consumers:
import random
import time
from multiprocessing import Manager, Process
import logging

def save_data(save_que, file_):
    stream_logger = get_stream_logger()
    for data in iter(save_que.get, 'STOP'):
        time.sleep(random.randint(1, 5))  # random delay
        stream_logger.debug(f"saving: {data}")  # DEBUG
    stream_logger.debug("all data saved")  # DEBUG
    return

def produce_data(save_que, n_workers):
    stream_logger = get_stream_logger()
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        data = random.randint(1, 10)
        stream_logger.debug(f"producing: {data}")  # DEBUG
        save_que.put(data)
    for _ in range(n_workers):
        save_que.put("STOP")

if __name__ == '__main__':
    file_ = "file"
    n_processes = 2
    manager = Manager()
    save_que = manager.Queue()
    processes = []

    for _ in range(n_processes):
        processes.append(Process(target=save_data, args=(save_que, file_)))

    for p in processes:
        p.start()

    produce_data(save_que, n_workers=n_processes)

    for p in processes:
        p.join()
Example output:
[2018-09-02 20:10:35,885 DEBUG MainProcess] --- producing: 2
[2018-09-02 20:10:38,887 DEBUG MainProcess] --- producing: 8
[2018-09-02 20:10:38,887 DEBUG Process-2] --- saving: 2
[2018-09-02 20:10:39,889 DEBUG MainProcess] --- producing: 8
[2018-09-02 20:10:40,889 DEBUG Process-3] --- saving: 8
[2018-09-02 20:10:40,890 DEBUG Process-2] --- saving: 8
[2018-09-02 20:10:42,890 DEBUG MainProcess] --- producing: 1
[2018-09-02 20:10:43,891 DEBUG Process-3] --- saving: 1
[2018-09-02 20:10:46,893 DEBUG MainProcess] --- producing: 5
[2018-09-02 20:10:46,894 DEBUG Process-3] --- all data saved
[2018-09-02 20:10:50,895 DEBUG Process-2] --- saving: 5
[2018-09-02 20:10:50,896 DEBUG Process-2] --- all data saved
Process finished with exit code 0
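If you would rather not count how many "STOP" sentinels to send, another common pattern is a JoinableQueue with daemon consumers: the producer calls queue.join() once everything has been put, and the daemon workers are terminated when the main process exits. This is only a sketch of that alternative (worker count and names are arbitrary), not part of the tested code above:

import random
import time
from multiprocessing import JoinableQueue, Process

def save_data(save_que):
    while True:
        data = save_que.get()
        print("saving data", data)
        save_que.task_done()  # mark this item as processed

if __name__ == '__main__':
    save_que = JoinableQueue()
    consumers = [Process(target=save_data, args=(save_que,), daemon=True)
                 for _ in range(2)]
    for p in consumers:
        p.start()
    for _ in range(5):
        time.sleep(random.randint(1, 5))
        save_que.put(random.randint(1, 10))
    save_que.join()  # returns once every put item has been marked done
    # daemon consumers are terminated automatically when the main process exits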