Unable to put() + get() larger DataFrames in a Queue

The code below reproduces the multiprocessing problem I am facing.

There are two functions - f1 and f2 - which return (pandas) DataFrames with n rows to the calling function run_fns(n). The two functions need to run in parallel.

The code works fine for smaller values of n (e.g. n <= 700) but hangs for larger values of n (say n >= 7000).

I have tried calling Queue with Queue([maxsize]) using various maxsize values, including the default, 0, -1 and a range of other sizes; none of them changed this behaviour.

Any solutions, workarounds or alternative approaches are very welcome. I also have a secondary question: do I really need to include

if __name__ == "__main__":

somewhere? And if so, where?

The code: f1 returns n rows and 3 columns, f2 returns n rows and 5 columns. The DataFrames are built from randomly generated integers.

import numpy as np
import pandas as pd
from multiprocessing import Process, Queue


def run_fns(n):
    """Run p1 and p2 in parallel, and get the returned dataframes."""
    q1 = Queue()
    q2 = Queue()
    p1 = Process(target=f1, args=(n, q1))
    p2 = Process(target=f2, args=(n, q2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    df1 = q1.get()
    df2 = q2.get()
    return df1, df2


def f1(n, q):
    """Create a dataframe with n rows and 3 columns."""
    df = pd.DataFrame(np.random.randint(n * 3, size=(n, 3)))
    q.put(df)


def f2(n, q):
    """Create a dataframe with n rows and 5 columns."""
    df = pd.DataFrame(np.random.randint(n * 5, size=(n, 5)))
    q.put(df)
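
For concreteness, a minimal driver for the functions above might look like the sketch below (the exact value of n is only illustrative; any size in the problematic range reproduces the hang described):

df1, df2 = run_fns(7000)     # returns quickly for e.g. n = 700, hangs for n this large
print(df1.shape, df2.shape)  # never reached when the hang occurs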

You are running into a classic issue documented in the multiprocessing programming guidelines:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.

You need to make sure you get() the data before join()ing the processes:

# start the processes
p1.start()
p2.start()
# drain the queues
df1 = q1.get()
df2 = q2.get()
# then join the processes
p1.join()
p2.join()

return df1, df2
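
Putting it all together, a corrected run_fns might look like the sketch below (same structure as your original, only the get()/join() order changes). As for the secondary question: the if __name__ == "__main__": guard belongs around the top-level code that starts the processes. It is required whenever the start method is spawn (the default on Windows and on recent macOS), because each child process re-imports the main module; under the fork start method on Linux the code happens to work without it, but including the guard is the portable choice.

def run_fns(n):
    """Run f1 and f2 in parallel and return both dataframes."""
    q1 = Queue()
    q2 = Queue()
    p1 = Process(target=f1, args=(n, q1))
    p2 = Process(target=f2, args=(n, q2))
    p1.start()
    p2.start()
    # drain the queues first ...
    df1 = q1.get()
    df2 = q2.get()
    # ... then join the processes
    p1.join()
    p2.join()
    return df1, df2


if __name__ == "__main__":
    df1, df2 = run_fns(7000)
    print(df1.shape, df2.shape)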