使用 HCFS 读取 JSON-newline 文件

Read JSON-newline file with HCFS

如何使 Flink HCFS 连接器从 Google Cloud Storage 中以类似 **/*S0.json 的模式读取,其中文件包含换行符分隔的 JSON 数据?

这些文件包含类似

的内容
{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}

在 GCS UI 中,它看起来像这样:

跟进

JSON file from HCFS as a plain text you can map it to JSONObject using custom mapper之后:

import org.apache.flink.api.java.DataSet;
import org.apache.sling.commons.json.JSONObject;

DataSet<JSONObject> jsonInput = 
    input
        .map(record -> record.f1.toString())
        .map(StringToJsonObject::new);

JSONObject 基于上面链接示例的映射器代码:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.sling.commons.json.JSONObject;

public class StringToJsonObject implements MapFunction<String, JSONObject> {
    private static final long serialVersionUID = 4573928723585302447L;

    public JSONObject map(String content) throws Exception {
        return new JSONObject(content);
    }
}

如有必要,您可以使用类似于 this one.

的映射器将 String 映射到 POJO 而不是通用 JSONObject