flink 1.2下沉kafka流报错
Error in sinking kafka stream in flink 1.2
我所做的是以 json 格式读取来自 kafka 的消息。例如
{"a":1,"b":2}
然后我对这条消息应用了一个过滤器,确保a对应的值为1,b的值为2。最后,我想将结果流输出到下游的kafka。但是,我不知道为什么编译器会说类型不匹配。
我的代码如下:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
&& jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))
我得到的错误如下图所示:
我参考flink kafka connector文档写kafka outstream代码:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
您有 ObjectNode
类型的流 DataStream
,因此您需要提供 FlinkKafkaProducer010[ObjectNode]
,例如:
stream1.addSink(new FlinkKafkaProducer010[ObjectNode](params.getRequired("output-topic"), new SerializationSchema[ObjectNode] {
override def serialize(element: ObjectNode): Array[Byte] = ???
} ), params.getProperties)
java 中的所有泛型在类型上都是不变的,这就是为什么你不能只传递 FlinkKafkaProducer010[Object]
。
您可能会进一步遇到的另一个问题是您还需要提供 SerializationSchema[ObjectNode]
而 SimpleStringSchema
实现 SerializationSchema[String]
.
除了@Dawid 已经指出的内容之外,您还可以为 ObjectNode 提供序列化模式(假设它是一个 POJO,因为我还没有针对其他对象测试它),如下所示:
TypeInformation<ObjectNode> typeInfo =
TypeInformation.of(new TypeHint<ObjectNode>() {});
TypeInformationSerializationSchema<ObjectNode> serdeSchema =
new TypeInformationSerializationSchema<>(typeInfo, env.getConfig());
然后使用 serdeschema 如下用于 KafkaPrducer 接收器:
FlinkKafkaProducer010<RecordReadEventType> kafkaSink =
new FlinkKafkaProducer010<>(
BOOTSTRAP_SERVERS,
"output-topic",
serdeSchema);
希望这能解决您的 kafka 接收器冲突问题。
我所做的是以 json 格式读取来自 kafka 的消息。例如
{"a":1,"b":2}
然后我对这条消息应用了一个过滤器,确保a对应的值为1,b的值为2。最后,我想将结果流输出到下游的kafka。但是,我不知道为什么编译器会说类型不匹配。
我的代码如下:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
&& jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))
我得到的错误如下图所示:
我参考flink kafka connector文档写kafka outstream代码: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
您有 ObjectNode
类型的流 DataStream
,因此您需要提供 FlinkKafkaProducer010[ObjectNode]
,例如:
stream1.addSink(new FlinkKafkaProducer010[ObjectNode](params.getRequired("output-topic"), new SerializationSchema[ObjectNode] {
override def serialize(element: ObjectNode): Array[Byte] = ???
} ), params.getProperties)
java 中的所有泛型在类型上都是不变的,这就是为什么你不能只传递 FlinkKafkaProducer010[Object]
。
您可能会进一步遇到的另一个问题是您还需要提供 SerializationSchema[ObjectNode]
而 SimpleStringSchema
实现 SerializationSchema[String]
.
除了@Dawid 已经指出的内容之外,您还可以为 ObjectNode 提供序列化模式(假设它是一个 POJO,因为我还没有针对其他对象测试它),如下所示:
TypeInformation<ObjectNode> typeInfo =
TypeInformation.of(new TypeHint<ObjectNode>() {});
TypeInformationSerializationSchema<ObjectNode> serdeSchema =
new TypeInformationSerializationSchema<>(typeInfo, env.getConfig());
然后使用 serdeschema 如下用于 KafkaPrducer 接收器:
FlinkKafkaProducer010<RecordReadEventType> kafkaSink =
new FlinkKafkaProducer010<>(
BOOTSTRAP_SERVERS,
"output-topic",
serdeSchema);
希望这能解决您的 kafka 接收器冲突问题。