Flink keyBy 选项列表

Flink keyBy over option list

我的虚拟 flink 作业

import org.apache.flink.streaming.api.scala._
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]] )

object WindowWordCount {
  implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.setParallelism(1)

  val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))

  env.execute("Window Stream WordCount")
  }
}

因此,每个 MyData 对象都有唯一的 id,并且可以有多个 labels。 我要做的是按标签 .keyBy

传入数据示例(序列化为MyData

{
  "id": "1",
  "labels": [
    {
      "name": "unolabelo",
      "typ": "two"
    },
    {
      "name": "twunolabelo",
      "typ": "two"
    }
  ]
}

如果单个 MyData 元素带有 3 个不同的标签,我需要发出 3 个具有唯一标签的 MyData 元素,然后我可以 .keyBy(_.label) .

最好的方法是什么?

所以,如果我理解正确的话,你想为 labels 中的每个 label 复制你的消息。我认为最简单的想法是简单地创建另一个 class,比如说 MyDataSimple,它只有一个 label,然后使用 FlatMapFunctionMyData 映射到 MyDataSimple 赞:

val myData = ...
myData.labels.map(label => MyDataSimple(label,...))

然后您可以执行以下操作:

val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))
      .flatMap(new MyFlatMapFunction())
      .keyBy(_.label)
    

通过应用.flatMap函数解决

val packetSource = env
  .socketTextStream("localhost", 7777)
  .map(json => read[MyData](json))
  .flatMap(
    new FlatMapFunction[MyData,MyData] {
      override def flatMap(value: MyData, out: Collector[MyData]): Unit = {
        value.labels match {
          case Some(labels) =>
            for (label <- labels) {
              out.collect(value.copy(labels = Some(List(label))))
            }
          case None =>
        }
      }
    }
  )
  .keyBy(_.labels.get.head.name)