Spark - 使用 ShuffledRDD 包装任意 RDD

Spark - Wrap an arbitrary RDD with using a ShuffledRDD

我有一个 Broadcast 变量,我将其反序列化以获得 RDD 及其依赖集,如下所示:

val taskBinary: Broadcast[Array[Byte]]
var (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

但是,我想用 ShuffledRDD 包装这个 rdd,因为我需要对其应用自定义分区程序,我正在这样做:

var wrappedRDD = new ShuffledRDD[_ ,_, _](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())

但它会导致错误:

Error:unbound wildcard type rdd = new ShuffledRDD[_ ,_, _ ](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) ..................................^

问题是我不知道如何用任何推断类型替换这些通配符,因为它应该是动态的,而且我不知道原始 rdd 的推断类型是什么。知道我该如何解决这个问题吗?

所以,我认为您的 wrappedRDD 中存在一些问题。报告的错误 "unbound wildcard type..." 与您在构造函数调用中向 rdd 变量添加了类型定义有关。

(rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner())

应该是

(rdd, context.getCustomPartitioner())

此外,您需要为 ShuffledRDD 提供类型。您可以使用 Any

var wrappedRDD = new ShuffledRDD[Any,Any,Any](rdd, context.getCustomPartitioner())

但我怀疑你真正想做的是定义一个接受类型的函数和 returns 特定类型 ShuffledRDD 像这样:

def wrapRDD[K:ClassTag, V:ClassTag, C: ClassTag](rdd: RDD[(K, V)]) = {
  new ShuffledRDD[K, V, C](rdd, context.getCustomPartitioner())
}

val wrappedRDD = wrapRDD[String, String, Combiner](rdd, context.getCustomPartitioner())