从数据流管道选项中获取独立的参数字符串

Get independent parameter string from Dataflow Pipeline Options

我正在尝试从 Airflow 触发 Google Dataflow 作业,需要帮助从 Airflow 发送一个字符串作为参数,该字符串可以在 Dataflow 中读取并用作独立字符串。

这是我的 DataflowTemplateOperator 的代码,它发送名为 secretCode 的参数:

        DataflowTemplateOperator(
            task_id=TASK_ID,
            job_name=JOB_NAME,
            template=TEMPLATE_PATH,
            parameters={
                "secretCode": "123456"
            },
            dag=dag
        )

我想将 PipelineOptions 中的 secretCode 读作 String 发送给以下 ParDo,但我不知道该怎么做。该代码与ParDoclass的输入输出没有任何关系。我只想将代码写入 BigQuery。

    val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
        "transform my data to table row",
        ParDo.of(DataToTableRow())
    )

我想将从 PipelineOptions 返回的密码写入 BigQuery,如下面的代码所示,但我不知道如何到达那里:

   class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() { 
      @ProcessElement
      fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
          outputReceiver.output(getTableRow(myData))
      }

      private fun getTableRow(myData: myCustomDataStructure): TableRow {
          return TableRow().set("ID", myData.id)
                           .set("SecretCode", secretCode)
      }
   }

对于如何解决此问题,我将不胜感激。提前致谢。

您需要创建自己的接口来扩展 PipelineOptions 并在此处设置参数。

  public interface SecretOptions extends PipelineOptions {
    String getSecretCode();
    void setSecretCode(String secretCode);
  }

然后,像这样在您的管道上注册您的接口:

PipelineOptionsFactory.register(SecretOptions.class);
SecretOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SecretOptions.class);

然后您就可以使用 options.getSecretCode();

在管道的任何位置访问您的参数

有关 documentation

的更多信息