如何有条件地将 Transform 应用于 PCollection?

How to apply PTransform to PCollection conditionnaly?

我有一个 PCollection,如果某个条件得到验证,我想应用自定义 PTransform(该条件不依赖于 Pcollection内容)
示例: 我有日志,如果 PipelineOptions 中提供了日期,我想在该日期进行过滤。

目前,我的最佳解决方案是:

// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
if(!date.equals("")){
    logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...

它可以工作,但我不喜欢重新分配日志。更何况,我不喜欢打破apply的链条。这看起来不像是优雅的方式。

是否有某种条件PTransform?或者如果没有,将条件检查放在 PTransform 中并在未验证的情况下输出所有内容会更有效吗?

梦想例子:

PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
    .applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
    .apply(...

不幸的是,Beam 没有任何类似的 applyIf, 您当前的方法是进行此类条件过滤的一般方法。

PTransform 中的条件检查为每个元素添加了一个额外的操作,这将根据检查类型影响性能。

如果可能,最好避免从管道进行转换,而不是使 PTransform 更复杂。

从代码美学的角度来看,可以使用wrapper transform有条件地应用相关的filter pardo。 示例:

public static class ConditionallyFilter
      extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
  private final String date;
  public ConditionallyFilter(String date){
    this.date = date;
  }
  @Override
  public PCollection<LogRow> expand(PCollection<LogRow> logs) {
    if(!date.equals("")){
      logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
    }
    return logs;
  }
} 


// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput())).apply(new ConditionallyFilter(date)).apply(...