触发后发送元素到下一个window under condition

Send elements to next window under condition after triggering

到目前为止,我一直在处理 Apache Beam 中的场景,其中给定特定的 HTTP 代码,我可能会保留要在下一次迭代中重新启动的元素。

一直在用内部代码实现这个,只用了一个时间触发器。

        .apply(
            "Sample Window",
            Window.into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
        )

我正在硬编码我的逻辑来处理比方说 200 个事件的请求。并且还在内存中存储这些事件以防请求失败。

但是,查看我看到的文档组合触发...

  Repeatedly.forever(AfterFirst.of(
     AfterPane.elementCountAtLeast(100),
     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))

我的情况也是如此。

        .apply(
            "Sample Window",
            Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(200),
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1))
                    )
                )
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
        )

所以现在一直在想...

如果在 1 分钟时间范围内由元素数量触发,这些事件会发生什么情况?是否再次加工?我应该从 Window 中手动删除它们吗?

我也在谈论 200 个元素失败的情况。我怎样才能让他们在 window 中占上风?

在您的触发器中,您正在设置 .discardingFiredPanes()

This will "discards elements in a pane after they are triggered."

任何后续窗格都不会包含已经输出的元素。