使用 HCFS 读取 JSON-newline 文件
Read JSON-newline file with HCFS
如何使 Flink HCFS 连接器从 Google Cloud Storage 中以类似 **/*S0.json
的模式读取,其中文件包含换行符分隔的 JSON 数据?
这些文件包含类似
的内容
{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}
在 GCS UI 中,它看起来像这样:
跟进
JSON file from HCFS as a plain text you can map it to JSONObject
using custom mapper之后:
import org.apache.flink.api.java.DataSet;
import org.apache.sling.commons.json.JSONObject;
DataSet<JSONObject> jsonInput =
input
.map(record -> record.f1.toString())
.map(StringToJsonObject::new);
JSONObject
基于上面链接示例的映射器代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.sling.commons.json.JSONObject;
public class StringToJsonObject implements MapFunction<String, JSONObject> {
private static final long serialVersionUID = 4573928723585302447L;
public JSONObject map(String content) throws Exception {
return new JSONObject(content);
}
}
如有必要,您可以使用类似于 this one.
的映射器将 String
映射到 POJO 而不是通用 JSONObject
如何使 Flink HCFS 连接器从 Google Cloud Storage 中以类似 **/*S0.json
的模式读取,其中文件包含换行符分隔的 JSON 数据?
这些文件包含类似
的内容{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}
在 GCS UI 中,它看起来像这样:
跟进
JSONObject
using custom mapper之后:
import org.apache.flink.api.java.DataSet;
import org.apache.sling.commons.json.JSONObject;
DataSet<JSONObject> jsonInput =
input
.map(record -> record.f1.toString())
.map(StringToJsonObject::new);
JSONObject
基于上面链接示例的映射器代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.sling.commons.json.JSONObject;
public class StringToJsonObject implements MapFunction<String, JSONObject> {
private static final long serialVersionUID = 4573928723585302447L;
public JSONObject map(String content) throws Exception {
return new JSONObject(content);
}
}
如有必要,您可以使用类似于 this one.
的映射器将String
映射到 POJO 而不是通用 JSONObject