在输出 window 之前,如何将转换应用于无界 Apache Beam 管道的 window 中的所有元素?
How do you apply transformations to all elements in a window of an unbounded Apache Beam pipeline before outputting the window?
我正在编写一个数据流管道,它将从 Google Pub/Sub 读取并将数据写入 Google 云存储:
pipeline.apply(marketData)
.apply(ParDo.of(new PubsubMessageToByteArray()))
.apply(ParDo.of(new ByteArrayToString()))
.apply(ParDo.of(new StringToMarketData()))
.apply(ParDo.of(new AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(ParDo.of(new MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();
我想在输出结果之前对window中的元素进行去重和排序。这与典型的 PTransform 不同,因为我希望在 window 结束后执行转换。
Pub/Sub 主题将有重复,因为多个工作人员正在生成相同的消息以防一个工作人员失败。如何在写入之前删除 window 中的所有重复项?我看到 Beam 版本 0.2 中存在 RemoveDuplicates class,但当前版本中不存在。
我知道在幕后,Beam 并行化了 worker 中的 PTransform。但是由于这个管道写入withNumShards(1)
,所以只有一个worker会写入最终结果。这意味着理论上,应该可以让该工作人员在写入之前应用重复数据删除转换。
Beam python sdk still has a RemoveDuplicates method,所以我可以在 Java 中重现该逻辑,但为什么它会被删除,除非有更好的方法?我想实现将是在某些 window 触发器之后执行的重复数据删除 ParDo。
编辑:GroupByKey and SortValues 看起来他们会做我需要的。我现在正在尝试使用它们。
重复数据删除部分的答案如下:
.apply(Distinct
// MarketData::key produces a String. Use withRepresentativeValue()
// because Apache beam deserializes Java objects into bytes, which
// could cause two equal objects to be interpreted as not equal. See
// org/apache/beam/sdk/transforms/Distinct.java for details.
.withRepresentativeValueFn(MarketData::key)
.withRepresentativeType(TypeDescriptor.of(String.class)))
下面是对元素进行排序和删除重复项的解决方案(以防还需要排序):
public static class DedupAndSortByTime extends
Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
return new TreeSet<>(Comparator
.comparingLong(MarketData::getEventTime)
.thenComparing(MarketData::getOrderbookType));
}
@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
accum.add(input);
return accum;
}
@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {
TreeSet<MarketData> merged = createAccumulator();
for (TreeSet<MarketData> accum : accums) {
merged.addAll(accum);
}
return merged;
}
@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
return Lists.newArrayList(accum.iterator());
}
}
所以更新的管道是
// Pipeline
pipeline.apply(marketData)
.apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray()))
.apply(ParDo.of(new MarketDataDoFns.ByteArrayToString()))
.apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate()))
.apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate()))
.apply(ParDo.of(new MarketDataDoFns.AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults())
.apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
// This doesn't set the output directory as expected.
// "/output" gets stripped and I don't know why,
// so "/output" has to be added to the directory path
// within the FilenamePolicy.
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();
我正在编写一个数据流管道,它将从 Google Pub/Sub 读取并将数据写入 Google 云存储:
pipeline.apply(marketData)
.apply(ParDo.of(new PubsubMessageToByteArray()))
.apply(ParDo.of(new ByteArrayToString()))
.apply(ParDo.of(new StringToMarketData()))
.apply(ParDo.of(new AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(ParDo.of(new MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();
我想在输出结果之前对window中的元素进行去重和排序。这与典型的 PTransform 不同,因为我希望在 window 结束后执行转换。
Pub/Sub 主题将有重复,因为多个工作人员正在生成相同的消息以防一个工作人员失败。如何在写入之前删除 window 中的所有重复项?我看到 Beam 版本 0.2 中存在 RemoveDuplicates class,但当前版本中不存在。
我知道在幕后,Beam 并行化了 worker 中的 PTransform。但是由于这个管道写入withNumShards(1)
,所以只有一个worker会写入最终结果。这意味着理论上,应该可以让该工作人员在写入之前应用重复数据删除转换。
Beam python sdk still has a RemoveDuplicates method,所以我可以在 Java 中重现该逻辑,但为什么它会被删除,除非有更好的方法?我想实现将是在某些 window 触发器之后执行的重复数据删除 ParDo。
编辑:GroupByKey and SortValues 看起来他们会做我需要的。我现在正在尝试使用它们。
重复数据删除部分的答案如下:
.apply(Distinct
// MarketData::key produces a String. Use withRepresentativeValue()
// because Apache beam deserializes Java objects into bytes, which
// could cause two equal objects to be interpreted as not equal. See
// org/apache/beam/sdk/transforms/Distinct.java for details.
.withRepresentativeValueFn(MarketData::key)
.withRepresentativeType(TypeDescriptor.of(String.class)))
下面是对元素进行排序和删除重复项的解决方案(以防还需要排序):
public static class DedupAndSortByTime extends
Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
return new TreeSet<>(Comparator
.comparingLong(MarketData::getEventTime)
.thenComparing(MarketData::getOrderbookType));
}
@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
accum.add(input);
return accum;
}
@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {
TreeSet<MarketData> merged = createAccumulator();
for (TreeSet<MarketData> accum : accums) {
merged.addAll(accum);
}
return merged;
}
@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
return Lists.newArrayList(accum.iterator());
}
}
所以更新的管道是
// Pipeline
pipeline.apply(marketData)
.apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray()))
.apply(ParDo.of(new MarketDataDoFns.ByteArrayToString()))
.apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate()))
.apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate()))
.apply(ParDo.of(new MarketDataDoFns.AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults())
.apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
// This doesn't set the output directory as expected.
// "/output" gets stripped and I don't know why,
// so "/output" has to be added to the directory path
// within the FilenamePolicy.
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();