使用Flink时Kafka中json数据不清晰时如何反序列化
How to deserialize when json data in Kafka is not clear when using Flink
我想通过Flink计算Kafka中的数据,但是问题是Kafka中的JASON数据可能是可变的。
像这样:
{"data":{"template":25,"name":"name"}}
或者这个:
{"data"{"type":"type1","props":"props","strem":"stream1"}
而且我无法提前知道这个里面包含了多少数据JSON.So在使用Flink的时候有一个问题:
streamExecutionEnvironment.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new KafkaEventSchema(),
kafkaProps))
.flatMap(new KafkaEventToRow()).returns(getReturnType());
那么当Json这样的数据时,如何定义pojo类型和mapFuncation?
您必须定义一个更通用的反序列化架构,例如 Map
定义架构
class CustomSchema implements DeserializationSchema {
private ObjectMapper mapper = new ObjectMapper();
@Override
public Map<String,Object> deserialize(byte[] bytes) throws IOException {
Map<String,Object> t = null;
t = mapper.readValue(bytes, Map.class);
return t;
}
@Override
public boolean isEndOfStream(Object o) {
return false;
}
@Override
public TypeInformation<Map> getProducedType() {
return TypeInformation.of(new TypeHint<Map>() {
});
}
}
现在将其用作架构
streamExecutionEnvironment
.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new CustomSchema(),......
现在你得到了一个通用的地图,它又可以包含任何数据结构
我想通过Flink计算Kafka中的数据,但是问题是Kafka中的JASON数据可能是可变的。
像这样:
{"data":{"template":25,"name":"name"}}
或者这个:
{"data"{"type":"type1","props":"props","strem":"stream1"}
而且我无法提前知道这个里面包含了多少数据JSON.So在使用Flink的时候有一个问题:
streamExecutionEnvironment.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new KafkaEventSchema(),
kafkaProps))
.flatMap(new KafkaEventToRow()).returns(getReturnType());
那么当Json这样的数据时,如何定义pojo类型和mapFuncation?
您必须定义一个更通用的反序列化架构,例如 Map
定义架构
class CustomSchema implements DeserializationSchema {
private ObjectMapper mapper = new ObjectMapper();
@Override
public Map<String,Object> deserialize(byte[] bytes) throws IOException {
Map<String,Object> t = null;
t = mapper.readValue(bytes, Map.class);
return t;
}
@Override
public boolean isEndOfStream(Object o) {
return false;
}
@Override
public TypeInformation<Map> getProducedType() {
return TypeInformation.of(new TypeHint<Map>() {
});
}
}
现在将其用作架构
streamExecutionEnvironment
.addSource(new FlinkKafkaConsumer010<KafkaEvent>("flink", new CustomSchema(),......
现在你得到了一个通用的地图,它又可以包含任何数据结构