Dataflow sideInput 是否可以通过读取 gcs 存储桶按 window 更新?

Can Dataflow sideInput be updated per window by reading a gcs bucket?

我目前正在创建一个 PCollectionView,方法是从 gcs 存储桶中读取过滤信息并将其作为侧输入传递到我的管道的不同阶段以过滤输出。如果 gcs 存储桶中的文件发生变化,我希望当前 运行 管道使用这个新的过滤器信息。如果我的过滤器发生变化,有没有办法在每个新的 window 数据上更新此 PCollectionView?我以为我可以在 startBundle 中做到这一点,但我不知道如何或是否可能。可以的话可以举个例子吗

PCollectionView<Map<String, TagObject>> 
    tagMapView =
        pipeline.apply(TextIO.Read.named("TagListTextRead")
                                  .from("gs://tag-list-bucket/tag-list.json"))
                .apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
                .apply("MakeTagMapView", View.asSingleton());
PCollection<String> 
    windowedData =
        pipeline.apply(PubsubIO.Read.topic("myTopic"))
                .apply(Window.<String>into(
                              SlidingWindows.of(Duration.standardMinutes(15))
                                            .every(Duration.standardSeconds(31))));
PCollection<MY_DATA> 
    lineData = windowedData
        .apply(ParDo.named("ExtractJsonObject")
            .withSideInputs(tagMapView)
            .of(new ExtractJsonObjectFn()));

您可能想要 "use an at most a 1-minute-old version of the filter as a side input" 之类的东西(因为理论上文件可以频繁地、不可预测地并且独立于您的管道更改 - 所以没有办法真正使文件的更改与管道)。

这是我能够想出的(当然,相当笨拙的)解决方案。它依赖于侧面输入也隐含地由 window 键入的事实。在这个解决方案中,我们将创建一个侧输入 windowed 到 1 分钟固定 windows,其中每个 window 将包含标签映射的单个值,从过滤器派生window.

中的某个时刻的文件
PCollection<Long> ticks = p
  // Produce 1 "tick" per second
  .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
  // Window the ticks into 1-minute windows
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  // Use an arbitrary per-window combiner to reduce to 1 element per window
  .apply(Count.globally());

// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
  .apply(MapElements.via((Long ignored) -> {
    ... manually read the json file as a TagMap ...
  }))
  .apply(View.asSingleton());

这种模式(加入缓慢变化的外部数据作为辅助输入)反复出现,我在这里提出的解决方案远非完美,我希望我们在编程模型中对此有更好的支持。我已经提交了一个 BEAM JIRA issue 来跟踪这个。