如何在Scala中实现未知类型和未知数量的参数
How to implement parameters with unknown types and unknown number in Scala
我想在 Scala/Spark 中实现一个函数,它可以接受多个 reducers/aggregators 并一次执行它们。所以基本上我给出了 reduce 函数和初始值,它应该一次创建一个复合 reduce 操作。
这是 Python
中的逻辑
from functools import reduce
def reduce_at_once(data, reducer_funcs_inits):
reducer_funcs, inits = zip(*reducer_funcs_inits)
complete_reducer_func = lambda acc, y: tuple(rf(a_x, y) for a_x, rf in zip(acc, reducer_funcs))
return list(reduce(complete_reducer_func, data, inits))
data = list(range(1, 20))
reducer_funcs_inits = [(lambda acc, y: acc + y, 0), # sum
(lambda acc, y: acc * y, 1) # product
]
print(list(reduce_at_once(data, reducer_funcs_inits)))
# [190, 121645100408832000]
如何在 Scala (Spark) 中做这样的事情?问题似乎是我有一个列表,它的长度我只在调用时知道,而且列表的元素可能有不同的类型(减少初始累加器),这取决于我想包括哪个减速器(不一定只有像这里这样的数字)。
您可以随时使用
def reduce_at_once(data: Any, reducer_funcs_inits: Any*)
但这很少是您想要的。特别是这里你其实需要
case class ReducerInit[A, B](f: (B, A) => B, init: B)
def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_]
不幸的是,实施 reduce_at_once
会非常难看:
def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] = {
val rfs = rfis.map(_.f.asInstanceOf[(Any, A) => Any])
val inits = rfis.map(_.init.asInstanceOf[Any])
val crf = (acc: Seq[Any], y: A) => acc.zip(rfs).map { case (a_x, rf) => rf(a_x, y) }
data.foldLeft(inits)(crf)
}
检查:
val data = 1 to 20
val rf1 = ReducerInit[Int, Int](_ + _, 0)
val rf2 = ReducerInit[Int, Int](_ * _, 1)
println(reduce_at_once(data, rf1, rf2))
givesArrayBuffer(210, -2102132736)
(注意溢出).
我想在 Scala/Spark 中实现一个函数,它可以接受多个 reducers/aggregators 并一次执行它们。所以基本上我给出了 reduce 函数和初始值,它应该一次创建一个复合 reduce 操作。
这是 Python
中的逻辑from functools import reduce
def reduce_at_once(data, reducer_funcs_inits):
reducer_funcs, inits = zip(*reducer_funcs_inits)
complete_reducer_func = lambda acc, y: tuple(rf(a_x, y) for a_x, rf in zip(acc, reducer_funcs))
return list(reduce(complete_reducer_func, data, inits))
data = list(range(1, 20))
reducer_funcs_inits = [(lambda acc, y: acc + y, 0), # sum
(lambda acc, y: acc * y, 1) # product
]
print(list(reduce_at_once(data, reducer_funcs_inits)))
# [190, 121645100408832000]
如何在 Scala (Spark) 中做这样的事情?问题似乎是我有一个列表,它的长度我只在调用时知道,而且列表的元素可能有不同的类型(减少初始累加器),这取决于我想包括哪个减速器(不一定只有像这里这样的数字)。
您可以随时使用
def reduce_at_once(data: Any, reducer_funcs_inits: Any*)
但这很少是您想要的。特别是这里你其实需要
case class ReducerInit[A, B](f: (B, A) => B, init: B)
def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_]
不幸的是,实施 reduce_at_once
会非常难看:
def reduce_at_once[A](data: Seq[A], rfis: ReducerInit[A, _]*): Seq[_] = {
val rfs = rfis.map(_.f.asInstanceOf[(Any, A) => Any])
val inits = rfis.map(_.init.asInstanceOf[Any])
val crf = (acc: Seq[Any], y: A) => acc.zip(rfs).map { case (a_x, rf) => rf(a_x, y) }
data.foldLeft(inits)(crf)
}
检查:
val data = 1 to 20
val rf1 = ReducerInit[Int, Int](_ + _, 0)
val rf2 = ReducerInit[Int, Int](_ * _, 1)
println(reduce_at_once(data, rf1, rf2))
givesArrayBuffer(210, -2102132736)
(注意溢出).