如何使用 Apache flink 处理乱序事件?

How do I handle out-of-order events with Apache flink?

为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的 xy 坐标以及记录位置的时间 t 组成。我的 objective 是用特定粒子的速度注释此数据。所以流可能看起来像这样。

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

现在不能保证事件会按顺序到达,即 1612103771213 p2 -0.1 -0.1 可能会在 1612103771212 p2 0.0 0.0 之前到达说 10ms

为简单起见,可以假设任何迟到的数据都将在早到数据的 100ms 内到达。

我承认我是流处理和 Flink 的新手,所以这可能是一个愚蠢的问题,但答案很明显,但我目前对如何实现我的目标感到困惑 objective这里。

编辑

根据 David 的回答,我尝试使用 Flink Table API 对数据流进行排序,对文本套接字流使用 nc -lk 9999。问题是在我关闭文本套接字流之前,控制台不会打印任何内容。这是我写的 scala 代码 -


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env, bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)

    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
  case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
                            var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      val parts = t.split("\W+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }

}

在 Flink 中执行此操作的一种方法可能是使用 KeyedProcessFunction,即可以:

的函数
  • 处​​理流中的每个事件
  • 保持某种状态
  • 使用基于事件时间的计时器触发一些逻辑

所以它会像这样:

  • 您需要了解关于您的数据的某种“最大乱序”。根据您的描述,我们假设 100 毫秒,这样当在时间戳 1612103771212 处处理数据时,您决定考虑在 1612103771112.
  • 之前您确定已经收到所有数据
  • 您的第一步是 keyBy() 您的流,按粒子 ID 键控。这意味着您的 Flink 应用程序中 next 运算符的逻辑现在可以用一个粒子的一系列事件来表达,并且每个粒子都以这种方式并行处理。

像这样:

yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
  • YourProcessFunctionProcessFunction 初始化期间(即在 open 方法期间),初始化一个 ListState 可以安全存储东西的地方。
  • 在处理流中的元素时,在 processElement 方法中,只需将其添加到 listState 并注册一个定时器触发器,例如 100 毫秒
  • onTimer() 方法触发时,比如时间 t,查看 listState 中时间 < t - 100 的所有元素,如果您有至少其中两个,对它们进行排序,将它们从状态中删除,应用您描述的速度计算和注释逻辑,然后向下游发出结果。

您会发现 an example in the official Flink training 在乘坐出租车期间使用这种逻辑,这与您的用例有很多相似之处。还可以查看该存储库的各种 Readme.md 文件以了解更多详细信息。

一般来说,水印与事件时间计时器相结合是解决无序事件流带来的问题的方法。涵盖 Event Time and Watermarks 的官方 Flink 培训部分解释了其工作原理。

在更高层次上,有时使用 Flink 的 CEP 库或 Flink SQL 之类的东西会更容易,因为它们可以很容易地按时间对流进行排序,从而消除所有的错误-秩序。例如,有关使用 Flink SQL 按事件时间对流进行排序的 Flink DataStream 程序示例,请参阅

对于您的情况,一个相当简单的 MATCH_RECOGNIZE 查询就可以满足您的需求。这可能看起来像这样,

SELECT *
    FROM event
    MATCH_RECOGNIZE (
        PARTITION BY particleId
        ORDER BY ts
        MEASURES 
            b.ts, 
            b.particleId, 
            velocity(a, b)
        AFTER MATCH SKIP TO NEXT ROW
        PATTERN (a b)
        DEFINE
            a AS TRUE,
            b AS TRUE
    )

其中 velocity(a, b) 是一个用户定义的函数,它在给定同一粒子的两个连续事件(a 和 b)的情况下计算速度。