Concurrency::parallel_for 中的 Pybind11 并行处理问题

Pybind11 Parallel-Processing Issue in Concurrency::parallel_for

我有一个 python 代码可以在矩阵上执行 filtering。我已经使用 pybind11 创建了一个以序列化方式成功运行的 C++ 接口(请参阅下面的代码)。

我正在尝试使其并行处理,以期与其序列化版本相比减少计算时间。为此,我将大小为 M×N 的数组拆分为三个大小为 M×(N/3) 的子矩阵,以便使用相同的接口并行处理它们。

我使用 ppl.h 库制作了一个并行 for 循环,并在每个循环中对大小为 M×(N/3) 的子矩阵调用 python 函数。

#include <iostream>
#include <ppl.h>

#include "pybind11/embed.h"
#include <pybind11/iostream.h>
#include <pybind11/stl_bind.h>
#include "pybind11/eigen.h"
#include "pybind11/stl.h"
#include "pybind11/numpy.h"
#include "pybind11/functional.h"
#include <Eigen/Dense>

namespace py = pybind11;

class myClass
{
public:
    myClass()
    {
        m_module = py::module::import("myFilterScript");
        m_handle = m_module.attr("medianFilter");
    };

    void medianFilterSerialized(Eigen::Ref<Eigen::MatrixXf> input, int windowSize) 
    {
        Eigen::MatrixXf output;
        output.resizeLike(input);
        output = m_handle(input, windowSize).cast<Eigen::MatrixXf>();
    };

    void medianFilterParallelizedUsingPPL(Eigen::Ref<Eigen::MatrixXf> input, int windowSize) 
    {
        Eigen::MatrixXf output;
        output.resizeLike(input);
        /* Acquire GIL before calling Python code */
        //py::gil_scoped_acquire acquire;
        Concurrency::parallel_for(size_t(0), size_t(3), [&](size_t i)
        {
            output.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3) = m_handle(input.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3).array(), windowSize).cast<Eigen::MatrixXf>();
        });
        //py::gil_scoped_release release;
    };

private:
    py::scoped_interpreter m_guard;
    py::module m_module;
    py::handle m_handle;
    py::object m_object;
};


int main()
{
    myClass c;

    Eigen::MatrixXf input = Eigen::MatrixXf::Random(240, 120);

    c.medianFilterSerialized(input, 3); 
    c.medianFilterParallelizedUsingPPL(input, 3);

    return 0;
}

myFilterScript.py:

import threading
import numpy as np
import bottleneck as bn # can be installed from https://pypi.org/project/Bottleneck/

def medianFilter(input, windowSize):
    return bn.move_median(input, window=windowSize, axis=0)

无论是否使用 py::gil_scoped_acquire 我的代码在到达 for 循环时崩溃:

Access violation reading location // or:
Unhandled exception at 0x00007FF98BB8DB8E (ucrtbase.dll) in Pybind11_Parallelizing.exe: Fatal program exit requested.

有人可以帮助我理解 python 模块的加载函数是否可以以多处理或多线程方式并行调用?我的代码中缺少什么?请告诉我。提前致谢。

py::gil_scoped_acquire是一个RAII对象,用于在一个范围内获取GIL,类似地,py::gil_scoped_release在一个"inverse" RAII中,用于释放一个范围内的GIL。因此,在相关范围内,你只需要前者。

获取 GIL 的范围在调用 Python 的函数上,因此在您传递给 parallel_for 的 lambda 中:每个执行的线程都需要持有 GIL 以访问任何Python 个对象或 API,在本例中为 m_handle。然而,在 lambda 中这样做会完全序列化代码,使线程的使用没有实际意义,因此它会出于错误的原因解决你的问题。

这将是使用 pybind11 中没有直接支持的子解释器的情况 (https://pybind11.readthedocs.io/en/stable/advanced/embedding.html#sub-interpreter-support), so the C API would be the ticket (https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter)。重点是操作的数据是非 Python 并且所有操作原则上都是独立的。

但是,您需要知道 Bottleneck 是否是线程安全的。粗略地看,它似乎没有 global/static 数据 AFAICT。从理论上讲,还有一些并行化的空间:当调用 move_median 进入用于绑定 Bottleneck 的 Cython 代码时,您需要保持 GIL(它取消装箱变量,从而调用 Python API),那么 Cython 可以在输入 Bottleneck 的 C 代码时释放 GIL,并在退出时重新获取,然后在 RAII 作用域结束时在 lambda 中释放。然后 C 代码并行运行。

但问题就变成了:为什么首先要通过 Python 绑定从 C++ 调用 C 库?这里似乎是一个简单的解决方案:跳过 Python 并直接调用 move_median C 函数。