如何在Scala中实现未知类型和未知数量的参数

How to implement parameters with unknown types and unknown number in Scala

我想在 Scala/Spark 中实现一个函数,它可以接受多个 reducers/aggregators 并一次执行它们。所以基本上我给出了 reduce 函数和初始值,它应该一次创建一个复合 reduce 操作。

这是 Python

中的逻辑
from functools import reduce

def reduce_at_once(data, reducer_funcs_inits):
    reducer_funcs, inits = zip(*reducer_funcs_inits)

    complete_reducer_func = lambda acc, y: tuple(rf(a_x, y) for a_x, rf in zip(acc, reducer_funcs))

    return list(reduce(complete_reducer_func, data, inits))

data = list(range(1, 20))
reducer_funcs_inits = [(lambda acc, y: acc + y, 0), # sum
                       (lambda acc, y: acc * y, 1)  # product
                       ]
print(list(reduce_at_once(data, reducer_funcs_inits)))
# [190, 121645100408832000]

如何在 Scala (Spark) 中做这样的事情?问题似乎是我有一个列表,它的长度我只在调用时知道,而且列表的元素可能有不同的类型(减少初始累加器),这取决于我想包括哪个减速器(不一定只有像这里这样的数字)。

您可以随时使用

def reduce_at_once(data: Any, reducer_funcs_inits: Any*)

但这很少是您想要的。特别是这里你其实需要

case class ReducerInit[A, B](f: (B, A) => B, init: B)

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_]

不幸的是,实施 reduce_at_once 会非常难看:

def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] = {
  val rfs = rfis.map(_.f.asInstanceOf[(Any, A) => Any])
  val inits = rfis.map(_.init.asInstanceOf[Any])

  val crf = (acc: Seq[Any], y: A) => acc.zip(rfs).map { case (a_x, rf) => rf(a_x, y) }

  data.foldLeft(inits)(crf)
}

检查:

val data = 1 to 20

val rf1 = ReducerInit[Int, Int](_ + _, 0)
val rf2 = ReducerInit[Int, Int](_ * _, 1)

println(reduce_at_once(data, rf1, rf2))

givesArrayBuffer(210, -2102132736)(注意溢出).