如何使用 Apache flink 处理乱序事件?
How do I handle out-of-order events with Apache flink?
为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的 x
和 y
坐标以及记录位置的时间 t
组成。我的 objective 是用特定粒子的速度注释此数据。所以流可能看起来像这样。
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
现在不能保证事件会按顺序到达,即 1612103771213 p2 -0.1 -0.1
可能会在 1612103771212 p2 0.0 0.0
之前到达说 10ms
。
为简单起见,可以假设任何迟到的数据都将在早到数据的 100ms
内到达。
我承认我是流处理和 Flink 的新手,所以这可能是一个愚蠢的问题,但答案很明显,但我目前对如何实现我的目标感到困惑 objective这里。
编辑
根据 David 的回答,我尝试使用 Flink Table API 对数据流进行排序,对文本套接字流使用 nc -lk 9999
。问题是在我关闭文本套接字流之前,控制台不会打印任何内容。这是我写的 scala 代码 -
package processor
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector
import java.time.Duration
object AnnotateJob {
val OUT_OF_ORDER_NESS = 100
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bSettings)
env.setParallelism(1)
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
val text = env.socketTextStream("localhost", 9999)
val objStream = text
.filter( _.nonEmpty )
.map(new ParticleMapFunction)
val posStream = objStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
.withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
})
)
val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
tableEnv.createTemporaryView("pos", tablePos)
val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")
val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)
// sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)
sortedPosStream.print()
// execute program
env.execute()
}
case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable
class ParticleMapFunction extends MapFunction[String, ParticlePos] {
override def map(t: String): ParticlePos = {
val parts = t.split("\W+")
ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
}
}
}
在 Flink 中执行此操作的一种方法可能是使用 KeyedProcessFunction,即可以:
的函数
- 处理流中的每个事件
- 保持某种状态
- 使用基于事件时间的计时器触发一些逻辑
所以它会像这样:
- 您需要了解关于您的数据的某种“最大乱序”。根据您的描述,我们假设 100 毫秒,这样当在时间戳
1612103771212
处处理数据时,您决定考虑在 1612103771112
. 之前您确定已经收到所有数据
- 您的第一步是
keyBy()
您的流,按粒子 ID 键控。这意味着您的 Flink 应用程序中 next 运算符的逻辑现在可以用一个粒子的一系列事件来表达,并且每个粒子都以这种方式并行处理。
像这样:
yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
- 在
YourProcessFunction
的 ProcessFunction
初始化期间(即在 open
方法期间),初始化一个 ListState
可以安全存储东西的地方。
- 在处理流中的元素时,在
processElement
方法中,只需将其添加到 listState
并注册一个定时器触发器,例如 100 毫秒
- 当
onTimer()
方法触发时,比如时间 t
,查看 listState
中时间 < t - 100
的所有元素,如果您有至少其中两个,对它们进行排序,将它们从状态中删除,应用您描述的速度计算和注释逻辑,然后向下游发出结果。
您会发现 an example in the official Flink training 在乘坐出租车期间使用这种逻辑,这与您的用例有很多相似之处。还可以查看该存储库的各种 Readme.md 文件以了解更多详细信息。
一般来说,水印与事件时间计时器相结合是解决无序事件流带来的问题的方法。涵盖 Event Time and Watermarks 的官方 Flink 培训部分解释了其工作原理。
在更高层次上,有时使用 Flink 的 CEP 库或 Flink SQL 之类的东西会更容易,因为它们可以很容易地按时间对流进行排序,从而消除所有的错误-秩序。例如,有关使用 Flink SQL 按事件时间对流进行排序的 Flink DataStream 程序示例,请参阅 。
对于您的情况,一个相当简单的 MATCH_RECOGNIZE 查询就可以满足您的需求。这可能看起来像这样,
SELECT *
FROM event
MATCH_RECOGNIZE (
PARTITION BY particleId
ORDER BY ts
MEASURES
b.ts,
b.particleId,
velocity(a, b)
AFTER MATCH SKIP TO NEXT ROW
PATTERN (a b)
DEFINE
a AS TRUE,
b AS TRUE
)
其中 velocity(a, b) 是一个用户定义的函数,它在给定同一粒子的两个连续事件(a 和 b)的情况下计算速度。
为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的 x
和 y
坐标以及记录位置的时间 t
组成。我的 objective 是用特定粒子的速度注释此数据。所以流可能看起来像这样。
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
现在不能保证事件会按顺序到达,即 1612103771213 p2 -0.1 -0.1
可能会在 1612103771212 p2 0.0 0.0
之前到达说 10ms
。
为简单起见,可以假设任何迟到的数据都将在早到数据的 100ms
内到达。
我承认我是流处理和 Flink 的新手,所以这可能是一个愚蠢的问题,但答案很明显,但我目前对如何实现我的目标感到困惑 objective这里。
编辑
根据 David 的回答,我尝试使用 Flink Table API 对数据流进行排序,对文本套接字流使用 nc -lk 9999
。问题是在我关闭文本套接字流之前,控制台不会打印任何内容。这是我写的 scala 代码 -
package processor
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector
import java.time.Duration
object AnnotateJob {
val OUT_OF_ORDER_NESS = 100
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bSettings)
env.setParallelism(1)
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
val text = env.socketTextStream("localhost", 9999)
val objStream = text
.filter( _.nonEmpty )
.map(new ParticleMapFunction)
val posStream = objStream
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
.withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
})
)
val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
tableEnv.createTemporaryView("pos", tablePos)
val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")
val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)
// sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)
sortedPosStream.print()
// execute program
env.execute()
}
case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable
class ParticleMapFunction extends MapFunction[String, ParticlePos] {
override def map(t: String): ParticlePos = {
val parts = t.split("\W+")
ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
}
}
}
在 Flink 中执行此操作的一种方法可能是使用 KeyedProcessFunction,即可以:
的函数- 处理流中的每个事件
- 保持某种状态
- 使用基于事件时间的计时器触发一些逻辑
所以它会像这样:
- 您需要了解关于您的数据的某种“最大乱序”。根据您的描述,我们假设 100 毫秒,这样当在时间戳
1612103771212
处处理数据时,您决定考虑在1612103771112
. 之前您确定已经收到所有数据
- 您的第一步是
keyBy()
您的流,按粒子 ID 键控。这意味着您的 Flink 应用程序中 next 运算符的逻辑现在可以用一个粒子的一系列事件来表达,并且每个粒子都以这种方式并行处理。
像这样:
yourStream.keyBy(...lookup p1 or p2 here...).process(new YourProcessFunction())
- 在
YourProcessFunction
的ProcessFunction
初始化期间(即在open
方法期间),初始化一个ListState
可以安全存储东西的地方。 - 在处理流中的元素时,在
processElement
方法中,只需将其添加到listState
并注册一个定时器触发器,例如 100 毫秒 - 当
onTimer()
方法触发时,比如时间t
,查看listState
中时间 <t - 100
的所有元素,如果您有至少其中两个,对它们进行排序,将它们从状态中删除,应用您描述的速度计算和注释逻辑,然后向下游发出结果。
您会发现 an example in the official Flink training 在乘坐出租车期间使用这种逻辑,这与您的用例有很多相似之处。还可以查看该存储库的各种 Readme.md 文件以了解更多详细信息。
一般来说,水印与事件时间计时器相结合是解决无序事件流带来的问题的方法。涵盖 Event Time and Watermarks 的官方 Flink 培训部分解释了其工作原理。
在更高层次上,有时使用 Flink 的 CEP 库或 Flink SQL 之类的东西会更容易,因为它们可以很容易地按时间对流进行排序,从而消除所有的错误-秩序。例如,有关使用 Flink SQL 按事件时间对流进行排序的 Flink DataStream 程序示例,请参阅
对于您的情况,一个相当简单的 MATCH_RECOGNIZE 查询就可以满足您的需求。这可能看起来像这样,
SELECT *
FROM event
MATCH_RECOGNIZE (
PARTITION BY particleId
ORDER BY ts
MEASURES
b.ts,
b.particleId,
velocity(a, b)
AFTER MATCH SKIP TO NEXT ROW
PATTERN (a b)
DEFINE
a AS TRUE,
b AS TRUE
)
其中 velocity(a, b) 是一个用户定义的函数,它在给定同一粒子的两个连续事件(a 和 b)的情况下计算速度。