GlobalWindow 无法转换为 IntervalWindow

GlobalWindow cannot be cast to IntervalWindow

我正在尝试 运行 对 csv 文件中的一组元组进行滑动 window 计算。每行都有一个与之关联的日期。使用 TextIO.Read 读取 csv 文件后,我应用 ParDo 转换来更改 PCollection.

中每个元素的时间戳
//Reading and time stamping the stock prices
PCollection<KV<Integer, StockPricePoint>> stockPrices = pipeline
  .apply(TextIO.Read.from("./inputFiles/2004.csv"))
  .apply(ParDo.of(new DoFn<String, KV<Integer, StockPricePoint>>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      String[] fields = c.element().split(",");
      try {
        StockPricePoint stockPoint = new StockPricePoint();
        stockPoint.setId(fields[0]);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyymmdd");
        stockPoint.setDate(sdf.parse(fields[1].trim()));
        stockPoint.setSymbol(fields[2]);
        stockPoint.setPrice(Double.parseDouble(fields[5].trim()));
        stockPoint.setCap(Double.parseDouble(fields[6].trim()));
        Instant instant = new Instant(stockPoint.getDate().getTime());
        c.outputWithTimestamp(KV.of(
          symbolEncoder.getSymbolIndex(stockPoint.getSymbol()), stockPoint), 
          instant);
      } catch (Exception ex) {
        //Todo accumulate errors
        ex.printStackTrace();
      }
    }
  });

然后我应用滑动 window 变换如下

//creating the sliding windows
PCollection<KV<Integer, StockPricePoint>> slidingWindowStockPrices = stockPrices
  .apply(Window.<KV<Integer, StockPricePoint>>into(
    SlidingWindows.of(Duration.standardDays(30))
        .every(Duration.standardDays(5)));

当我如下调用 GroupByKey 转换后,我得到一个 GlobalWindow cannot be cast to IntervalWindow 异常。这里可能出了什么问题?

   slidingWindowStockPrices.apply(GroupByKey.create());

可以在此处找到完整的堆栈跟踪 http://pastebin.com/QUStvrfB

这看起来不像是 Google Cloud Dataflow 服务的问题。您也可以尝试 DirectPipelineRunner 进行本地测试。

Spark runner 似乎有问题,它可能尚未实现在全局 Window 中触发所需的完整语义。 Spark runner 现在由 Apache Beam (Incubating) 项目维护。我已经在项目的 Jira 上提交了一个 ticket 来跟踪这个问题。

我试图在我编写的单元测试中重现该行为。请参阅:https://issues.apache.org/jira/browse/BEAM-112

我已将您的代码片段用于功能测试以尝试重现,但我无法重现该问题。 但是,我确实注意到您在集成之前使用的是 Spark runner 的早期版本(甚至是 Cloudera 的 spark-dataflow),因此尝试更新您的 spark runner 可能值得一试。

如果我在这里遗漏了什么,请告诉我。