Spark - 使用 ShuffledRDD 包装任意 RDD
Spark - Wrap an arbitrary RDD with using a ShuffledRDD
我有一个 Broadcast 变量,我将其反序列化以获得 RDD 及其依赖集,如下所示:
val taskBinary: Broadcast[Array[Byte]]
var (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
但是,我想用 ShuffledRDD 包装这个 rdd,因为我需要对其应用自定义分区程序,我正在这样做:
var wrappedRDD = new ShuffledRDD[_ ,_, _](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())
但它会导致错误:
Error:unbound wildcard type
rdd = new ShuffledRDD[_ ,_, _ ](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())
..................................^
问题是我不知道如何用任何推断类型替换这些通配符,因为它应该是动态的,而且我不知道原始 rdd 的推断类型是什么。知道我该如何解决这个问题吗?
所以,我认为您的 wrappedRDD 中存在一些问题。报告的错误 "unbound wildcard type..." 与您在构造函数调用中向 rdd
变量添加了类型定义有关。
(rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())
应该是
(rdd, context.getCustomPartitioner())
此外,您需要为 ShuffledRDD
提供类型。您可以使用 Any
、
var wrappedRDD = new ShuffledRDD[Any,Any,Any](rdd, context.getCustomPartitioner())
但我怀疑你真正想做的是定义一个接受类型的函数和 returns 特定类型 ShuffledRDD
像这样:
def wrapRDD[K:ClassTag, V:ClassTag, C: ClassTag](rdd: RDD[(K, V)]) = {
new ShuffledRDD[K, V, C](rdd, context.getCustomPartitioner())
}
val wrappedRDD = wrapRDD[String, String, Combiner](rdd, context.getCustomPartitioner())
我有一个 Broadcast 变量,我将其反序列化以获得 RDD 及其依赖集,如下所示:
val taskBinary: Broadcast[Array[Byte]]
var (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
但是,我想用 ShuffledRDD 包装这个 rdd,因为我需要对其应用自定义分区程序,我正在这样做:
var wrappedRDD = new ShuffledRDD[_ ,_, _](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())
但它会导致错误:
Error:unbound wildcard type rdd = new ShuffledRDD[_ ,_, _ ](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) ..................................^
问题是我不知道如何用任何推断类型替换这些通配符,因为它应该是动态的,而且我不知道原始 rdd 的推断类型是什么。知道我该如何解决这个问题吗?
所以,我认为您的 wrappedRDD 中存在一些问题。报告的错误 "unbound wildcard type..." 与您在构造函数调用中向 rdd
变量添加了类型定义有关。
(rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())
应该是
(rdd, context.getCustomPartitioner())
此外,您需要为 ShuffledRDD
提供类型。您可以使用 Any
、
var wrappedRDD = new ShuffledRDD[Any,Any,Any](rdd, context.getCustomPartitioner())
但我怀疑你真正想做的是定义一个接受类型的函数和 returns 特定类型 ShuffledRDD
像这样:
def wrapRDD[K:ClassTag, V:ClassTag, C: ClassTag](rdd: RDD[(K, V)]) = {
new ShuffledRDD[K, V, C](rdd, context.getCustomPartitioner())
}
val wrappedRDD = wrapRDD[String, String, Combiner](rdd, context.getCustomPartitioner())