如何在读取新数据和更新图表的同时 post 处理多个数据集

How to post-process multiple datasets while reading new data and updating a graph

我有以下情况:

  1. 数据集由外部设备以不同的时间间隔(在 0.1 秒到 90 秒之间)生成。代码在采集之间休眠。

  2. 每个数据集都需要post处理(这是CPU绑定的,单线程的,需要10s到20s)。 Post-处理不应阻塞 (1)。

  3. 采集和 post 处理应该异步工作,每当一个数据集完成时,我想更新 Jupyter 笔记本中的 pyplot 图(当前使用 ipython 小部件) ,使用来自 post 处理的数据。绘图也不应阻止 (1)。

顺序执行(1)和(2)很容易:我获取所有数据集,将其存储在列表中,然后处理每个项目,然后显示。

我不知道如何以并行方式设置它以及如何开始。我是否使用回调函数?回调是否跨进程工作?如何设置正确数量的进程(一个采集,处理和绘制每个核心的其余部分)。所有进程都可以修改所有数据集的相同列表吗?有没有更好的数据结构可以使用?可以在 Python 内完成吗?

这是您需要的 类 以及您如何根据我在评论中描述的(或多或少)的想法将它们组合在一起的一般概述。还有其他方法,但我认为这是最容易理解的。还有更多实现消息队列的“工业实力”产品,但学习曲线更加陡峭。

from multiprocessing import Process, Queue, cpu_count


def acquirer_process(post_process_queue):
    while True:
        # get next file and put in on the post processing queue
        info_about_file_just_acquired = acquire_next_file()
        post_process_queue.put(info_about_file_just_acquired)

def post_process_process(post_process_queue, plotting_queue):
    while True:
        info_about_file_just_acquired = post_process_queue.get()
        # post process this file:
        info_about_post_processed_file = post_process(info_about_file_just_acquired)
        plotting_queue.put(info_about_post_processed_file)

def plotting_process(plotting_queue):
    while True:
        # Get plotting info for next post-processed file:
        info_about_post_processed_file = plotting_queue.get()
        # Plot it:
        plot(info_about_post_processed_file)

def main():
    """
    The main program.
    """
    n_processors = cpu_count()
    # We need one acquirer process
    # We need one plotting process since the assumption is
    # that only a single process (thread) can be plotting at a time
    # That leaves n_processors - 2 free to work in parallel post processing acquired files:
    post_process_queue = Queue()
    plotting_queue = Queue()
    processes = []
    # All these processes that follow are "daemon" processes and will automatically
    # terminate when the main process terminates:
    processes.append(Process(target=acquirer_process, args=(post_process_queue,), daemon=True))
    processes.append(Process(target=plotting_process, args=(plotting_queue,), daemon=True))
    for _ in range(n_processors - 2):
        processes.append(Process(target=post_process_process, args=(post_process_queue, plotting_queue), daemon=True))
    # Start the processes:
    for process in processes:
        process.start()
    # Pause the main process:
    input('Hit enter to terminate:')

# Required for Windows:
if __name__ == '__main__':
    main()