Flink 对单个源使用多个数据 类

Flink use multiple data classes for a single source

一些代码:

implicit val formats = Serialization.formats(NoTypeHints)

case class DataClass(id: String, name: String)    

val dataSource = env
      .addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
      .uid(s"data-stream-$stage-source-id").name("dataSource")
      .map(json => read[DataClass](json))

这里我从运动流中获取数据并序列化到我的数据中 class。 一切正常,但现在需要添加以另一种格式接收数据的能力(例如 DataClassSecond

选项之一,添加额外的数据源并在您自己的流中处理它们。

但这需要额外的运动队列。我不确定这是否是一个好方法 有什么方法可以从运动中接收不同的数据,然后根据类型拆分流吗?

您可以尝试 filter 基于字段的 DataStream[String],这样您将获得两个或更多只包含具有正确 JSON 格式的元素的流。

所以最简单的方法是:

val streamDataClass = sourceStream.filter(_.contains("name"))
val streamDataClassSecond = sourceStream.filter(_.contains("surname"))

这仅在 namesurname 对每个 DataClass 都是唯一的情况下才有效。更有效的做法可能是首先 mapDataStream 转换为某种通用格式,或者使用 Either 之类的东西作为反序列化结果,然后检查它是否成功。