在输出 window 之前,如何将转换应用于无界 Apache Beam 管道的 window 中的所有元素?

How do you apply transformations to all elements in a window of an unbounded Apache Beam pipeline before outputting the window?

我正在编写一个数据流管道,它将从 Google Pub/Sub 读取并将数据写入 Google 云存储:

    pipeline.apply(marketData)
        .apply(ParDo.of(new PubsubMessageToByteArray()))
        .apply(ParDo.of(new ByteArrayToString()))
        .apply(ParDo.of(new StringToMarketData()))
        .apply(ParDo.of(new AddTimestamps()))
        .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
                .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
                .accumulatingFiredPanes())
        .apply(ParDo.of(new MarketDataToCsv()))
        .apply("Write File(s)", TextIO
                .write()
                .to(options.getOutputDirectory())
                .withWindowedWrites()
                .withNumShards(1)
                .withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
                .withHeader(csvHeader));

    pipeline.run().waitUntilFinish();

我想在输出结果之前对window中的元素进行去重和排序。这与典型的 PTransform 不同,因为我希望在 window 结束后执行转换。

Pub/Sub 主题将有重复,因为多个工作人员正在生成相同的消息以防一个工作人员失败。如何在写入之前删除 window 中的所有重复项?我看到 Beam 版本 0.2 中存在 RemoveDuplicates class,但当前版本中不存在。

我知道在幕后,Beam 并行化了 worker 中的 PTransform。但是由于这个管道写入withNumShards(1),所以只有一个worker会写入最终结果。这意味着理论上,应该可以让该工作人员在写入之前应用重复数据删除转换。

Beam python sdk still has a RemoveDuplicates method,所以我可以在 Java 中重现该逻辑,但为什么它会被删除,除非有更好的方法?我想实现将是在某些 window 触发器之后执行的重复数据删除 ParDo。

编辑:GroupByKey and SortValues 看起来他们会做我需要的。我现在正在尝试使用它们。

重复数据删除部分的答案如下:

.apply(Distinct
 // MarketData::key produces a String. Use withRepresentativeValue() 
 // because Apache beam deserializes Java objects into bytes, which 
 // could cause two equal objects to be interpreted as not equal. See 
 // org/apache/beam/sdk/transforms/Distinct.java for details. 
 .withRepresentativeValueFn(MarketData::key)
 .withRepresentativeType(TypeDescriptor.of(String.class)))

下面是对元素进行排序和删除重复项的解决方案(以防还需要排序):

public static class DedupAndSortByTime extends 
        Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
    @Override
    public TreeSet<MarketData> createAccumulator() {
        return new TreeSet<>(Comparator
                .comparingLong(MarketData::getEventTime)
                .thenComparing(MarketData::getOrderbookType));
    }

    @Override
    public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
        accum.add(input);
        return accum;
    }

    @Override
    public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {

        TreeSet<MarketData> merged = createAccumulator();
        for (TreeSet<MarketData> accum : accums) {
            merged.addAll(accum);
        }
        return merged;
    }

    @Override
    public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
        return Lists.newArrayList(accum.iterator());
    }
}

所以更新的管道是

    // Pipeline
    pipeline.apply(marketData)
        .apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray()))
        .apply(ParDo.of(new MarketDataDoFns.ByteArrayToString()))
        .apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate()))
        .apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate()))
        .apply(ParDo.of(new MarketDataDoFns.AddTimestamps()))
        .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
                .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
                .accumulatingFiredPanes())
        .apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults())
        .apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv()))
        .apply("Write File(s)", TextIO
                .write()
                // This doesn't set the output directory as expected. 
                // "/output" gets stripped and I don't know why,
                // so "/output" has to be added to the directory path 
                // within the FilenamePolicy.
                .to(options.getOutputDirectory())
                .withWindowedWrites()
                .withNumShards(1)
                .withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory))
                .withHeader(csvHeader));

    pipeline.run().waitUntilFinish();