是否可以在当前正在执行的管道中创建另一个 DatastoreV1.Read?
Is it possible to create another DatastoreV1.Read inside a currently executing pipeline?
我知道可以在开始时使用 PCollections、KV 和 CoGbkResult 创建两个查询,但您必须在 ParDos/DoFns 之外指定 DatastoreIO.Reads。是否可以这样做:
Pipeline pipeline = Pipeline.create(options);
PubsubIO.Read.Bound<String> pubsub = PubsubIO.Read.named("ReadFromPubsub")
.subscription(subscriptionPath);
pipeline.apply(pubsub).apply("Query DB with UserId" , ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String userId = c.element().toString();
//Query database using userId....
}
}));
pipeline.run();
是的。您可以将任何您喜欢的任意代码放入 ParDo
中,它会在出现任何问题时执行。但是,请记住,您的代码将 运行 在数据流分布式环境中,因此您不能共享状态,取决于顺序等。
我知道可以在开始时使用 PCollections、KV 和 CoGbkResult 创建两个查询,但您必须在 ParDos/DoFns 之外指定 DatastoreIO.Reads。是否可以这样做:
Pipeline pipeline = Pipeline.create(options);
PubsubIO.Read.Bound<String> pubsub = PubsubIO.Read.named("ReadFromPubsub")
.subscription(subscriptionPath);
pipeline.apply(pubsub).apply("Query DB with UserId" , ParDo.of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String userId = c.element().toString();
//Query database using userId....
}
}));
pipeline.run();
是的。您可以将任何您喜欢的任意代码放入 ParDo
中,它会在出现任何问题时执行。但是,请记住,您的代码将 运行 在数据流分布式环境中,因此您不能共享状态,取决于顺序等。