是否可以在当前正在执行的管道中创建另一个 DatastoreV1.Read?

Is it possible to create another DatastoreV1.Read inside a currently executing pipeline?

我知道可以在开始时使用 PCollections、KV 和 CoGbkResult 创建两个查询,但您必须在 ParDos/DoFns 之外指定 DatastoreIO.Reads。是否可以这样做:

    Pipeline pipeline = Pipeline.create(options);

    PubsubIO.Read.Bound<String> pubsub = PubsubIO.Read.named("ReadFromPubsub")
            .subscription(subscriptionPath);

    pipeline.apply(pubsub).apply("Query DB with UserId" , ParDo.of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String userId = c.element().toString();

            //Query database using userId....
        }
    }));

    pipeline.run();

是的。您可以将任何您喜欢的任意代码放入 ParDo 中,它会在出现任何问题时执行。但是,请记住,您的代码将 运行 在数据流分布式环境中,因此您不能共享状态,取决于顺序等。