Flink 是如何处理 watermarks with Union operators 的?

How does Flink handle watermarks with Union operators?

我从四个 Kinesis 流中读取数据。每个流中的数据是不同的数据类型。读入所有四个流后,我分配时间戳和水印,并聚合来自每个流的数据。四个聚合的结果都使用相同的通用对象输出。我想合并四个流的结果,这样我就可以将合并后的流发送到一个 ProcessFunction。这基本上允许我像使用 CoProcessFunction 一样使用 ProcessFunction,但我将能够处理来自两个以上流的数据(在这种情况下,ProcessFunction 将接收来自所有四个单独流的聚合)。

但是,我担心这可能无法很好地处理水印。如果一个流需要更长的时间来处理或以某种方式落后,如果所有水印在联合中向前传递并且其中一个流领先于其他流,则它的聚合可能无法进入流程功能。如果是这样,那么流程函数的水印将是它从四个单独的流中看到的水印的最大值。

我的问题是:联合运算符如何处理水印以及联合下游的运算符如何处理这些水印?

另外:如果通用对象的联合由于水印问题不起作用,当 Flink 仅支持两个流的 CoProcessFunction 时,合并四种不同聚合结果的最佳方法是什么?

Union 的水印与并行流的水印一样工作。这意味着水印始终是来自所有输入流的 min 个水印。同样代表下游运营商,他们的水印将是所有输入流的min

老实说,我不认为 union 以任何方式依赖于水印。但是,如果您出于任何原因想要使用 CoProcessFunction,我可以提供这种有点老套的方式。您可以创建一个 Seq 您已经生成的流,然后:

//Streams defined
val seq = Seq(stream, stream2, stream3, stream4)
seq.reduce((stream1, stream2) => stream1.connect(stream2).process(...))

将 2 个以上的流连接在一起的另一种方法是构建一个树,该树进行成对连接,直到所有流都连接在一起。要么作为平衡树,像这样:

A--->
     A+B---->
B--->

            A+B+C+D------------>

C--->
     C+D---->
D--->

或者一次添加一个流,如下所示:

a--->
     a+b--->
b--->
            a+b+c--->
     c----->
                     a+b+c+d--->
            d------->

FWIW,FLIP-92 是一个向 Flink 添加 n 元流运算符的提议,但即使实现了,它也可能不会被用户看到,至少一开始是这样。