reduce 的参数太多 [Scala 中的 Flink 1.9]

Too Many Arguments for reduce [Flink 1.9 in Scala]

我正在尝试将 Flink 的 Incremental Window Aggregation with ReduceFunction 用于我正在做的项目 return 单个值,该值是时间 window 中的最小值 window界限。

def aggregation(run1Stream: DataStream[myClass], windowSize: Time = Time.hours(1), windowSlide: Time = Time.minutes(2)): DataStream[myClass] = {
    myStream
      .keyBy(x => x.key)
      .timeWindow(windowSize, windowSlide)
      // run a incremental reduce on window aggregation 
      .reduce( new minVal(),  new AssignWindowEndProcessFunction())
  }
class minVal extends ReduceFunction[myClass] {
  override def reduce(r1: myClass, r2: myClass) = {
    (r1: myClass, r2: myClass) => {if (r1.val > r2.val) r2 else r1}
  }
}
class AssignWindowEndProcessFunction extends ProcessWindowFunction[myClass, (myClass,Long, Long), String, TimeWindow] {

  override def process(key: String,
                       ctx: Context,
                       input: Iterable[myClass],
                       out: Collector[(myClass,Long, Long)]): Unit = {
    val minVal = input.head
    val windowStart = ctx.window.getStart
    val windowEnd = ctx.window.getEnd
    out.collect((minVal, windowStart ,windowEnd))
  }

我收到的错误是:无法解析重载方法 'reduce'

有没有人发现我的实施存在任何重大问题?

WindowedStream#reduce的签名是

def reduce[R: TypeInformation](
  preAggregator: (T, T) => T,
  windowFunction: (K, W, Iterable[T], Collector[R]) => Unit)

我假设 ReduceFunction 不是 scala 的有效替代品 Function2

因此您有两个选择,a) 将 minVal 更改为扩展 (T, T) => T 或 b) 将函数内联为示例中的 lambda。

在我的案例中,问题最终是我的一位同事发现的函数中的类型不匹配。 .reduce() 的输出与函数的预期输出不匹配,因此重载方法更改 DataStream[myClass] => DataStream[(myClass,Long, Long)] 修复了重载方法错误。

  def aggregation(run1Stream: DataStream[myClass], windowSize: Time = Time.hours(1), windowSlide: Time = Time.minutes(2)): DataStream[(myClass,Long, Long)] = {
    myStream
      .keyBy(x => x.key)
      .timeWindow(windowSize, windowSlide)
      // run a incremental reduce on window aggregation 
      .reduce( new minVal(),  new AssignWindowEndProcessFunction())
  }

此外,我不得不将 reduce 函数更改为

class minVal extends ReduceFunction[myClass] {
  override def reduce(r1: myClass, r2: myClass) = {
    if (r1.val > r2.val) r2 else r1
  }
}

因为在我像 Arvid Heise 提到的那样返回 Function2 之前