Extract Aggregator values in Batch Execution

Is there any way to programmatically extract the final values of aggregators after a Dataflow batch execution?

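Based on the DirectPipelineRunner class, I wrote the method below. It seems to work, but for dynamically created counters it returns values that differ from the ones shown in the console output.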

PS. In case it helps, I'm assuming the aggregators hold Long values and use a sum combine function.

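// Imports assume the Dataflow Java SDK 1.x package layout.
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Collects the total value of every aggregator registered in the pipeline,
// keyed by aggregator name.
@SuppressWarnings({"rawtypes", "unchecked"})
public static Map<String, Object> extractAllCounters(Pipeline p, PipelineResult pr)
{
    AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(p);
    Map<String, Object> results = new HashMap<>();

    for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> e :
            aggregatorExtractor.getAggregatorSteps().entrySet()) {
        Aggregator agg = e.getKey();
        try {
            // Combine the per-step values into a single total using the aggregator's own CombineFn.
            results.put(agg.getName(), pr.getAggregatorValues(agg).getTotalValue(agg.getCombineFn()));
        } catch(AggregatorRetrievalException|IllegalArgumentException aggEx) {
            // Skip aggregators whose values the runner cannot report.
            //System.err.println("Can't extract " + agg.getName() + ": " + aggEx.getMessage());
        }
    }

    return results;
}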

The aggregator values should be available from the PipelineResult. For example:

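CountOddsFn countOdds = new CountOddsFn();
pipeline
  .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
  .apply(ParDo.of(countOdds));
PipelineResult result = pipeline.run();
// With the DataflowPipelineRunner, run() returns as soon as the job is submitted.
// You may need the BlockingDataflowPipelineRunner, whose run() waits for the job
// to finish, so that the aggregator values are final.

// getAggregatorValues can throw the checked AggregatorRetrievalException.
AggregatorValues<Integer> values =
    result.getAggregatorValues(countOdds.aggregator);
// Maps each step name to the aggregator's value at that step.
Map<String, Integer> valuesAtSteps = values.getValuesAtSteps();
// Now read the values from the step...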
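Under the question's assumption that the aggregators hold Long values combined by summation, the total of an aggregator is just the sum of its per-step values. The plain-Java sketch below mimics that reduction over a `Map<String, Long>` shaped like the result of `getValuesAtSteps()`; the class name `StepTotals`, the helper `totalOfSteps`, and the step names are hypothetical, and the real `getTotalValue` applies whatever CombineFn you pass it rather than a hard-coded sum.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StepTotals {
    // Hypothetical helper: sums per-step values, which is what a sum
    // combine function over Longs reduces them to.
    static long totalOfSteps(Map<String, Long> valuesAtSteps) {
        long total = 0L;
        for (long v : valuesAtSteps.values()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical step names and values, for illustration only.
        Map<String, Long> valuesAtSteps = new LinkedHashMap<>();
        valuesAtSteps.put("ParDo(CountOdds)", 4L);
        valuesAtSteps.put("ParDo(CountOdds)2", 3L);
        System.out.println(totalOfSteps(valuesAtSteps)); // prints 7
    }
}
```

If the same aggregator is attached to several steps, reading a single entry of the map gives only that step's share, while the summed total covers all of them.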

Example DoFn that reports to an aggregator:

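private static class CountOddsFn extends DoFn<Integer, Void> {

  // Sum-based aggregator named "odds" (SumIntegerFn is Sum.SumIntegerFn).
  Aggregator<Integer, Integer> aggregator =
    createAggregator("odds", new SumIntegerFn());

  @Override
  public void processElement(ProcessContext c) throws Exception {
    // Add 1 for every odd element; "!= 0" also matches negative odd numbers.
    if (c.element() % 2 != 0) {
      aggregator.addValue(1);
    }
  }
}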
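To know what value to expect from the "odds" aggregator, it can help to compute it outside the pipeline. This is a plain-Java stand-in for `CountOddsFn` (the class `CountOddsCheck` and method `countOdds` are hypothetical names, not part of the SDK): it adds 1 per odd element, using `x % 2 != 0` so negative odd inputs also count. For the input passed to `Create.of` above, only 1, 3, 5 and 7 are odd, so the aggregator's total should be 4.

```java
import java.util.Arrays;
import java.util.List;

public class CountOddsCheck {
    // Hypothetical stand-in for CountOddsFn: adds 1 for each odd element,
    // the same amount the DoFn passes to aggregator.addValue.
    static int countOdds(List<Integer> input) {
        int odds = 0;
        for (int x : input) {
            if (x % 2 != 0) {
                odds += 1;
            }
        }
        return odds;
    }

    public static void main(String[] args) {
        List<Integer> input =
            Arrays.asList(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100);
        System.out.println(countOdds(input)); // prints 4
    }
}
```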