Apache Beam GenerateSequence 不以指定速率发射元素
Apache Beam GenerateSequence does not emit elements at specified rates
我正在试验 Apache Beam,并尝试使用 GenerateSequence PTransform 作为生成无界流数据源的简单方法。
GenerateSequence class 提供方法 withRate(long numElements, Duration periodLength)
,根据我的理解,它控制每个周期产生的元素的速率以及周期的持续时间。令我惊讶的是,元素的生成速度与提供的描述不一致。
例如,我尝试使用以下代码片段:
Pipeline p = Pipeline.create(pipelineOptions);
Duration runtimeDuration = Duration.standardSeconds(20L);
Duration periodDuration = Duration.standardSeconds(1L);
PCollection<String> generated_seq = p.apply("Get Sequence",
GenerateSequence.from(1)
.withMaxReadTime(runtimeDuration)
.withRate(1, periodDuration))
.apply("Test sequence generation", ParDo.of(new DoFn<Long,String>() {
@ProcessElement
public void processElement(@Element Long in, OutputReceiver<String> out){
long userId = in % 5; //simulate events from 5 users
Instant timestamp = Instant.now();
DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH-mm-ss.SSSzZ").withZone(DateTimeZone.forID("Etc/GMT"));
System.out.println(in + " => UserId:" + userId + "|Timestamp: " + timestamp.toString(fmt));
out.outputWithTimestamp(Long.toString(userId), timestamp);
}
}));
流水线生成了结果序列:
5 => UserId:0|Timestamp: 2021-03-08 21-20-25.361GMT+0000
14 => UserId:4|Timestamp: 2021-03-08 21-20-25.446GMT+0000
6 => UserId:1|Timestamp: 2021-03-08 21-20-25.450GMT+0000
12 => UserId:2|Timestamp: 2021-03-08 21-20-25.452GMT+0000
7 => UserId:2|Timestamp: 2021-03-08 21-20-25.456GMT+0000
9 => UserId:4|Timestamp: 2021-03-08 21-20-25.459GMT+0000
13 => UserId:3|Timestamp: 2021-03-08 21-20-25.461GMT+0000
1 => UserId:1|Timestamp: 2021-03-08 21-20-25.463GMT+0000
2 => UserId:2|Timestamp: 2021-03-08 21-20-25.465GMT+0000
16 => UserId:1|Timestamp: 2021-03-08 21-20-25.468GMT+0000
10 => UserId:0|Timestamp: 2021-03-08 21-20-25.469GMT+0000
8 => UserId:3|Timestamp: 2021-03-08 21-20-25.471GMT+0000
15 => UserId:0|Timestamp: 2021-03-08 21-20-25.474GMT+0000
4 => UserId:4|Timestamp: 2021-03-08 21-20-25.476GMT+0000
17 => UserId:2|Timestamp: 2021-03-08 21-20-25.478GMT+0000
11 => UserId:1|Timestamp: 2021-03-08 21-20-25.488GMT+0000
3 => UserId:3|Timestamp: 2021-03-08 21-20-37.613GMT+0000
如上所述,尽管指定了 withRate(1, periodDuration)
,但大多数元素都是在 同一秒 内生成的;即指定1秒内最多生成1个元素
我试图深入研究 SDK 代码以了解并希望解决此行为的原因,但我无法确定其原因。因此,有没有办法解决这个问题,或者是否有任何类似的 PTransforms 可以模拟无界的流媒体源?
原因
这可能是由 GenerateSequence
转换中的一个怪癖引起的,该文档并未真正解释。具体来说,用于生成数字 (CountingSource
) 的底层源的工作方式是,如果它用完了要发出的元素,则在再次检查源之前会稍等片刻。如果这个等待时间大于period duration,那么下次查source的时候可能会有多个元素排队,source会快速通过它们。
因此,在您的示例中,可能发生的情况是源启动了,但由于一秒钟的时间尚未过去,因此尚未发出任何元素。几秒钟后再次检查,此时它会迅速发出所有应该在该时间段内发出的元素,直到用完然后再次等待。这可以从示例中的最后一个元素看出,它是在前一个元素之后 12 秒发出的。延长运行时持续时间是一个很好的方式来观察它的实际效果;您可能会看到发射了多批元素。
时间戳
如果您所需要的只是定期生成的原始数字,则上述行为非常有效。但是,如果您使用 GenerateSequence
来测试依赖于时间戳的管道,您将希望在源代码中设置自定义 TimestampFn to set the timestamps for each emitted element. The default TimestampFn 可能是一个很好的使用示例。一个可能对您有用的非常简单的方法是设置时间戳以匹配元素的值。
我正在试验 Apache Beam,并尝试使用 GenerateSequence PTransform 作为生成无界流数据源的简单方法。
GenerateSequence class 提供方法 withRate(long numElements, Duration periodLength)
,根据我的理解,它控制每个周期产生的元素的速率以及周期的持续时间。令我惊讶的是,元素的生成速度与提供的描述不一致。
例如,我尝试使用以下代码片段:
Pipeline p = Pipeline.create(pipelineOptions);
Duration runtimeDuration = Duration.standardSeconds(20L);
Duration periodDuration = Duration.standardSeconds(1L);
PCollection<String> generated_seq = p.apply("Get Sequence",
GenerateSequence.from(1)
.withMaxReadTime(runtimeDuration)
.withRate(1, periodDuration))
.apply("Test sequence generation", ParDo.of(new DoFn<Long,String>() {
@ProcessElement
public void processElement(@Element Long in, OutputReceiver<String> out){
long userId = in % 5; //simulate events from 5 users
Instant timestamp = Instant.now();
DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH-mm-ss.SSSzZ").withZone(DateTimeZone.forID("Etc/GMT"));
System.out.println(in + " => UserId:" + userId + "|Timestamp: " + timestamp.toString(fmt));
out.outputWithTimestamp(Long.toString(userId), timestamp);
}
}));
流水线生成了结果序列:
5 => UserId:0|Timestamp: 2021-03-08 21-20-25.361GMT+0000
14 => UserId:4|Timestamp: 2021-03-08 21-20-25.446GMT+0000
6 => UserId:1|Timestamp: 2021-03-08 21-20-25.450GMT+0000
12 => UserId:2|Timestamp: 2021-03-08 21-20-25.452GMT+0000
7 => UserId:2|Timestamp: 2021-03-08 21-20-25.456GMT+0000
9 => UserId:4|Timestamp: 2021-03-08 21-20-25.459GMT+0000
13 => UserId:3|Timestamp: 2021-03-08 21-20-25.461GMT+0000
1 => UserId:1|Timestamp: 2021-03-08 21-20-25.463GMT+0000
2 => UserId:2|Timestamp: 2021-03-08 21-20-25.465GMT+0000
16 => UserId:1|Timestamp: 2021-03-08 21-20-25.468GMT+0000
10 => UserId:0|Timestamp: 2021-03-08 21-20-25.469GMT+0000
8 => UserId:3|Timestamp: 2021-03-08 21-20-25.471GMT+0000
15 => UserId:0|Timestamp: 2021-03-08 21-20-25.474GMT+0000
4 => UserId:4|Timestamp: 2021-03-08 21-20-25.476GMT+0000
17 => UserId:2|Timestamp: 2021-03-08 21-20-25.478GMT+0000
11 => UserId:1|Timestamp: 2021-03-08 21-20-25.488GMT+0000
3 => UserId:3|Timestamp: 2021-03-08 21-20-37.613GMT+0000
如上所述,尽管指定了 withRate(1, periodDuration)
,但大多数元素都是在 同一秒 内生成的;即指定1秒内最多生成1个元素
我试图深入研究 SDK 代码以了解并希望解决此行为的原因,但我无法确定其原因。因此,有没有办法解决这个问题,或者是否有任何类似的 PTransforms 可以模拟无界的流媒体源?
原因
这可能是由 GenerateSequence
转换中的一个怪癖引起的,该文档并未真正解释。具体来说,用于生成数字 (CountingSource
) 的底层源的工作方式是,如果它用完了要发出的元素,则在再次检查源之前会稍等片刻。如果这个等待时间大于period duration,那么下次查source的时候可能会有多个元素排队,source会快速通过它们。
因此,在您的示例中,可能发生的情况是源启动了,但由于一秒钟的时间尚未过去,因此尚未发出任何元素。几秒钟后再次检查,此时它会迅速发出所有应该在该时间段内发出的元素,直到用完然后再次等待。这可以从示例中的最后一个元素看出,它是在前一个元素之后 12 秒发出的。延长运行时持续时间是一个很好的方式来观察它的实际效果;您可能会看到发射了多批元素。
时间戳
如果您所需要的只是定期生成的原始数字,则上述行为非常有效。但是,如果您使用 GenerateSequence
来测试依赖于时间戳的管道,您将希望在源代码中设置自定义 TimestampFn to set the timestamps for each emitted element. The default TimestampFn 可能是一个很好的使用示例。一个可能对您有用的非常简单的方法是设置时间戳以匹配元素的值。