从数据流管道选项中获取独立的参数字符串
Get independent parameter string from Dataflow Pipeline Options
我正在尝试从 Airflow 触发 Google Dataflow 作业,需要帮助从 Airflow 发送一个字符串作为参数,该字符串可以在 Dataflow 中读取并用作独立字符串。
这是我的 DataflowTemplateOperator
的代码,它发送名为 secretCode
的参数:
DataflowTemplateOperator(
task_id=TASK_ID,
job_name=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
"secretCode": "123456"
},
dag=dag
)
我想将 PipelineOptions
中的 secretCode
读作 String
发送给以下 ParDo
,但我不知道该怎么做。该代码与ParDo
class的输入输出没有任何关系。我只想将代码写入 BigQuery。
val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
"transform my data to table row",
ParDo.of(DataToTableRow())
)
我想将从 PipelineOptions
返回的密码写入 BigQuery,如下面的代码所示,但我不知道如何到达那里:
class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() {
@ProcessElement
fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
outputReceiver.output(getTableRow(myData))
}
private fun getTableRow(myData: myCustomDataStructure): TableRow {
return TableRow().set("ID", myData.id)
.set("SecretCode", secretCode)
}
}
对于如何解决此问题,我将不胜感激。提前致谢。
您需要创建自己的接口来扩展 PipelineOptions 并在此处设置参数。
public interface SecretOptions extends PipelineOptions {
String getSecretCode();
void setSecretCode(String secretCode);
}
然后,像这样在您的管道上注册您的接口:
PipelineOptionsFactory.register(SecretOptions.class);
SecretOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(SecretOptions.class);
然后您就可以使用 options.getSecretCode();
在管道的任何位置访问您的参数
的更多信息
我正在尝试从 Airflow 触发 Google Dataflow 作业,需要帮助从 Airflow 发送一个字符串作为参数,该字符串可以在 Dataflow 中读取并用作独立字符串。
这是我的 DataflowTemplateOperator
的代码,它发送名为 secretCode
的参数:
DataflowTemplateOperator(
task_id=TASK_ID,
job_name=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
"secretCode": "123456"
},
dag=dag
)
我想将 PipelineOptions
中的 secretCode
读作 String
发送给以下 ParDo
,但我不知道该怎么做。该代码与ParDo
class的输入输出没有任何关系。我只想将代码写入 BigQuery。
val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
"transform my data to table row",
ParDo.of(DataToTableRow())
)
我想将从 PipelineOptions
返回的密码写入 BigQuery,如下面的代码所示,但我不知道如何到达那里:
class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() {
@ProcessElement
fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
outputReceiver.output(getTableRow(myData))
}
private fun getTableRow(myData: myCustomDataStructure): TableRow {
return TableRow().set("ID", myData.id)
.set("SecretCode", secretCode)
}
}
对于如何解决此问题,我将不胜感激。提前致谢。
您需要创建自己的接口来扩展 PipelineOptions 并在此处设置参数。
public interface SecretOptions extends PipelineOptions {
String getSecretCode();
void setSecretCode(String secretCode);
}
然后,像这样在您的管道上注册您的接口:
PipelineOptionsFactory.register(SecretOptions.class);
SecretOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(SecretOptions.class);
然后您就可以使用 options.getSecretCode();