Flink 对单个源使用多个数据 类
Flink use multiple data classes for a single source
一些代码:
implicit val formats = Serialization.formats(NoTypeHints)
case class DataClass(id: String, name: String)
val dataSource = env
.addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
.uid(s"data-stream-$stage-source-id").name("dataSource")
.map(json => read[DataClass](json))
这里我从运动流中获取数据并序列化到我的数据中 class。
一切正常,但现在需要添加以另一种格式接收数据的能力(例如 DataClassSecond
)
选项之一,添加额外的数据源并在您自己的流中处理它们。
但这需要额外的运动队列。我不确定这是否是一个好方法
有什么方法可以从运动中接收不同的数据,然后根据类型拆分流吗?
您可以尝试 filter
基于字段的 DataStream[String]
,这样您将获得两个或更多只包含具有正确 JSON 格式的元素的流。
所以最简单的方法是:
val streamDataClass = sourceStream.filter(_.contains("name"))
val streamDataClassSecond = sourceStream.filter(_.contains("surname"))
这仅在 name
和 surname
对每个 DataClass
都是唯一的情况下才有效。更有效的做法可能是首先 map
将 DataStream
转换为某种通用格式,或者使用 Either
之类的东西作为反序列化结果,然后检查它是否成功。
一些代码:
implicit val formats = Serialization.formats(NoTypeHints)
case class DataClass(id: String, name: String)
val dataSource = env
.addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
.uid(s"data-stream-$stage-source-id").name("dataSource")
.map(json => read[DataClass](json))
这里我从运动流中获取数据并序列化到我的数据中 class。
一切正常,但现在需要添加以另一种格式接收数据的能力(例如 DataClassSecond
)
选项之一,添加额外的数据源并在您自己的流中处理它们。
但这需要额外的运动队列。我不确定这是否是一个好方法 有什么方法可以从运动中接收不同的数据,然后根据类型拆分流吗?
您可以尝试 filter
基于字段的 DataStream[String]
,这样您将获得两个或更多只包含具有正确 JSON 格式的元素的流。
所以最简单的方法是:
val streamDataClass = sourceStream.filter(_.contains("name"))
val streamDataClassSecond = sourceStream.filter(_.contains("surname"))
这仅在 name
和 surname
对每个 DataClass
都是唯一的情况下才有效。更有效的做法可能是首先 map
将 DataStream
转换为某种通用格式,或者使用 Either
之类的东西作为反序列化结果,然后检查它是否成功。