Flink keyBy 选项列表
Flink keyBy over option list
我的虚拟 flink 作业
import org.apache.flink.streaming.api.scala._
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]] )
object WindowWordCount {
implicit val formats = Serialization.formats(NoTypeHints)
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setParallelism(1)
val packetSource = env
.socketTextStream("localhost", 7777)
.map(json => read[MyData](json))
env.execute("Window Stream WordCount")
}
}
因此,每个 MyData
对象都有唯一的 id
,并且可以有多个 labels
。
我要做的是按标签 .keyBy
。
传入数据示例(序列化为MyData
)
{
"id": "1",
"labels": [
{
"name": "unolabelo",
"typ": "two"
},
{
"name": "twunolabelo",
"typ": "two"
}
]
}
如果单个 MyData
元素带有 3 个不同的标签,我需要发出 3 个具有唯一标签的 MyData
元素,然后我可以 .keyBy(_.label)
.
最好的方法是什么?
所以,如果我理解正确的话,你想为 labels
中的每个 label
复制你的消息。我认为最简单的想法是简单地创建另一个 class,比如说 MyDataSimple
,它只有一个 label
,然后使用 FlatMapFunction
将 MyData
映射到 MyDataSimple
赞:
val myData = ...
myData.labels.map(label => MyDataSimple(label,...))
然后您可以执行以下操作:
val packetSource = env
.socketTextStream("localhost", 7777)
.map(json => read[MyData](json))
.flatMap(new MyFlatMapFunction())
.keyBy(_.label)
通过应用.flatMap
函数解决
val packetSource = env
.socketTextStream("localhost", 7777)
.map(json => read[MyData](json))
.flatMap(
new FlatMapFunction[MyData,MyData] {
override def flatMap(value: MyData, out: Collector[MyData]): Unit = {
value.labels match {
case Some(labels) =>
for (label <- labels) {
out.collect(value.copy(labels = Some(List(label))))
}
case None =>
}
}
}
)
.keyBy(_.labels.get.head.name)
我的虚拟 flink 作业
import org.apache.flink.streaming.api.scala._
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]] )
object WindowWordCount {
implicit val formats = Serialization.formats(NoTypeHints)
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setParallelism(1)
val packetSource = env
.socketTextStream("localhost", 7777)
.map(json => read[MyData](json))
env.execute("Window Stream WordCount")
}
}
因此,每个 MyData
对象都有唯一的 id
,并且可以有多个 labels
。
我要做的是按标签 .keyBy
。
传入数据示例(序列化为MyData
)
{
"id": "1",
"labels": [
{
"name": "unolabelo",
"typ": "two"
},
{
"name": "twunolabelo",
"typ": "two"
}
]
}
如果单个 MyData
元素带有 3 个不同的标签,我需要发出 3 个具有唯一标签的 MyData
元素,然后我可以 .keyBy(_.label)
.
最好的方法是什么?
所以,如果我理解正确的话,你想为 labels
中的每个 label
复制你的消息。我认为最简单的想法是简单地创建另一个 class,比如说 MyDataSimple
,它只有一个 label
,然后使用 FlatMapFunction
将 MyData
映射到 MyDataSimple
赞:
val myData = ...
myData.labels.map(label => MyDataSimple(label,...))
然后您可以执行以下操作:
val packetSource = env
.socketTextStream("localhost", 7777)
.map(json => read[MyData](json))
.flatMap(new MyFlatMapFunction())
.keyBy(_.label)
通过应用.flatMap
函数解决
val packetSource = env
.socketTextStream("localhost", 7777)
.map(json => read[MyData](json))
.flatMap(
new FlatMapFunction[MyData,MyData] {
override def flatMap(value: MyData, out: Collector[MyData]): Unit = {
value.labels match {
case Some(labels) =>
for (label <- labels) {
out.collect(value.copy(labels = Some(List(label))))
}
case None =>
}
}
}
)
.keyBy(_.labels.get.head.name)