Streaming a File From HDFS Address in Apache Flink

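In my Flink code I am streaming a file located in an HDFS folder, and I get the error "(No such file or directory)". However, I am sure the file name and address are correct, because I use the same path in the batch approach and everything works fine there. Does anyone know what the problem could be? Here is my code: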

DataStream<MyObject> myStream =
    env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));

and the related class:

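import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyObjectGenerator implements SourceFunction<MyObject> {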

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo ; 
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0 ;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long dataStartTime;
        rowNo++;
        if (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            if (myObject != null) {
                sourceContext.collect(myObject);
            }
        } else {
            return;
        }
        while (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            sourceContext.collect(myObject);
        }
        this.reader.close();
        this.reader = null;
        this.inputStream.close();
        this.inputStream = null;
    }

    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if( this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ioe) {
            // ignore: the source is being cancelled anyway
        } finally {
            this.reader = null;
            this.inputStream = null;
        }
    }
}

You are trying to access a file in HDFS with Java's regular `FileInputStream`. `FileInputStream` can only access the local file system; it does not know anything about talking to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's `FileInputFormat` for an example.
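The "(No such file or directory)" message follows directly from this. A minimal JDK-only check (the class `LocalOnlyDemo` and helper `failsAsLocalPath` are hypothetical, for illustration only) suggests that `FileInputStream` never interprets the `hdfs://` scheme: the string is handed to the local file system as a relative path whose first component is a directory literally named `hdfs:`.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

final class LocalOnlyDemo {
    // Tries to open the path with FileInputStream, which only knows the local file system.
    // Returns true if the open fails with FileNotFoundException, the exception behind
    // the "(No such file or directory)" message in the question.
    static boolean failsAsLocalPath(String path) {
        try (InputStream in = new FileInputStream(path)) {
            return false; // only reached if a matching local file happens to exist
        } catch (FileNotFoundException e) {
            // On Linux/macOS the message typically ends with "(No such file or directory)".
            return true;
        } catch (IOException e) {
            return false; // opened, but closing failed; not the case illustrated here
        }
    }
}
```

On a machine without such a local directory, `LocalOnlyDemo.failsAsLocalPath("hdfs://../Data/Dataset1.csv")` should return `true`. Inside a Flink source, one common way to make the same path work is to open it through Flink's own file-system abstraction (`org.apache.flink.core.fs.Path#getFileSystem()` followed by `FileSystem#open(Path)`), which selects the implementation from the URI scheme; reading `hdfs://` paths that way still assumes the Hadoop/HDFS dependencies are available to Flink.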

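However, I would try to avoid implementing this yourself if possible. You can try using Flink's `FileInputFormat` to read the file line by line (which returns a `DataStream<String>`), followed by a (flat) mapper that parses each line.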
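The read-then-parse split suggested above can be illustrated without Flink using plain `java.util.stream` (Java 9+ for `Optional.stream()`): read the input line by line, then flat-map each line to zero or one parsed records, so malformed lines are dropped instead of failing the job. This is a sketch under stated assumptions: `Row`, its two fields, `LineWiseParser` and the two-column CSV layout are hypothetical stand-ins for `MyObject`, whose fields the question does not show.

```java
import java.io.BufferedReader;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical stand-in for MyObject: the real fields are not shown in the question.
final class Row {
    final String id;
    final String name;

    Row(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

final class LineWiseParser {
    // Parses one line of an assumed two-column CSV ("id,name").
    // An empty Optional plays the role of a flat mapper emitting nothing.
    static Optional<Row> parse(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 2 || fields[0].trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Row(fields[0].trim(), fields[1].trim()));
    }

    // Reads line by line, then flat-maps each line to zero or one Row.
    // The caller owns the reader and is responsible for closing it.
    static List<Row> readAll(BufferedReader reader) {
        return reader.lines()
                .flatMap(line -> parse(line).stream())
                .collect(Collectors.toList());
    }
}
```

In a Flink job, the same per-line logic would live in a `FlatMapFunction<String, MyObject>` applied to the `DataStream<String>` produced by reading the text file (for example via `StreamExecutionEnvironment#readTextFile`), emitting through the `Collector` only when parsing succeeds.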