GlobalWindow 无法转换为 IntervalWindow
GlobalWindow cannot be cast to IntervalWindow
我正在尝试 运行 对 csv 文件中的一组元组进行滑动 window 计算。每行都有一个与之关联的日期。使用 TextIO.Read
读取 csv 文件后,我应用 ParDo
转换来更改 PCollection
.
中每个元素的时间戳
//Reading and time stamping the stock prices
PCollection<KV<Integer, StockPricePoint>> stockPrices = pipeline
.apply(TextIO.Read.from("./inputFiles/2004.csv"))
.apply(ParDo.of(new DoFn<String, KV<Integer, StockPricePoint>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String[] fields = c.element().split(",");
try {
StockPricePoint stockPoint = new StockPricePoint();
stockPoint.setId(fields[0]);
SimpleDateFormat sdf = new SimpleDateFormat("yyyymmdd");
stockPoint.setDate(sdf.parse(fields[1].trim()));
stockPoint.setSymbol(fields[2]);
stockPoint.setPrice(Double.parseDouble(fields[5].trim()));
stockPoint.setCap(Double.parseDouble(fields[6].trim()));
Instant instant = new Instant(stockPoint.getDate().getTime());
c.outputWithTimestamp(KV.of(
symbolEncoder.getSymbolIndex(stockPoint.getSymbol()), stockPoint),
instant);
} catch (Exception ex) {
//Todo accumulate errors
ex.printStackTrace();
}
}
});
然后我应用滑动 window 变换如下
//creating the sliding windows
PCollection<KV<Integer, StockPricePoint>> slidingWindowStockPrices = stockPrices
.apply(Window.<KV<Integer, StockPricePoint>>into(
SlidingWindows.of(Duration.standardDays(30))
.every(Duration.standardDays(5)));
当我如下调用 GroupByKey
转换后,我得到一个 GlobalWindow cannot be cast to IntervalWindow
异常。这里可能出了什么问题?
slidingWindowStockPrices.apply(GroupByKey.create());
可以在此处找到完整的堆栈跟踪 http://pastebin.com/QUStvrfB
这看起来不像是 Google Cloud Dataflow 服务的问题。您也可以尝试 DirectPipelineRunner
进行本地测试。
Spark runner 似乎有问题,它可能尚未实现在全局 Window 中触发所需的完整语义。 Spark runner 现在由 Apache Beam (Incubating) 项目维护。我已经在项目的 Jira 上提交了一个 ticket 来跟踪这个问题。
我试图在我编写的单元测试中重现该行为。请参阅:https://issues.apache.org/jira/browse/BEAM-112
我已将您的代码片段用于功能测试以尝试重现,但我无法重现该问题。
但是,我确实注意到您在集成之前使用的是 Spark runner 的早期版本(甚至是 Cloudera 的 spark-dataflow),因此尝试更新您的 spark runner 可能值得一试。
如果我在这里遗漏了什么,请告诉我。
我正在尝试 运行 对 csv 文件中的一组元组进行滑动 window 计算。每行都有一个与之关联的日期。使用 TextIO.Read
读取 csv 文件后,我应用 ParDo
转换来更改 PCollection
.
//Reading and time stamping the stock prices
PCollection<KV<Integer, StockPricePoint>> stockPrices = pipeline
.apply(TextIO.Read.from("./inputFiles/2004.csv"))
.apply(ParDo.of(new DoFn<String, KV<Integer, StockPricePoint>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String[] fields = c.element().split(",");
try {
StockPricePoint stockPoint = new StockPricePoint();
stockPoint.setId(fields[0]);
SimpleDateFormat sdf = new SimpleDateFormat("yyyymmdd");
stockPoint.setDate(sdf.parse(fields[1].trim()));
stockPoint.setSymbol(fields[2]);
stockPoint.setPrice(Double.parseDouble(fields[5].trim()));
stockPoint.setCap(Double.parseDouble(fields[6].trim()));
Instant instant = new Instant(stockPoint.getDate().getTime());
c.outputWithTimestamp(KV.of(
symbolEncoder.getSymbolIndex(stockPoint.getSymbol()), stockPoint),
instant);
} catch (Exception ex) {
//Todo accumulate errors
ex.printStackTrace();
}
}
});
然后我应用滑动 window 变换如下
//creating the sliding windows
PCollection<KV<Integer, StockPricePoint>> slidingWindowStockPrices = stockPrices
.apply(Window.<KV<Integer, StockPricePoint>>into(
SlidingWindows.of(Duration.standardDays(30))
.every(Duration.standardDays(5)));
当我如下调用 GroupByKey
转换后,我得到一个 GlobalWindow cannot be cast to IntervalWindow
异常。这里可能出了什么问题?
slidingWindowStockPrices.apply(GroupByKey.create());
可以在此处找到完整的堆栈跟踪 http://pastebin.com/QUStvrfB
这看起来不像是 Google Cloud Dataflow 服务的问题。您也可以尝试 DirectPipelineRunner
进行本地测试。
Spark runner 似乎有问题,它可能尚未实现在全局 Window 中触发所需的完整语义。 Spark runner 现在由 Apache Beam (Incubating) 项目维护。我已经在项目的 Jira 上提交了一个 ticket 来跟踪这个问题。
我试图在我编写的单元测试中重现该行为。请参阅:https://issues.apache.org/jira/browse/BEAM-112
我已将您的代码片段用于功能测试以尝试重现,但我无法重现该问题。 但是,我确实注意到您在集成之前使用的是 Spark runner 的早期版本(甚至是 Cloudera 的 spark-dataflow),因此尝试更新您的 spark runner 可能值得一试。
如果我在这里遗漏了什么,请告诉我。