如何在读取新数据和更新图表的同时 post 处理多个数据集
How to post-process multiple datasets while reading new data and updating a graph
我有以下情况:
数据集由外部设备以不同的时间间隔(在 0.1 秒到 90 秒之间)生成。代码在采集之间休眠。
每个数据集都需要post处理(这是CPU绑定的,单线程的,需要10s到20s)。 Post-处理不应阻塞 (1)。
采集和 post 处理应该异步工作,每当一个数据集完成时,我想更新 Jupyter 笔记本中的 pyplot 图(当前使用 ipython 小部件) ,使用来自 post 处理的数据。绘图也不应阻止 (1)。
顺序执行(1)和(2)很容易:我获取所有数据集,将其存储在列表中,然后处理每个项目,然后显示。
我不知道如何以并行方式设置它以及如何开始。我是否使用回调函数?回调是否跨进程工作?如何设置正确数量的进程(一个采集,处理和绘制每个核心的其余部分)。所有进程都可以修改所有数据集的相同列表吗?有没有更好的数据结构可以使用?可以在 Python 内完成吗?
这是您需要的 类 以及您如何根据我在评论中描述的(或多或少)的想法将它们组合在一起的一般概述。还有其他方法,但我认为这是最容易理解的。还有更多实现消息队列的“工业实力”产品,但学习曲线更加陡峭。
from multiprocessing import Process, Queue, cpu_count
def acquirer_process(post_process_queue):
while True:
# get next file and put in on the post processing queue
info_about_file_just_acquired = acquire_next_file()
post_process_queue.put(info_about_file_just_acquired)
def post_process_process(post_process_queue, plotting_queue):
while True:
info_about_file_just_acquired = post_process_queue.get()
# post process this file:
info_about_post_processed_file = post_process(info_about_file_just_acquired)
plotting_queue.put(info_about_post_processed_file)
def plotting_process(plotting_queue):
while True:
# Get plotting info for next post-processed file:
info_about_post_processed_file = plotting_queue.get()
# Plot it:
plot(info_about_post_processed_file)
def main():
"""
The main program.
"""
n_processors = cpu_count()
# We need one acquirer process
# We need one plotting process since the assumption is
# that only a single process (thread) can be plotting at a time
# That leaves n_processors - 2 free to work in parallel post processing acquired files:
post_process_queue = Queue()
plotting_queue = Queue()
processes = []
# All these processes that follow are "daemon" processes and will automatically
# terminate when the main process terminates:
processes.append(Process(target=acquirer_process, args=(post_process_queue,), daemon=True))
processes.append(Process(target=plotting_process, args=(plotting_queue,), daemon=True))
for _ in range(n_processors - 2):
processes.append(Process(target=post_process_process, args=(post_process_queue, plotting_queue), daemon=True))
# Start the processes:
for process in processes:
process.start()
# Pause the main process:
input('Hit enter to terminate:')
# Required for Windows:
if __name__ == '__main__':
main()
我有以下情况:
数据集由外部设备以不同的时间间隔(在 0.1 秒到 90 秒之间)生成。代码在采集之间休眠。
每个数据集都需要post处理(这是CPU绑定的,单线程的,需要10s到20s)。 Post-处理不应阻塞 (1)。
采集和 post 处理应该异步工作,每当一个数据集完成时,我想更新 Jupyter 笔记本中的 pyplot 图(当前使用 ipython 小部件) ,使用来自 post 处理的数据。绘图也不应阻止 (1)。
顺序执行(1)和(2)很容易:我获取所有数据集,将其存储在列表中,然后处理每个项目,然后显示。
我不知道如何以并行方式设置它以及如何开始。我是否使用回调函数?回调是否跨进程工作?如何设置正确数量的进程(一个采集,处理和绘制每个核心的其余部分)。所有进程都可以修改所有数据集的相同列表吗?有没有更好的数据结构可以使用?可以在 Python 内完成吗?
这是您需要的 类 以及您如何根据我在评论中描述的(或多或少)的想法将它们组合在一起的一般概述。还有其他方法,但我认为这是最容易理解的。还有更多实现消息队列的“工业实力”产品,但学习曲线更加陡峭。
from multiprocessing import Process, Queue, cpu_count
def acquirer_process(post_process_queue):
while True:
# get next file and put in on the post processing queue
info_about_file_just_acquired = acquire_next_file()
post_process_queue.put(info_about_file_just_acquired)
def post_process_process(post_process_queue, plotting_queue):
while True:
info_about_file_just_acquired = post_process_queue.get()
# post process this file:
info_about_post_processed_file = post_process(info_about_file_just_acquired)
plotting_queue.put(info_about_post_processed_file)
def plotting_process(plotting_queue):
while True:
# Get plotting info for next post-processed file:
info_about_post_processed_file = plotting_queue.get()
# Plot it:
plot(info_about_post_processed_file)
def main():
"""
The main program.
"""
n_processors = cpu_count()
# We need one acquirer process
# We need one plotting process since the assumption is
# that only a single process (thread) can be plotting at a time
# That leaves n_processors - 2 free to work in parallel post processing acquired files:
post_process_queue = Queue()
plotting_queue = Queue()
processes = []
# All these processes that follow are "daemon" processes and will automatically
# terminate when the main process terminates:
processes.append(Process(target=acquirer_process, args=(post_process_queue,), daemon=True))
processes.append(Process(target=plotting_process, args=(plotting_queue,), daemon=True))
for _ in range(n_processors - 2):
processes.append(Process(target=post_process_process, args=(post_process_queue, plotting_queue), daemon=True))
# Start the processes:
for process in processes:
process.start()
# Pause the main process:
input('Hit enter to terminate:')
# Required for Windows:
if __name__ == '__main__':
main()