Inputs to Flatten had incompatible window windowFns when using CoGroupByKey with CalendarWindows
TL;DR:
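How do I CoGroupByKey a set of PCollections that all have the same windowing strategy set with CalendarWindows?
Long version
I'm writing a Dataflow pipeline that reads from two different Pub/Sub sources. One of the PCollections is split into a PCollectionTuple, and at the end I try to join them in a CoGroupByKey before saving the result to BigQuery.
While testing the pipeline, my windowing strategy for the PCollections was: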
    private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
        return summary
            .apply("Apply Windows " + OperationName, Window
                .<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardDays(1)))
            .apply("Count " + OperationName, Count.perKey());
    }
I set them to 1-minute FixedWindows so that I get results quickly.
My grouping looks like this:
    private static PCollection<KV<String, CoGbkResult>> MergeSummary(
            PCollection<KV<String, Long>> Avail,
            PCollection<KV<String, Long>> ValuationOK,
            PCollection<KV<String, Long>> ValuationKO){
        return KeyedPCollectionTuple.of(Util.AVAIL, Avail)
            .and(Util.VALUATION_OK, ValuationOK)
            .and(Util.VALUATION_KO, ValuationKO)
            .apply("Merge Summary", CoGroupByKey.create());
    }
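It ran smoothly when I tested it locally and in the cloud. However, when I set the windowing to the actual production value, CalendarWindows of 1 day, as follows: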
    private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
        return summary
            .apply("Apply Windows " + OperationName, Window
                .<KV<String, Long>>into(CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)) //Per day windowing.
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardDays(1))) //Accepts X days late data.
            .apply("Count " + OperationName, Count.perKey());
    }
Then I can't even get the code to run: it fails while the pipeline is being constructed, with the following exception:
    Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns: com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6af9fcb2, com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6cce16f4
        at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:121)
        at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:105)
        at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
        at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
        at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
        at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
        at com.google.cloud.dataflow.sdk.values.PCollectionList.apply(PCollectionList.java:175)
        at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:124)
        at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:74)
        at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
        at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
        at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
        at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
        at com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:116)
Reading the documentation, I found this:
When using CoGroupByKey to group PCollections that have a windowing strategy applied, all of the PCollections you want to group must use the same windowing strategy and window sizing. For example, all the collections you're merging must use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.
If your pipeline attempts to use CoGroupByKey to merge PCollections with incompatible windows, Dataflow will generate an IllegalStateException error when your pipeline is constructed.
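Clearly Dataflow thinks my PCollections have incompatible windows, yet all of them are windowed by the same function I pasted above. So, how do I CoGroupByKey a set of PCollections that all have the same windowing strategy set with CalendarWindows?
This looks like a bug in CalendarWindows. To work around it, create a single CalendarWindows object and use it as the WindowFn for every PCollection, rather than creating a separate CalendarWindows object for each one.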
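To illustrate why sharing one object could help, here is a minimal sketch in plain Java with no Dataflow dependency. It assumes (and this is only an assumption about the failure mode, not something confirmed from the SDK source) that the compatibility check ends up comparing some part of the WindowFn by reference rather than by value. `DayWindowFn`, `isCompatible` as written here, and `allCompatible` are hypothetical stand-ins, not SDK classes.

```java
import java.util.Arrays;
import java.util.List;

class SharedWindowFnDemo {

    // Hypothetical stand-in for a calendar-based WindowFn. The int[] plays the
    // role of a start-date object; == on arrays is a reference comparison.
    static final class DayWindowFn {
        private final int[] startDay;

        DayWindowFn(int year, int month, int day) {
            this.startDay = new int[] {year, month, day};
        }

        // ASSUMPTION: models a check that compares the start date by reference,
        // so two logically identical instances are reported as incompatible.
        boolean isCompatible(DayWindowFn other) {
            return this.startDay == other.startDay;
        }
    }

    // Mimics a flatten-style step that requires every input to be compatible
    // with the first one.
    static boolean allCompatible(List<DayWindowFn> fns) {
        DayWindowFn first = fns.get(0);
        for (DayWindowFn fn : fns) {
            if (!first.isCompatible(fn)) {
                return false;
            }
        }
        return true;
    }

    // One new instance per input, as happens when the windowing helper in the
    // question is called once per PCollection.
    static boolean perInputInstances() {
        return allCompatible(Arrays.asList(
                new DayWindowFn(2016, 9, 20),
                new DayWindowFn(2016, 9, 20),
                new DayWindowFn(2016, 9, 20)));
    }

    // The workaround: build the instance once and reuse it for every input.
    static boolean sharedInstance() {
        DayWindowFn shared = new DayWindowFn(2016, 9, 20);
        return allCompatible(Arrays.asList(shared, shared, shared));
    }

    public static void main(String[] args) {
        System.out.println("per-input instances compatible: " + perInputInstances());
        System.out.println("shared instance compatible: " + sharedInstance());
    }
}
```

Under that assumption the per-input case reports `false` and the shared case reports `true`. Applied to the pipeline in the question, the equivalent change would be to build `CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)` once, for example in a static final field, and pass that same object to `Window.into(...)` each time `applyWindowsAndCount` is called.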