flink 1.2下沉kafka流报错

Error in sinking kafka stream in flink 1.2

我所做的是以 json 格式读取来自 kafka 的消息。例如

{"a":1,"b":2}

然后我对这条消息应用了一个过滤器,确保a对应的值为1,b的值为2。最后,我想将结果流输出到下游的kafka。但是,我不知道为什么编译器会说类型不匹配。

我的代码如下:

val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)

val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
                      && jsonNode.get("b").asText.equals("2"))

filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))

我得到的错误如下图所示:

我参考flink kafka connector文档写kafka outstream代码: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

您有 ObjectNode 类型的流 DataStream,因此您需要提供 FlinkKafkaProducer010[ObjectNode],例如:

stream1.addSink(new FlinkKafkaProducer010[ObjectNode](params.getRequired("output-topic"), new SerializationSchema[ObjectNode] {
  override def serialize(element: ObjectNode): Array[Byte] = ???
} ), params.getProperties) 

java 中的所有泛型在类型上都是不变的,这就是为什么你不能只传递 FlinkKafkaProducer010[Object]

您可能会进一步遇到的另一个问题是您还需要提供 SerializationSchema[ObjectNode]SimpleStringSchema 实现 SerializationSchema[String].

除了@Dawid 已经指出的内容之外,您还可以为 ObjectNode 提供序列化模式(假设它是一个 POJO,因为我还没有针对其他对象测试它),如下所示:

TypeInformation<ObjectNode> typeInfo =
        TypeInformation.of(new TypeHint<ObjectNode>() {});
TypeInformationSerializationSchema<ObjectNode> serdeSchema =
        new TypeInformationSerializationSchema<>(typeInfo, env.getConfig());

然后使用 serdeschema 如下用于 KafkaPrducer 接收器:

FlinkKafkaProducer010<RecordReadEventType> kafkaSink =
                new FlinkKafkaProducer010<>(
                                BOOTSTRAP_SERVERS,
                                "output-topic",
                                serdeSchema);

希望这能解决您的 kafka 接收器冲突问题。