Apache Beam GenerateSequence 不以指定速率发射元素

Apache Beam GenerateSequence does not emit elements at specified rates

我正在试验 Apache Beam,并尝试使用 GenerateSequence PTransform 作为生成无界流数据源的简单方法。

GenerateSequence class 提供方法 withRate(long numElements, Duration periodLength),根据我的理解,它控制每个周期产生的元素的速率以及周期的持续时间。令我惊讶的是,元素的生成速度与提供的描述不一致。

例如,我尝试使用以下代码片段:

Pipeline p = Pipeline.create(pipelineOptions);
Duration runtimeDuration = Duration.standardSeconds(20L);
Duration periodDuration = Duration.standardSeconds(1L);
PCollection<String> generated_seq = p.apply("Get Sequence",
        GenerateSequence.from(1)
                .withMaxReadTime(runtimeDuration)
                .withRate(1, periodDuration))
        .apply("Test sequence generation", ParDo.of(new DoFn<Long,String>() {
            @ProcessElement
            public void processElement(@Element Long in, OutputReceiver<String> out){
                long userId = in % 5; //simulate events from 5 users
                Instant timestamp = Instant.now();
                DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH-mm-ss.SSSzZ").withZone(DateTimeZone.forID("Etc/GMT"));
                System.out.println(in + " => UserId:" + userId + "|Timestamp: " + timestamp.toString(fmt));
                out.outputWithTimestamp(Long.toString(userId), timestamp);
            }
        }));

流水线生成了结果序列:

5 => UserId:0|Timestamp: 2021-03-08 21-20-25.361GMT+0000
14 => UserId:4|Timestamp: 2021-03-08 21-20-25.446GMT+0000
6 => UserId:1|Timestamp: 2021-03-08 21-20-25.450GMT+0000
12 => UserId:2|Timestamp: 2021-03-08 21-20-25.452GMT+0000
7 => UserId:2|Timestamp: 2021-03-08 21-20-25.456GMT+0000
9 => UserId:4|Timestamp: 2021-03-08 21-20-25.459GMT+0000
13 => UserId:3|Timestamp: 2021-03-08 21-20-25.461GMT+0000
1 => UserId:1|Timestamp: 2021-03-08 21-20-25.463GMT+0000
2 => UserId:2|Timestamp: 2021-03-08 21-20-25.465GMT+0000
16 => UserId:1|Timestamp: 2021-03-08 21-20-25.468GMT+0000
10 => UserId:0|Timestamp: 2021-03-08 21-20-25.469GMT+0000
8 => UserId:3|Timestamp: 2021-03-08 21-20-25.471GMT+0000
15 => UserId:0|Timestamp: 2021-03-08 21-20-25.474GMT+0000
4 => UserId:4|Timestamp: 2021-03-08 21-20-25.476GMT+0000
17 => UserId:2|Timestamp: 2021-03-08 21-20-25.478GMT+0000
11 => UserId:1|Timestamp: 2021-03-08 21-20-25.488GMT+0000
3 => UserId:3|Timestamp: 2021-03-08 21-20-37.613GMT+0000

如上所述,尽管指定了 withRate(1, periodDuration),但大多数元素都是在 同一秒 内生成的;即指定1秒内最多生成1个元素

我试图深入研究 SDK 代码以了解并希望解决此行为的原因,但我无法确定其原因。因此,有没有办法解决这个问题,或者是否有任何类似的 PTransforms 可以模拟无界的流媒体源?

原因

这可能是由 GenerateSequence 转换中的一个怪癖引起的,该文档并未真正解释。具体来说,用于生成数字 (CountingSource) 的底层源的工作方式是,如果它用完了要发出的元素,则在再次检查源之前会稍等片刻。如果这个等待时间大于period duration,那么下次查source的时候可能会有多个元素排队,source会快速通过它们。

因此,在您的示例中,可能发生的情况是源启动了,但由于一秒钟的时间尚未过去,因此尚未发出任何元素。几秒钟后再次检查,此时它会迅速发出所有应该在该时间段内发出的元素,直到用完然后再次等待。这可以从示例中的最后一个元素看出,它是在前一个元素之后 12 秒发出的。延长运行时持续时间是一个很好的方式来观察它的实际效果;您可能会看到发射了多批元素。

时间戳

如果您所需要的只是定期生成的原始数字,则上述行为非常有效。但是,如果您使用 GenerateSequence 来测试依赖于时间戳的管道,您将希望在源代码中设置自定义 TimestampFn to set the timestamps for each emitted element. The default TimestampFn 可能是一个很好的使用示例。一个可能对您有用的非常简单的方法是设置时间戳以匹配元素的值。