光束侧输入模式逻辑说明

Beam Side Input Pattern Logic Explaination

在官方的Beam编程指南中,介绍了当我们有侧输入时应该使用的模式。我很好奇为什么模式是这样的。请参阅下面的标准代码作为示例。

查看此处的代码片段:

      ParDo.of(
          new DoFn<Long, KV<Long, Long>>() {

            @ProcessElement
            public void process(ProcessContext c) {
              Map<String, String> keyMap = c.sideInput(map);
              c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

              LOG.debug(
                  "Value is {}, key A is {}, and key B is {}.",
                  c.element(),
                  keyMap.get("Key_A"),
                  keyMap.get("Key_B"));
            }
          })
      .withSideInputs(map))

Beam 在代码末尾有 .withSideInputs(map) 表示此代码块中有侧输入。在块的中间,我们有 Map<String, String> keyMap = c.sideInput(map); 对我来说这很奇怪,因为我们在最后让代码知道我们有一个侧输入,但我们在块的中间使用它。这是为什么?为什么要在末尾标明,中间却可以用?

附件是 Beam 提供的使用此模式的完整示例。

public static void sideInputPatterns() {
  // This pipeline uses View.asSingleton for a placeholder external service.
  // Run in debug mode to see the output.
  Pipeline p = Pipeline.create();

  // Create a side input that updates each second.
  PCollectionView<Map<String, String>> map =
      p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
          .apply(
              Window.<Long>into(new GlobalWindows())
                  .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                  .discardingFiredPanes())
          .apply(
              ParDo.of(
                  new DoFn<Long, Map<String, String>>() {

                    @ProcessElement
                    public void process(
                        @Element Long input, OutputReceiver<Map<String, String>> o) {
                      // Replace map with test data from the placeholder external service.
                      // Add external reads here.
                      o.output(PlaceholderExternalService.readTestData());
                    }
                  }))
          .apply(View.asSingleton());

  // Consume side input. GenerateSequence generates test data.
  // Use a real source (like PubSubIO or KafkaIO) in production.
  p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
      .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
      .apply(Sum.longsGlobally().withoutDefaults())
      .apply(
          ParDo.of(
                  new DoFn<Long, KV<Long, Long>>() {

                    @ProcessElement
                    public void process(ProcessContext c) {
                      Map<String, String> keyMap = c.sideInput(map);
                      c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                      LOG.debug(
                          "Value is {}, key A is {}, and key B is {}.",
                          c.element(),
                          keyMap.get("Key_A"),
                          keyMap.get("Key_B"));
                    }
                  })
              .withSideInputs(map));
}

/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {

  public static Map<String, String> readTestData() {

    Map<String, String> map = new HashMap<>();
    Instant now = Instant.now();

    DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");

    map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
    map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

    return map;
  }
}

Apache Beam 模型是一种有趣的混合 declarative/procedural 编程。当与 SDK 交互时,您正在幕后构建一个转换的有向无环图。有一些特殊的转换 (ParDos) 允许程序员添加无法使用 Apache Beam 原语捕获的特殊逻辑。

就 SideInputs 而言,图中所声明的有点像“Promise”或“Future”。您声明此 ParDo 将在将来 运行 时收到一个您可以使用的 SideInput。这就是为什么你可以“在中间使用它”,尽管它是在 ParDo 的末尾“声明”的;该代码仅在图中定义一个节点,但不执行该图本身。

如果您想查看 SideInputs 的技术规范,可以阅读 design document