How do I output a single value from a DoFn and use that as a parameter in another DoFn?

I have a Pub/Sub subscription that delivers newline-delimited JSON. Each Pub/Sub message has an attribute whose value is the name of the BigQuery table to write to.

How do I get each individual table name value and pass it to a new pipeline?

Is it possible to create a new PCollection and apply it... from within the DoFn itself?

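You can apply a transform that retrieves the table name in a DoFn and passes KV<tableName, record> downstream, then use the dynamic destinations support in BigQueryIO to route each record to the correct destination. Alternatively, you can read the table attribute directly inside the BigQueryIO write itself: the function passed to to(..) picks the destination and withFormatFunction() converts the payload. Below is an example that does this.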

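This is the whole pipeline structure: JSON messages are consumed from Pub/Sub and then routed to the correct table destination based on a Pub/Sub message attribute. Similarly, you could change the getTableDestination(..) logic to retrieve the table name from the JSON record instead.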

You can view the whole example here

  /**
   * Builds and starts the pipeline with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(Options options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Retrieve non-serializable parameters
    String tableNameAttr = options.getTableNameAttr();
    String outputTableProject = options.getOutputTableProject();
    String outputTableDataset = options.getOutputTableDataset();

    // Build & execute pipeline
    pipeline
        .apply(
            "ReadMessages",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
        .apply(
            "WriteToBigQuery",
            BigQueryIO.<PubsubMessage>write()
                .to(
                    input ->
                        getTableDestination(
                            input,
                            tableNameAttr,
                            outputTableProject,
                            outputTableDataset))
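                .withFormatFunction(
                    // Decode the payload explicitly as UTF-8 rather than the platform default
                    // charset (requires java.nio.charset.StandardCharsets).
                    (PubsubMessage msg) ->
                        convertJsonToTableRow(
                            new String(msg.getPayload(), StandardCharsets.UTF_8)))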
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    return pipeline.run();
  }

  /**
   * Retrieves the {@link TableDestination} for the {@link PubsubMessage} by extracting and
   * formatting the value of the {@code tableNameAttr} attribute. If the message is null, a {@link
   * RuntimeException} will be thrown because the message is unable to be routed.
   *
   * @param value The message to extract the table name from.
   * @param tableNameAttr The name of the attribute within the message which contains the table
   *     name.
   * @param outputProject The project in which the table resides.
   * @param outputDataset The dataset in which the table resides.
   * @return The destination to route the input message to.
   */
  @VisibleForTesting
  static TableDestination getTableDestination(
      ValueInSingleWindow<PubsubMessage> value,
      String tableNameAttr,
      String outputProject,
      String outputDataset) {
    PubsubMessage message = value.getValue();

    TableDestination destination;
    if (message != null) {
      destination =
          new TableDestination(
              String.format(
                  "%s:%s.%s",
                  outputProject, outputDataset, message.getAttributeMap().get(tableNameAttr)),
              null);
    } else {
      throw new RuntimeException(
          "Cannot retrieve the dynamic table destination of a null message!");
    }

    return destination;
  }
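
One thing the method above does not cover is a message that arrives without the table name attribute: String.format would then produce a spec ending in ".null". The following is a minimal sketch, in plain Java with no Beam dependency, of building the "project:dataset.table" string with that check added. TableSpecs and buildTableSpec are hypothetical names introduced here for illustration; they are not part of the original example.

```java
import java.util.Map;

/** Hypothetical helper (an assumption for illustration, not part of the original example). */
final class TableSpecs {
  private TableSpecs() {}

  /**
   * Builds a "project:dataset.table" spec from message attributes, failing fast when the
   * table name attribute is missing or blank instead of routing to a table named "null".
   */
  static String buildTableSpec(
      Map<String, String> attributes, String tableNameAttr, String project, String dataset) {
    // The attribute map itself may be absent, so guard before looking up the key.
    String tableName = attributes == null ? null : attributes.get(tableNameAttr);
    if (tableName == null || tableName.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "Message has no usable '" + tableNameAttr + "' attribute; cannot route it.");
    }
    return String.format("%s:%s.%s", project, dataset, tableName.trim());
  }
}
```

Inside getTableDestination you could then pass message.getAttributeMap() to this helper and wrap the result as new TableDestination(spec, null). Whether to throw or to divert such messages to a fallback table is a design choice that depends on how you want unroutable messages handled.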