Google 数据流中的管道初始化步骤
Pipeline initialization step in Google Dataflow
我需要在管道获取其输入数据之前清除 table,我希望此步骤 运行 作为管道本身的一部分,在云中,而不是在本地。
这是代码目前的样子,clearTable()
运行 在本地:
exactTargetIntegration.clearTable(); // runs locally
Pipeline p = Pipeline.create(options);
PCollection<String> readFromFile =
p.apply(TextIO.Read.from(INPUT_FILES)); // runs in the cloud
...
可能吗?
目前没有办法确保在同一管道内的读取之前发生某些操作。如果您需要在云中 运行 进行操作,您可以 运行 它作为一个单独的管道,在第一个之前 运行s。
例如:
DataflowPipelineOptions options = ...
options.setRunner(BlockingDataflowPipelineRunner.class);
Pipeline deletePipeline = <build deletion pipeline>
Pipeline mainPipeline = <build main pipeline>
deletePipeline.run(options);
mainPipeline.run(options);
此外,这个用例绝对是我们想要支持的;您可以在此处跟踪问题:https://issues.apache.org/jira/browse/BEAM-65
我需要在管道获取其输入数据之前清除 table,我希望此步骤 运行 作为管道本身的一部分,在云中,而不是在本地。
这是代码目前的样子,clearTable()
运行 在本地:
exactTargetIntegration.clearTable(); // runs locally
Pipeline p = Pipeline.create(options);
PCollection<String> readFromFile =
p.apply(TextIO.Read.from(INPUT_FILES)); // runs in the cloud
...
可能吗?
目前没有办法确保在同一管道内的读取之前发生某些操作。如果您需要在云中 运行 进行操作,您可以 运行 它作为一个单独的管道,在第一个之前 运行s。
例如:
DataflowPipelineOptions options = ...
options.setRunner(BlockingDataflowPipelineRunner.class);
Pipeline deletePipeline = <build deletion pipeline>
Pipeline mainPipeline = <build main pipeline>
deletePipeline.run(options);
mainPipeline.run(options);
此外,这个用例绝对是我们想要支持的;您可以在此处跟踪问题:https://issues.apache.org/jira/browse/BEAM-65