如何计算每个 window 的元素
How to count elements per window
我正在尝试解决看似简单的问题 -- 计算每个 window 的 PCollection 中有多少个元素。我需要它在写入时传递给 .withSharding() 函数,以创建与要写入的文件一样多的分片。
我试过:
FileIO.writeDynamic<Long, E>()
.withDestinationCoder(AvroCoder.of(Long::class.java))
.by { e -> e.key }
.via(Contextful.fn(MySerFunction()))
.withNaming({ key -> MyFileNaming() })
.withSharding(ShardingFn())
.to("gs://some-output")
class ShardingFn : PTransform<PCollection<E>>, PCollectionView<Int>>() {
override fun expand(input: PCollection<E>): PCollectionView<Int> {
val keys: PCollection<Long> = input.apply(Keys.create())
// This only works with GlobalWindowing, how to count per window?
val count: PCollection<Long> = keys.apply(Count.globally())
val int: PCollection<Int> = count.apply(MapElements.via(Long2Int))
return int.apply(View.asSingleton())
}
但是,这仅在我具有全局 windowing(又名 "batch mode")时有效,否则 Count.globally() 将抛出异常。
也许我写错了,但如果出于其他原因我想按 window 计算元素,该怎么做?
要计算每个 window 的数据,您必须使用时间戳(如果数据中有 none,则添加时间戳)然后计算它们。我建议查看此 example,因为它详细说明了如何操作。
使用 Combine.globally(Count.<T>combineFn()).withoutDefaults()
而不是 Count.globally()
应该适用于您的情况。这也可以在 Javadoc 中找到:https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/Count.html#globally--
我正在尝试解决看似简单的问题 -- 计算每个 window 的 PCollection 中有多少个元素。我需要它在写入时传递给 .withSharding() 函数,以创建与要写入的文件一样多的分片。
我试过:
FileIO.writeDynamic<Long, E>()
.withDestinationCoder(AvroCoder.of(Long::class.java))
.by { e -> e.key }
.via(Contextful.fn(MySerFunction()))
.withNaming({ key -> MyFileNaming() })
.withSharding(ShardingFn())
.to("gs://some-output")
class ShardingFn : PTransform<PCollection<E>>, PCollectionView<Int>>() {
override fun expand(input: PCollection<E>): PCollectionView<Int> {
val keys: PCollection<Long> = input.apply(Keys.create())
// This only works with GlobalWindowing, how to count per window?
val count: PCollection<Long> = keys.apply(Count.globally())
val int: PCollection<Int> = count.apply(MapElements.via(Long2Int))
return int.apply(View.asSingleton())
}
但是,这仅在我具有全局 windowing(又名 "batch mode")时有效,否则 Count.globally() 将抛出异常。
也许我写错了,但如果出于其他原因我想按 window 计算元素,该怎么做?
要计算每个 window 的数据,您必须使用时间戳(如果数据中有 none,则添加时间戳)然后计算它们。我建议查看此 example,因为它详细说明了如何操作。
使用 Combine.globally(Count.<T>combineFn()).withoutDefaults()
而不是 Count.globally()
应该适用于您的情况。这也可以在 Javadoc 中找到:https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/Count.html#globally--