在 C++ 和 Python 之间通过管道传输数据时出现数据损坏

Data corruption Piping between C++ and Python

我正在编写一些代码,从 Python 获取二进制数据,将其通过管道传输到 C++,对数据进行一些处理(在本例中是计算互信息度量),然后将结果通过管道传回 Python。在测试时,我发现如果发送的是 2 个尺寸不超过 1500 X 1500 的数组,一切正常;但如果发送 2 个 2K X 2K 的数组,返回的就是大量损坏的无意义数据。

我目前认为代码的算法部分没有问题,因为在使用小型(<=1500 X 1500)数组进行测试时,它给出了预期的答案。这使我相信问题出在 stdin 或 stdout 管道上,也许我在某处超出了某种内在的限制。

下面是Python代码和C++代码。

Python代码:

import subprocess
import struct
import sys
import numpy as np

# Sizes shared with the C++ side (passed to it on the command line below).
bytesPerDouble = 8
sizeX = 2000
sizeY = 2000
offset = sizeX*sizeY                 # element index where the second array starts
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2    # the 2 is because we pass 2 different versions of the 2D array

# Build the test data without Python-level loops:
#   first array : every element of row i holds i
#   second array: same, except that every 10th row holds the column index j
first = np.repeat(np.arange(sizeX, dtype='d'), sizeY).reshape(sizeX, sizeY)
second = first.copy()
second[::10, :] = np.arange(sizeY, dtype='d')
a = np.concatenate((first.ravel(), second.ravel()))

data = a.tobytes('C')

strTotalBytes = str(totalBytes)
strLineBytes = str(sizeY*bytesPerDouble)

# Communicate with the C++ code.
print("starting C++ code")
# Raw string so the backslashes can never be read as escape sequences.
command = r"C:\Python27\PythonPipes.exe"
proc = subprocess.Popen(
    [command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)],
    stdin=subprocess.PIPE,
    stderr=subprocess.PIPE,
    stdout=subprocess.PIPE,
)

proc.stdin.write(data)
# Closing flushes the pipe and lets the child see end-of-input.
proc.stdin.close()

rowBytes = sizeY*bytesPerDouble
rowFormat = str(sizeY) + 'd'

print("Reading results back from C++")
for i in range(sizeX):
    returnvalues = proc.stdout.read(rowBytes)
    # A short read means the child stopped early; report that instead of
    # letting struct fail with an opaque size error.
    if len(returnvalues) != rowBytes:
        raise IOError("row %d: expected %d bytes, got %d"
                      % (i, rowBytes, len(returnvalues)))
    b = struct.unpack(rowFormat, returnvalues)
    print(str(b) + " " + str(i))

proc.wait()
print('done')

C++ 代码:main 函数:

// Entry point: reads raw doubles from stdin into the global `data` buffer,
// runs the mutual-information transform on them and writes the result
// buffer to stdout.
//
// The last four command-line arguments are, in order: total byte count,
// bytes per transfer chunk, height, width.
int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       // total number of bytes expected on stdin
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); // bytes requested per fread/fwrite call
    long height = stoi(argv[argc-2], nullptr, 10);  // forwarded to MI.Initialize as Height
    long width  = stoi(argv[argc-1], nullptr, 10);  // forwarded to MI.Initialize as Width
    long offset = totalbytes / sizeof(double) / 2;  // half of the element count: start of the second half of `data`


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    // read in data from pipe, bytechunk bytes at a time
    // NOTE(review): the return value of fread is ignored, so a short read
    // would go unnoticed and the counters would still advance.
    // NOTE(review): nothing here changes the mode of stdin/stdout; on
    // platforms that distinguish text and binary streams that matters for
    // raw double data — confirm.
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;
    //*
    // write out data to pipe: totalbytes/2 bytes of the output buffer,
    // bytechunk bytes per call, flushing after each one
    // NOTE(review): the return value of fwrite is ignored as well.
    columnindex = 0;
    while (count<totalbytes/2) {

        fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
        fflush(stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    //*/
    delete [] data;

    return 0;
}

如果您需要实际处理代码:

// Fills mOutput with a normalized mutual-information value for every
// position that has a full (2*chipsize+1) x (2*chipsize+1) neighbourhood;
// all other positions are set to 0.0.
//
// mData is read as two arrays laid out back to back: element [i*mHeight + j]
// of the first one and element [i*mHeight + j + mOffset] of the second one,
// with i in [0, mWidth) and j in [0, mHeight). Values are truncated to int
// to form histogram bins.
//
// Returns rvalue, which is always 0.0; results are read through getOutput().
double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, std::map<int, double>> lHistXY;
    std::map<int, double> lHistX;
    std::map<int, double> lHistY;
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    const int chipsize = 3;

    // NOTE(review): counts are divided by mOffset (the size of one whole
    // array), not by the number of samples in the window — kept unchanged.
    const double total = (double) this->mOffset;

    // zero out Output array; a plain loop needs no scratch buffer, so
    // nothing is allocated (and nothing can leak) here
    const long cells = this->mWidth * this->mHeight;
    for (long n = 0; n < cells; n++){
        this->mOutput[n] = 0.0;
    }

    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){
        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();

            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    const long pos = (ioutter + k)*this->mHeight + (j + l);
                    const int xbin = int(this->mData[pos]);
                    const int ybin = int(this->mData[pos + this->mOffset]);

                    // operator[] value-initializes a missing entry to 0.0,
                    // so no separate "does the key exist" check is needed
                    lHistX[xbin] += 1.0;
                    lHistY[ybin] += 1.0;
                    lHistXY[xbin][ybin] += 1.0;
                }
            }

            //calculate PMI, Hx, Hy
            double MI = 0.0;
            double Hx = 0.0;
            double Hy = 0.0;

            for (HistXY_iter xit = lHistXY.begin(); xit != lHistXY.end(); xit++) {
                const double Px = lHistX[xit->first] / total;
                Hx -= Px*log(Px);

                // NOTE(review): Hy is accumulated once per (x, y) pair, so a
                // y bin that occurs with several x bins is counted more than
                // once — confirm this is intended.
                for (HistY_iter yit = xit->second.begin(); yit != xit->second.end(); yit++) {
                    const double Py = lHistY[yit->first] / total;
                    Hy -= Py*log(Py);
                    const double Pxy = yit->second / total;
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }else{
                MI = 0.0;
            }

            //write PMI to data output array; anything not below 1.1
            //(including NaN) is stored as 0.0
            this->mOutput[ioutter*this->mHeight + j] = (MI < 1.1) ? MI : 0.0;
        }
    }

    return rvalue;
}

对于能正常返回结果的数组,我得到的输出范围在 0 和 1 之间,如下所示:

(0.0, 0.0, 0.0, 0.7160627908692593, 0.6376472316395495, 0.5728801401524277,...

对于 2Kx2K 或更大的数组,我得到的却是这样的无意义数据(尽管代码已将值限制在 0 和 1 之间):

(-2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228,...

我想知道为什么在把值限定到 0.0 和 1 之间之后,这段代码的数据集还会被破坏:这是管道问题、stdin/stdout 问题、某种缓冲区问题,还是我根本没有看出来的编码问题。

更新:我尝试使用 Chris 建议的代码以较小的块传递数据,但没有成功。另外需要注意的是,我在 stdout 上添加了 ferror 检查,它从未被触发,所以我很确定这些字节至少到达了 stdout。会不会有其他东西以某种方式写入了标准输出?也许在我的程序运行时,有一个额外的字节进入了标准输出?我觉得这不太可能,因为错误始终出现在第 4 次 fwrite 读取的第 10 个条目中。

根据 Craig 的要求,这里是完整的 C++ 代码(完整的 Python 代码已经发布):它位于 3 个文件中:

main.cpp

#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <iostream>
#include "./MutualInformation.h"

// Global input buffer; main() allocates it with new[] and releases it with delete[].
double * data;
// Lets the code below use stoi without the std:: prefix.
using namespace std;

// Writes wlen bytes from buf to fo in chunks of at most 1024 bytes and
// flushes after every chunk. A short fwrite is continued from the point
// where it stopped. If fwrite makes no progress at all (returns 0) the
// loop ends instead of spinning forever; the caller can check ferror(fo).
void
xxwrite(unsigned char *buf, size_t wlen, FILE *fo)
{
    const size_t maxchunk = 1024;

    while (wlen > 0) {
        size_t xlen = (wlen > maxchunk) ? maxchunk : wlen;

        xlen = fwrite(buf, 1, xlen, fo);
        fflush(fo);
        if (xlen == 0)
            break;      // write error: retrying would loop endlessly

        buf += xlen;
        wlen -= xlen;
    }
}

// Entry point: reads raw doubles from stdin into the global `data` buffer,
// runs the mutual-information transform on them and writes the result
// buffer to stdout through xxwrite.
//
// The last four command-line arguments are, in order: total byte count,
// bytes per transfer chunk, height, width.
int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       // total number of bytes expected on stdin
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); // bytes requested per fread / per xxwrite call
    long height = stoi(argv[argc-2], nullptr, 10);  // forwarded to MI.Initialize as Height
    long width  = stoi(argv[argc-1], nullptr, 10);  // forwarded to MI.Initialize as Width
    long offset = totalbytes / sizeof(double) / 2;  // half of the element count: start of the second half of `data`


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    // read in data from pipe, bytechunk bytes at a time
    // NOTE(review): the return value of fread is ignored, so a short read
    // would go unnoticed and the counters would still advance.
    // NOTE(review): nothing here changes the mode of stdin/stdout; on
    // platforms that distinguish text and binary streams that matters for
    // raw double data — confirm.
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;

    // write totalbytes/2 bytes of the output buffer to stdout,
    // bytechunk bytes per xxwrite call
    columnindex = 0;
    while (count<totalbytes/2) {

        xxwrite((unsigned char*)&(MI.getOutput()[columnindex]),  bytechunk, stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    delete [] data;

    return 0;
}

MutualInformation.h

#include <map>

using namespace std;

// Holds a caller-supplied input buffer and an output buffer of
// Height*Width doubles, and computes the mutual-information transform
// between the two halves of the input.
class MutualInformation
{
private:
    double * mData;     // input buffer passed to Initialize()
    double * mOutput;   // result buffer; exposed read-only through getOutput()
    long mHeight;       // Height passed to Initialize()
    long mWidth;        // Width passed to Initialize()
    long mOffset;       // Offset passed to Initialize()

public:
    MutualInformation();
    ~MutualInformation();
    // Stores the input pointer and dimensions for later use by calcMI().
    bool Initialize(double * data, long Height, long Width, long Offset);
    // Returns a read-only pointer to the result buffer.
    const double * getOutput();

    // Runs the transform; results are read through getOutput().
    double calcMI();

};

MutualInformation.cpp

#include "MutualInformation.h"


// Creates an empty object: no input buffer, no output buffer and all
// dimensions zero. mOffset is initialised too, so no member is ever read
// with an indeterminate value.
MutualInformation::MutualInformation()
    : mData(nullptr), mOutput(nullptr), mHeight(0), mWidth(0), mOffset(0)
{
}


// Frees the output buffer. The input buffer (mData) is not released here.
MutualInformation::~MutualInformation()
{
    delete[] mOutput;
}

// Attaches the input buffer and its dimensions and allocates a fresh
// output buffer of Height*Width doubles.
//
//   data   - input buffer; only the pointer is stored, it is not copied
//   Height - stored as mHeight
//   Width  - stored as mWidth
//   Offset - stored as mOffset
//
// Returns true once the object is set up (allocation failure is reported
// by new[] throwing). An output buffer left over from an earlier call is
// released first, so calling Initialize() again does not leak.
bool MutualInformation::Initialize(double * data, long Height, long Width, long Offset){
    this->mData = data;
    this->mHeight = Height;
    this->mWidth = Width;
    this->mOffset = Offset;

    // drop any buffer from a previous call; reset the pointer before
    // allocating so the destructor stays safe if new[] throws
    delete[] this->mOutput;
    this->mOutput = nullptr;

    //allocate output data
    this->mOutput = new double[this->mHeight*this->mWidth];

    return true;
}

const double * MutualInformation::getOutput(){
    return this->mOutput;
}


// Fills mOutput with a normalized mutual-information value for every
// position that has a full (2*chipsize+1) x (2*chipsize+1) neighbourhood;
// all other positions are set to 0.0.
//
// mData is read as two arrays laid out back to back: element [i*mHeight + j]
// of the first one and element [i*mHeight + j + mOffset] of the second one,
// with i in [0, mWidth) and j in [0, mHeight). Values are truncated to int
// to form histogram bins.
//
// Returns rvalue, which is always 0.0; results are read through getOutput().
double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, std::map<int, double>> lHistXY;
    std::map<int, double> lHistX;
    std::map<int, double> lHistY;
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    const int chipsize = 3;

    // NOTE(review): counts are divided by mOffset (the size of one whole
    // array), not by the number of samples in the window — kept unchanged.
    const double total = (double) this->mOffset;

    // zero out Output array; a plain loop needs no scratch buffer, so
    // nothing is allocated (and nothing can leak) here
    const long cells = this->mWidth * this->mHeight;
    for (long n = 0; n < cells; n++){
        this->mOutput[n] = 0.0;
    }

    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){
        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();

            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    const long pos = (ioutter + k)*this->mHeight + (j + l);
                    const int xbin = int(this->mData[pos]);
                    const int ybin = int(this->mData[pos + this->mOffset]);

                    // operator[] value-initializes a missing entry to 0.0,
                    // so no separate "does the key exist" check is needed
                    lHistX[xbin] += 1.0;
                    lHistY[ybin] += 1.0;
                    lHistXY[xbin][ybin] += 1.0;
                }
            }

            //calculate PMI, Hx, Hy
            double MI = 0.0;
            double Hx = 0.0;
            double Hy = 0.0;

            for (HistXY_iter xit = lHistXY.begin(); xit != lHistXY.end(); xit++) {
                const double Px = lHistX[xit->first] / total;
                Hx -= Px*log(Px);

                // NOTE(review): Hy is accumulated once per (x, y) pair, so a
                // y bin that occurs with several x bins is counted more than
                // once — confirm this is intended.
                for (HistY_iter yit = xit->second.begin(); yit != xit->second.end(); yit++) {
                    const double Py = lHistY[yit->first] / total;
                    Hy -= Py*log(Py);
                    const double Pxy = yit->second / total;
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }else{
                MI = 0.0;
            }

            //write PMI to data output array; anything not below 1.1
            //(including NaN) is stored as 0.0
            this->mOutput[ioutter*this->mHeight + j] = (MI < 1.1) ? MI : 0.0;
        }
    }

    return rvalue;
}

已由 6502 解决

6502 下面的回答解决了我的问题。我需要明确告诉 Windows 对标准输入/标准输出使用二进制模式。为此,我必须在我的主 cpp 文件中包含 2 个新的头文件。

#include <fcntl.h>
#include <io.h>

将以下代码行(由于 Visual Studio 报出警告,在 6502 的 POSIX 版本基础上做了修改)添加到我的 main 函数的开头:

// Switch the C runtime's stdout and stdin streams to binary mode.
_setmode(_fileno(stdout), O_BINARY);
_setmode(_fileno(stdin), O_BINARY);

然后将这些行添加到我的 Python 代码中:

import os, msvcrt
# Switch this process's stdout and stdin file descriptors to binary mode.
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)

您的 C++ fwrite 代码没有考虑到“短传输”("short" transfer)的情况。

这里有一个小调整:

//write out data to pipe
columnindex = 0;
while (count < totalbytes / 2) {
    // fwrite may transfer fewer bytes than requested, so advance by the
    // number of bytes it actually wrote
    size_t wlen = fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
    fflush(stdout);
    if (wlen == 0)
        break;  // write error: without this the loop would never end
    count += wlen;
    columnindex += wlen / sizeof(double);
}

注意:您仍然需要小心,因为 fwrite 返回的 wlen 可能不是 sizeof(double) 的倍数。例如,如果 bytechunk 是 16 而 wlen 返回 14,则在继续循环之前您需要再做一次长度为 2 的 fwrite。更一般的做法是将整个数据矩阵视为一个巨大的字节缓冲区并在其上循环。

实际上,改用许多小得多的传输(transfer)也能获得大致相同的效率,只要每次传输的上限是某个固定的(即“已知安全的”)字节数,例如 1024 字节。这是可行的,因为输出是一个字节流。

这是我经常使用的稍微更通用的解决方案:

// Writes wlen bytes starting at buf to fo in chunks of at most 1024 bytes
// and flushes after every chunk. A short fwrite is continued from the point
// where it stopped. If fwrite makes no progress at all (returns 0) the
// loop ends instead of spinning forever; the caller can check ferror(fo).
//
// buf is taken as const void* so any object pointer (including a pointer
// to const data) can be passed without a cast.
void
xxwrite(const void *buf,size_t wlen,FILE *fo)
{
    // pointer arithmetic is not defined on void*, so walk a byte pointer
    const unsigned char *cur = static_cast<const unsigned char *>(buf);
    const size_t maxchunk = 1024;

    while (wlen > 0) {
        size_t xlen = (wlen > maxchunk) ? maxchunk : wlen;

        xlen = fwrite(cur,1,xlen,fo);
        fflush(fo);
        if (xlen == 0)
            break;      // write error: retrying would loop endlessly

        cur += xlen;
        wlen -= xlen;
    }
}

//write out data to pipe
columnindex = 0;
while (count < totalbytes / 2) {
    xxwrite(&(MI.getOutput()[columnindex]), bytechunk, stdout);
    count += bytechunk;
    columnindex += bytechunk / sizeof(double);
}

更新:

我已经下载了您的全部代码并运行了它。我有好消息和坏消息:代码在我这里运行良好,即使矩阵大小超过 3000 也是如此。我分别在使用和不使用 xxwrite 的情况下运行了它,结果是一样的。

凭借我有限的 Python 技能,我在您的 Python 脚本中添加了一些漂亮打印(例如一些换行),让它检查每个值是否在范围(range)内,并标注任何错误的值。脚本没有发现任何错误值。此外,对这些值的目视检查也没有发现任何问题[在添加漂亮打印之前也是如此,所以它没有引入任何问题]。输出只有很多零,然后是 0.9 范围内的数据块。

我能看到的唯一区别是我在 Linux 上使用 gcc[当然还有 Python]。但是,从您的脚本看来,您使用的是 Windows[根据 C++ 可执行文件的 C:\... 路径判断]。这对这个应用程序来说应该无关紧要,但我还是提一下。

所以,管道在这里工作。您可能会尝试的一件事是将 C++ 输出定向到一个文件。然后,让脚本从文件中读回(即没有管道)并查看是否有所不同。我倾向于认为不是,但是...

此外,我不知道您在 Windows 下使用的是什么编译器和 python 实现。每当我必须这样做时,我通常都会安装 Cygwin,因为它提供了最接近 linux/Unix-like 环境的实现之一(即管道更有可能像宣传的那样工作)。

无论如何,这是修改后的脚本。另请注意,我添加了 os.getenv 以获取备用矩阵大小和 C++ 可执行文件的备用位置,以便它对我们双方都适用,而且痛苦最小

#!/usr/bin/python

import subprocess
import struct
import sys
import os
import numpy as np

# Matrix size comes from the MTX environment variable (default 2000); the
# matrix is square.
val = os.getenv("MTX","2000")
sizeX = int(val)
sizeY = sizeX
print "sizeX=%d sizeY=%d" % (sizeX,sizeY)

#set up the variables needed
bytesPerDouble = 8
offset = sizeX*sizeY    # element index where the second array starts inside `a`
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2                   #the 2 is because we pass 2 different versions of the 2D array

#setup the testing data array
# First array: every element of row i holds i. Second array: the same,
# except that rows whose index is a multiple of 10 hold the column index j.
a = np.zeros(sizeX*sizeY*2, dtype='d')
for i in range(sizeX):
    for j in range(sizeY):
        a[j+i*sizeY] = i
        a[j+i*sizeY+offset] = i
        if i % 10 == 0:
            a[j+i*sizeY+offset] = j

data = a.tobytes('C')

# Command-line arguments for the child process: total byte count and
# bytes per row.
strTotalBytes = str(totalBytes)
strLineBytes  = str(sizeY*bytesPerDouble)

#communicate with c++ code
print("starting C++ code")

# The executable path can be overridden with the CPGM environment variable.
command = os.getenv("CPGM",None);
if command is None:
    command =   "C:\Python27\PythonPipes.exe"

proc = subprocess.Popen([command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)], stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE)

# Send both arrays to the child's stdin in a single write.
ByteBuffer = (data)
proc.stdin.write(ByteBuffer)

def prt(i,b):
    """Pretty-print the values of row *i*, eight per output line.

    Each group of eight starts with an ``[i,j]`` label, and any value
    outside the range 0.0..1.0 is followed by the marker ``=WTF``.

    NOTE(review): the trailing commas after ``print(...)`` only suppress the
    newline under the Python 2 print statement; this layout assumes Python 2.
    """

    hangflg = 0     # set while the current output line is still unfinished
    per = 8         # values per output line

    for j in range(0,len(b)):
        # label the start of every group with the row and column index
        if ((j % per) == 0):
            print("[%d,%d]" % (i,j)),

        q = b[j]
        print(q),
        hangflg = 1

        # flag values that fall outside the expected 0..1 range
        if (q < 0.0) or (q > 1.0):
            print("=WTF"),

        # end the line after the last value of a group
        if ((j % per) == (per - 1)):
            print("")
            hangflg = 0

    # finish a partially filled last line
    if (hangflg):
        print("")

print("Reading results back from C++")
# Read the result one row at a time: sizeY doubles per read, sizeX rows.
for i in range(sizeX):
    returnvalues = proc.stdout.read(sizeY*bytesPerDouble)
    a = buffer(returnvalues)
    # unpack the raw bytes into a tuple of sizeY doubles
    b = struct.unpack_from(str(sizeY)+'d', a)
    prt(i,b)
    ###print str(b) + " " + str(i)
    ###print str(i) + ": " + str(b)

print('done')

问题在于 Windows 中的 stdin/stdout 是以文本模式而不是二进制模式打开的,因此在发送字符 13 (\r) 时数据会被搞乱。

例如,您可以在 Python 中这样设置二进制模式:
import os, msvcrt
# Switch this process's stdout and stdin file descriptors to binary mode.
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)

在 C++ 中

// Switch the C runtime's stdout and stdin streams to binary mode.
_setmode(fileno(stdout), O_BINARY);
_setmode(fileno(stdin), O_BINARY);

https://msdn.microsoft.com/en-us/library/tw4k6df8.aspx