如何有条件地将 Transform 应用于 PCollection?
How to apply PTransform to PCollection conditionnaly?
我有一个 PCollection
,如果某个条件得到验证,我想应用自定义 PTransform
。(该条件不依赖于 Pcollection
内容)
示例: 我有日志,如果 PipelineOptions
中提供了日期,我想在该日期进行过滤。
目前,我的最佳解决方案是:
// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...
它可以工作,但我不喜欢重新分配日志。更何况,我不喜欢打破apply
的链条。这看起来不像是优雅的方式。
是否有某种条件PTransform
?或者如果没有,将条件检查放在 PTransform
中并在未验证的情况下输出所有内容会更有效吗?
梦想例子:
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
.applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
.apply(...
不幸的是,Beam 没有任何类似的 applyIf
,
您当前的方法是进行此类条件过滤的一般方法。
PTransform 中的条件检查为每个元素添加了一个额外的操作,这将根据检查类型影响性能。
如果可能,最好避免从管道进行转换,而不是使 PTransform 更复杂。
从代码美学的角度来看,可以使用wrapper transform有条件地应用相关的filter pardo。
示例:
public static class ConditionallyFilter
extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
private final String date;
public ConditionallyFilter(String date){
this.date = date;
}
@Override
public PCollection<LogRow> expand(PCollection<LogRow> logs) {
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
return logs;
}
}
// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput())).apply(new ConditionallyFilter(date)).apply(...
我有一个 PCollection
,如果某个条件得到验证,我想应用自定义 PTransform
。(该条件不依赖于 Pcollection
内容)
示例: 我有日志,如果 PipelineOptions
中提供了日期,我想在该日期进行过滤。
目前,我的最佳解决方案是:
// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...
它可以工作,但我不喜欢重新分配日志。更何况,我不喜欢打破apply
的链条。这看起来不像是优雅的方式。
是否有某种条件PTransform
?或者如果没有,将条件检查放在 PTransform
中并在未验证的情况下输出所有内容会更有效吗?
梦想例子:
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
.applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
.apply(...
不幸的是,Beam 没有任何类似的 applyIf
,
您当前的方法是进行此类条件过滤的一般方法。
PTransform 中的条件检查为每个元素添加了一个额外的操作,这将根据检查类型影响性能。
如果可能,最好避免从管道进行转换,而不是使 PTransform 更复杂。
从代码美学的角度来看,可以使用wrapper transform有条件地应用相关的filter pardo。 示例:
public static class ConditionallyFilter
extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
private final String date;
public ConditionallyFilter(String date){
this.date = date;
}
@Override
public PCollection<LogRow> expand(PCollection<LogRow> logs) {
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
return logs;
}
}
// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput())).apply(new ConditionallyFilter(date)).apply(...