如何在 Beam 管道中为会话 windows 编写单元测试?

How to write unit tests for session windows in a Beam pipeline?

我正在编写一个处理产品事件(创建、更新、删除)的管道。每个产品都属于具有特定持续时间的销售。我希望能够对给定销售中的所有产品执行一些聚合。出于本示例的目的,假设我只需要每次销售的唯一产品 ID 列表。

因此,我的管道在销售 ID 上使用会话 windows,间隔时间很长(因此,当销售结束并且没有发布更多产品更新时,window那次销售也结束了)。我的问题是,如何为此编写单元测试?

为了本次测试,我们假设如下:

这是我目前的情况:

我创建了一个 TestStream 并添加了一些元素:sale1 的 3 个产品。接下来,我将水印提高到 700,远远超过间隙持续时间。添加另一个产品,最后将水印推进到无穷大。

@Test
public void TestSessionWindow() {
    Coder<String> utfCoder = StringUtf8Coder.of();
    TestStream<String> onTimeProducts = 
TestStream.create(utfCoder).addElements(
            TimestampedValue.of("sale1 product1", new Instant(0)),
            TimestampedValue.of("sale1 product2", new Instant(0)),
            TimestampedValue.of("sale1 product3", new Instant(0))
    )
            .advanceWatermarkTo(new Instant(700)) // watermark passes trigger time
    .addElements(
            TimestampedValue.of("campaign1 product9", new Instant(710))
    )
    .advanceWatermarkToInfinity();

    PCollection<KV<String, String>> results = applyDistinctProductsTransform(pipeline, onTimeProducts);

    PAssert.that(results).containsInAnyOrder(
            KV.of("sale1", "product1,product2,product3"),
            KV.of("sale1", "product9")
    );
    pipeline.run().waitUntilFinish();
}

然而,

  1. 管道输出 sale1product1,product2,product3,product9 的 KV,因此 product9 附加到 window。我希望这个产品在单独的 window 中处理,因此最终在输出 PCollection 中的不同行中。
  2. 如何才能在 PAssert 中只得到单个 window 的结果?我知道有 inWindow 函数,我找到了 fixed time window 的示例,但我不知道如何为会话 window.
  3. 做同样的事情

您可以查看 PTransform and the unit test 的完整代码。

1) 我相信您遇到的是简单的单位问题。 window 600 的间隔持续时间以秒为单位指定 Duration.standardSeconds yet new Instant(long) 使用毫秒,这意味着 600 秒的间隔大于导致会话合并的 700 毫秒的时间间隔。

2) 会话在内部仍然使用间隔 windows。您将需要根据您的触发策略计算所有会话合并后的输出 window。默认情况下,会话 window 使用 IntervalWindow(timestamp, gap duration), and merges all overlapping windows 创建更大的 window。例如,如果您有 windows(开始时间、结束时间)、[10, 14]、[12, 18]、[4, 14] 相同的会话密钥,它们将全部合并生成单曲 [4, 18] window.