Dataflow: Accessing a value provider after the pipeline is complete
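I am trying to run a follow-up update after my pipeline has finished, where the tables are passed in at runtime because they are date-versioned. Since this code is executed as a template, it has to use NestedValueProviders.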
public interface DataQueryRunnerOptions extends DataflowPipelineOptions {
    @Description("Table to read/write payload data.")
    @Default.String("test.payloadData")
    ValueProvider<String> getPayloadTable();

    @Description("Table to read eligibility data from, and update with payloadData")
    @Default.String("test.dqr_test_eligibilities")
    ValueProvider<String> getEligibilityInputTable();
}
Usage in the pipeline:
campaignIdToDataQueryMap.apply("RunDataQueries", ParDo.of(new RunDataQueries()))
    .apply("WritePayloadDataToTable", BigQueryIO.writeTableRows()
        .withSchema(getPayloadDataSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .to(options.getPayloadTable()));
I then call this code after the pipeline:
pipeline.run().waitUntilFinish();
runFinalUpdate(options);
The runFinalUpdate method:
private static void runFinalUpdate(DataQueryRunner2Options options) {
    ValueProvider.NestedValueProvider eligTable = ValueProvider.NestedValueProvider.of(
        options.getEligibilityInputTable(),
        (SerializableFunction<String, String>) eligibilityInputTable -> options.getEligibilityInputTable().get()
    );
    ValueProvider.NestedValueProvider payloadTable = ValueProvider.NestedValueProvider.of(
        options.getPayloadTable(),
        (SerializableFunction<String, String>) payload -> options.getPayloadTable().get()
    );
    String finalUpdate = "UPDATE " + eligTable.get() + " elig SET elig.dataQueryPayload = (SELECT pd.dataQueryPayload FROM `"
        + payloadTable.get() + "` pd WHERE pd.numericId = elig.numericId and pd.campaignId = elig.campaignId)"
        + " WHERE elig.dataQueryPayload IS NULL";
    try {
        Utilities.runQuery(finalUpdate);
    } catch (InterruptedException e) {
        LOG.error("Final update failure: " + e.getMessage());
        e.printStackTrace();
    }
}
This gives the error:
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=eligibilityInputTable, default=test.dqr_test_eligibilities}
How can I access this value outside of the pipeline run? Is there a better way to do "once only" work after the pipeline has completed?
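The ValueProvider interface allows a pipeline to accept runtime parameters. To access these values for reporting or logging purposes, you need to read them from within the Beam DAG. One potential solution is to create a reporting branch in your pipeline that takes a single dummy value and, in a DoFn that "processes" this dummy value, exports the options to external storage.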
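To see why the get() call in the question's main method fails, it can help to model the idea in plain Java. With a templated job, the launcher's main method typically runs when the template is built, before any runtime parameters exist. The sketch below is a toy stand-in and not Beam's implementation: DeferredValue, resolve, tryRead, and the class name are hypothetical names chosen for illustration, and the "runner" that resolves the value is imaginary.

```java
import java.util.Optional;

final class DeferredValueDemo {
    /** Hypothetical stand-in for a runtime-only option; not part of Beam. */
    static final class DeferredValue<T> {
        private final String propertyName;
        private Optional<T> value = Optional.empty();

        DeferredValue(String propertyName) {
            this.propertyName = propertyName;
        }

        /** Called by the (imaginary) runner once the job has its parameters. */
        void resolve(T runtimeValue) {
            value = Optional.of(runtimeValue);
        }

        boolean isAccessible() {
            return value.isPresent();
        }

        /** Throws if the value is read before it has been supplied. */
        T get() {
            return value.orElseThrow(() -> new IllegalStateException(
                "Value only available at runtime: " + propertyName));
        }
    }

    /** Reads the value and reports the outcome as text, for demonstration. */
    static String tryRead(DeferredValue<String> v) {
        try {
            return "ok: " + v.get();
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        DeferredValue<String> table = new DeferredValue<>("eligibilityInputTable");
        System.out.println(tryRead(table)); // read before the job supplies a value
        table.resolve("test.dqr_test_eligibilities");
        System.out.println(tryRead(table)); // read after the value exists
    }
}
```

In this model the first read fails and the second succeeds, which mirrors the difference between calling get() in main and calling it inside a DoFn that executes as part of the job.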
Java (SDK 2.9.0):
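import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// The enclosing class name is illustrative; it only hosts LOG and main.
public class ValueProviderReport {
    private static final Logger LOG = LoggerFactory.getLogger(ValueProviderReport.class);

    public interface YourOptions extends PipelineOptions {
        @Description("Your option")
        @Default.String("Hello World!")
        ValueProvider<String> getStringValue();
        void setStringValue(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        // Create the pipeline
        YourOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(YourOptions.class);
        Pipeline p = Pipeline.create(options);
        // Branch for pushing the ValueProvider value
        p.apply(Create.of(1)).apply(ParDo.of(new DoFn<Integer, Integer>() {
            @ProcessElement
            public void process(ProcessContext c) {
                YourOptions ops = c.getPipelineOptions().as(YourOptions.class);
                // Do something like push to a DB here; get() is valid inside the DoFn
                LOG.info("Option StringValue was {}", ops.getStringValue().get());
            }
        }));
        // The main pipeline
        p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());
        p.run();
    }
}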
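Applied to the question, a reporting branch like this could call get() on both table options inside the DoFn and issue the UPDATE from there. One way to keep that step easy to check is to isolate the query construction as a plain function. The sketch below reuses the statement from the question; FinalUpdateSql and build are hypothetical names, and the table names in main are just the defaults from the question's options. Note that an independent branch is not ordered after the main write, so on its own this does not guarantee the payload table has been fully written before the update runs.

```java
final class FinalUpdateSql {
    private FinalUpdateSql() {}

    /** Builds the question's UPDATE statement from two already-resolved table names. */
    static String build(String eligTable, String payloadTable) {
        return "UPDATE " + eligTable + " elig SET elig.dataQueryPayload = "
            + "(SELECT pd.dataQueryPayload FROM `" + payloadTable + "` pd "
            + "WHERE pd.numericId = elig.numericId AND pd.campaignId = elig.campaignId) "
            + "WHERE elig.dataQueryPayload IS NULL";
    }

    public static void main(String[] args) {
        // Inside a DoFn, these two strings would instead come from
        // getEligibilityInputTable().get() and getPayloadTable().get().
        System.out.println(build("test.dqr_test_eligibilities", "test.payloadData"));
    }
}
```

Keeping the string building separate from the Beam code means the SQL can be verified without launching a job, while the DoFn stays a thin wrapper that resolves the options and runs the query.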