具有固定 window 和基于事件时间的触发器的 Beam CoGroupByKey 生成随机元素
Beam CoGroupByKey with fixed window and event time based trigger generates random elements
我在 Beam 中有一个管道,它使用 CoGroupByKey
组合 2 个 PCollection,第一个从 Pub/Sub 订阅读取,第二个使用相同的 PCollection,但通过查找来丰富数据来自 table 的附加信息,使用 JdbcIO.readAll
。因此,如果第一个 PCollection 中没有数据,第二个 PCollection 中就不可能有数据。
固定 window 为 10 秒,基于事件的触发器如下所示;
Repeatedly.forever(
AfterWatermark.pastEndOfWindow().withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(40))
).withLateFirings(AfterPane.elementCountAtLeast(1))
);
我看到的问题是,当我使用 Drain
模式停止管道时,它似乎在没有任何消息进入输入时为第二个 PCollection 随机生成元素 Pub/Sub话题。当管道也是 运行 但不一致时,这也会随机发生,但是在排空管道时我能够始终如一地重现这一点。
请在下面找到输入与输出的变化;
您正在使用 non-deterministic 触发,这意味着输出对事件进入的确切顺序很敏感。另一种看待这个问题的方式是 CoGBK 不等待双方进入;一旦任何一方进来,触发器就会开始滴答作响。
例如,让我们分别调用您的 PCollections A 和 A',并假设它们每个都有两个元素 a1, a2, a1' 和 a2'(共同出处)。
假设a1和a1'进入CoGBK,经过39秒,然后a2 进入(在同一个键上),再过 2 秒,然后 a2' 进入。CoGBK 将输出 ([a1, a2], [a1']) 当达到 40 秒标记时,然后当 window 关闭时 ([], [a2']) 将被发射。 (即使一切都在同一个键上,如果通过较长路径的墙时间延迟超过 40 秒,这种情况偶尔也会发生,并且对于任何延迟数据几乎肯定会发生(每一方都会单独触发)。
排水使事情变得更糟,例如我认为所有处理时间都会立即触发。
我在 Beam 中有一个管道,它使用 CoGroupByKey
组合 2 个 PCollection,第一个从 Pub/Sub 订阅读取,第二个使用相同的 PCollection,但通过查找来丰富数据来自 table 的附加信息,使用 JdbcIO.readAll
。因此,如果第一个 PCollection 中没有数据,第二个 PCollection 中就不可能有数据。
固定 window 为 10 秒,基于事件的触发器如下所示;
Repeatedly.forever(
AfterWatermark.pastEndOfWindow().withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(40))
).withLateFirings(AfterPane.elementCountAtLeast(1))
);
我看到的问题是,当我使用 Drain
模式停止管道时,它似乎在没有任何消息进入输入时为第二个 PCollection 随机生成元素 Pub/Sub话题。当管道也是 运行 但不一致时,这也会随机发生,但是在排空管道时我能够始终如一地重现这一点。
请在下面找到输入与输出的变化;
您正在使用 non-deterministic 触发,这意味着输出对事件进入的确切顺序很敏感。另一种看待这个问题的方式是 CoGBK 不等待双方进入;一旦任何一方进来,触发器就会开始滴答作响。
例如,让我们分别调用您的 PCollections A 和 A',并假设它们每个都有两个元素 a1, a2, a1' 和 a2'(共同出处)。
假设a1和a1'进入CoGBK,经过39秒,然后a2 进入(在同一个键上),再过 2 秒,然后 a2' 进入。CoGBK 将输出 ([a1, a2], [a1']) 当达到 40 秒标记时,然后当 window 关闭时 ([], [a2']) 将被发射。 (即使一切都在同一个键上,如果通过较长路径的墙时间延迟超过 40 秒,这种情况偶尔也会发生,并且对于任何延迟数据几乎肯定会发生(每一方都会单独触发)。
排水使事情变得更糟,例如我认为所有处理时间都会立即触发。