使用字典类型时多处理中的 EOF 错误?

EOF error in multiprocessing while using dictionary type?

我有下面这段代码,它完全符合我的要求。但是对于文件大小更高的文件,它会中断并给我一个错误:

错误:“引发 EOFError”

def co(operation, in_queue, processed_lines):
    while True:
        item = in_queue.get()
        line_number, line = item

        if line is None:
            return

        line = line + operation + "changed"
        processed_lines[line_number] = line

def _fo(file_name, operation):

    manager = Manager()
    results = manager.dict()
    work = manager.Queue(10)

    pool = []
    for i in range(10):
        p = Process(target=co, args=(operation, work, results))
        p.start()
        pool.append(p)

    with open(file_name) as f:
        num_lines = 0
        iters = itertools.chain(f, (None,) * 10)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)
                num_lines += 1

    for p in pool:
        p.join()

    return [results[idx] for idx in range(num_lines - 10)]

现在我知道在我的多进程可以写回结果之前我的主进程正在关闭但是我无法解决它。我已经使用 p.join() 优雅地关闭了我的进程。我尝试将 p.close() 放在 p.join() 之前,但随后出现错误:“'Process' 对象没有属性 'close'”。

请问我可以做些什么来解决这个问题?

错误:

2020-10-01T15:55:22.488-05:00   item = in_queue.get()

2020-10-01T15:55:22.488-05:00   File "<string>", line 2, in get

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod

2020-10-01T15:55:22.488-05:00   kind, result = conn.recv()

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv

2020-10-01T15:55:22.488-05:00   buf = self._recv_bytes()

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes

2020-10-01T15:55:22.488-05:00   buf = self._recv(4)

2020-10-01T15:55:22.488-05:00   File "/opt/python3.6/lib/python3.6/multiprocessing/connection.py", line 383, in _recv

2020-10-01T15:55:22.488-05:00   raise EOFError

2020-10-01T15:55:22.488-05:00   EOFError

Now I am aware that my main process is closing before my multi process could write back to results however I am unable to resolve it.

所以这似乎不是输入文件大小的问题,而是需要处理的更大的文件会花费更多时间,而且您是否愿意提前终止主程序?或者出于某种原因,您选择在处理完成之前退出主进程。听起来您需要一种额外的适当取消形式来停止工作人员,然后再继续前进并退出。

I am already gracefully closing my process using p.join().

Process.join() 并不是主进程的正常关闭。它只是意味着特定范围处于阻塞状态,直到您的工作进程列表选择终止。如果出于任何原因你用 KeyboardInterrupt 杀死你的应用程序或告诉你的主线程退出而这是 运行 在另一个线程中,你的主线程将终止并且你的子进程将在尝试读取时遇到 EOF从父流程获取更多工作项。

主进程和子进程的实现方式是这样设置的,即所有工作进程都将被发送到队列中的 None 值以指示他们退出,然后解除所有的阻塞Process.join() 来电主要。如果您没有在此之前向每个工作人员发送 None 并退出您的 main,您可能会遇到 EOF 问题,因为工作人员尚未停止。

I tried putting p.close() before p.join() but then it gives me error: "'Process' object has no attribute 'close'".

https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.close
“版本 3.7 中的新功能。”

这意味着您 运行 的 python 版本低于 3.7。但是如果您的主进程要提前退出,您可以调用 terminatekill。最好停止向您的工作人员发送线路,并发送最终的 None 值让工作人员优雅地停止,然后使用 join() 调用等待他们。

with open(file_name) as f:
    num_lines = 0
    iters = itertools.chain(f, (None,) * 10)
    for num_and_line in enumerate(iters):
        work.put(num_and_line)
            num_lines += 1

此代码块在每一行上迭代,将其发送到队列,最后为每个工作人员发送一个 None 值(在本例中为 10)。如果您决定要取消工作,那么您需要停止发送行,而是发送 10 None 个值,然后中断。

如需更多详细信息,您需要描述您的取消情况。