Order of events with chained keyBy calls on same key

From this link I understand that the order of events coming from some inDataStream is preserved per key in the resulting outDataStream:

outDataStream = inDataStream.keyBy(...)
    .timeWindow(...)
    .reduce(...)

So, for example, if we feed the following events from inDataStream (where we keyBy the key):

(1, key1), (2, key1), (3, key2), (4, key1), (5, key2)

Then outDataStream would preserve the same order among the key1 events and among the key2 events. So a result like the following can never occur in outDataStream:

(2, key1), (1, key1), (3, key2), (4, key1), (5, key2)

(because 1 and 2 are swapped)

Am I correct so far? Then, if we chain another keyBy/process, we will again produce results in the same order, right? Because we are simply applying the same guarantee again. Since the order of same-key events matters to us, and to make sure we are on the same page, I made a simplified version of what we have:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.util.Collector

// incoming events. family is used for keyBy grouping.
case class Event(id: Int, family: String, value: Double)
// the running aggregation of events
case class Aggregation(latestId: Int, family: String, total: Double)

// simply adds events into the total aggregation
object AggFunc extends AggregateFunction[Event, Aggregation, Aggregation] {
  override def createAccumulator() = Aggregation(-1, null, 0.0)
  override def add(e: Event, acc: Aggregation) = Aggregation(e.id, e.family, e.value + acc.total)
  override def merge(a: Aggregation, b: Aggregation) = Aggregation(b.latestId, b.family, a.total + b.total)
  override def getResult(acc: Aggregation) = acc
}

object ProcessFunc extends ProcessFunction[Aggregation, String] {
  override def processElement(agg: Aggregation, ctx: ProcessFunction[Aggregation, String]#Context, out: Collector[String]) =
    out.collect(s"Received aggregation combined with event ${agg.latestId}. New total=${agg.total}")
}

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  // incoming events from a source have 2 families: "A" and "B"
  env.fromElements(Event(1, "A", 6.0), Event(2, "B", 4.0), Event(3, "A", -2.0), Event(4, "B", 3.0),
      Event(5, "A", 8.0), Event(6, "B", 1.0), Event(7, "A", -10.0))
    .keyBy(_.family)
    .timeWindow(Time.seconds(1))
    .trigger(CountTrigger.of(1)) // FIRE on every incoming event for immediate aggregation and ProcessFunc application
    .aggregate(AggFunc)
    .keyBy(_.family)
    .process(ProcessFunc)
    .print()

  env.execute()
}

So, for such events entering the first keyBy in this order, are we guaranteed that, for any operator parallelism and any cluster deployment, the Sink (print() here) will always receive the following aggregations of family "A", in this order (although possibly interleaved with aggregations of family "B"):

"Received aggregation combined with event 1. New total=6.0"
"Received aggregation combined with event 3. New total=4.0"
"Received aggregation combined with event 5. New total=12.0"
"Received aggregation combined with event 7. New total=2.0"

Is that correct?

I don't think you can safely assume that the absolute order of stream elements will be preserved with parallelism > 1.

Moreover, I don't think you can assume order at all once you hit an aggregation operator. The output of an aggregation operator is driven by internal window timers, and you should not assume that the keys are processed in any particular order.

If you need ordering, then I think your best bet is to sort the data after it has been emitted to wherever you need it.

Flink only guarantees order within a parallel partition, i.e., it does not communicate across partitions and hold back data in order to guarantee ordering.

This means that if you have the following operators:

map(Mapper1).keyBy(0).map(Mapper2)

and run them with a parallelism of 2, i.e.

Mapper1(1) -\-/- Mapper2(1)
             X
Mapper1(2) -/-\- Mapper2(2)

then all records with the same key emitted by Mapper1(1) will arrive in order at Mapper2(1) or Mapper2(2), depending on the key. The same is true, of course, for all records with the same key emitted by Mapper1(2).

Hence, as soon as records with the same key are distributed across multiple partitions (here Mapper1(1) and Mapper1(2)), there are no ordering guarantees across records coming from different partitions, only among records within the same partition.
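A minimal sketch of why this happens, in plain Scala with no Flink: each upstream partition's records for a given key arrive at the downstream partition in their original order, but the two upstream streams can be interleaved arbitrarily. Enumerating the order-preserving interleavings makes the set of possible arrival orders explicit (the names and values below are illustrative only):

```scala
object PartitionSketch extends App {
  // Records emitted, in order, by two upstream Mapper1 instances.
  // Both emit records for key "k", so both feed the same Mapper2 instance.
  val fromMapper1_1 = List(("k", 1), ("k", 2))
  val fromMapper1_2 = List(("k", 3), ("k", 4))

  // All interleavings of two sequences that preserve each sequence's
  // internal order -- the set of arrival orders the network may produce.
  def interleavings[A](xs: List[A], ys: List[A]): List[List[A]] = (xs, ys) match {
    case (Nil, _) => List(ys)
    case (_, Nil) => List(xs)
    case (x :: xt, y :: yt) =>
      interleavings(xt, ys).map(x :: _) ++ interleavings(xs, yt).map(y :: _)
  }

  val possible = interleavings(fromMapper1_1, fromMapper1_2)
  // Every arrival order keeps 1 before 2 and 3 before 4, but the two
  // pairs can mix freely: 6 distinct orders in total.
  println(possible.size) // 6
  possible.foreach(o => println(o.map(_._2).mkString(",")))
}
```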

If order matters, you can either reduce the parallelism to 1, or implement your operators with event-time semantics and leverage watermarks to reason about out-of-order records.
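To make the watermark idea concrete, here is a hedged sketch in plain Scala (not the actual Flink API; in a real job this logic would live in a KeyedProcessFunction with event-time timers and managed state): buffer records until the watermark passes their timestamps, then release them sorted by event time.

```scala
object WatermarkReorder extends App {
  case class Record(timestamp: Long, payload: String)

  // Buffers out-of-order records; emit(watermark) releases, in timestamp
  // order, every record whose timestamp is <= the watermark.
  class ReorderBuffer {
    private var buffer = Vector.empty[Record]

    def add(r: Record): Unit = buffer :+= r

    def emit(watermark: Long): Seq[Record] = {
      val (ready, late) = buffer.partition(_.timestamp <= watermark)
      buffer = late
      ready.sortBy(_.timestamp)
    }
  }

  val buf = new ReorderBuffer
  Seq(Record(3, "c"), Record(1, "a"), Record(2, "b"), Record(5, "e")).foreach(buf.add)

  // Watermark 3 => records 1, 2, 3 are released in event-time order;
  // record 5 stays buffered until the watermark advances past it.
  println(buf.emit(3).map(_.payload).mkString(",")) // a,b,c
  println(buf.emit(5).map(_.payload).mkString(",")) // e
}
```

The trade-off is latency: nothing is emitted until the watermark advances, so late records within the watermark bound are reordered correctly, while records later than that would need explicit late-data handling.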