如何将 case class 放入 rdd 中并让它像元组(对)一样工作?

How do I put a case class in an rdd and have it act like a tuple(pair)?

举个例子,我有一个简单的案例class

case class Foo(k:String, v1:String, v2:String)

为了类似的目的,我能否让 spark 将其识别为元组,而无需转换为元组,例如 map 或 keyBy step。

val rdd = sc.parallelize(List(Foo("k", "v1", "v2")))
// Swap values
rdd.mapValues(v => (v._2, v._1))

我什至不在乎它是否在这样的操作后失去了原来的情况class。我尝试了以下但没有运气。我是 Scala 的新手,我是否遗漏了什么?

case class Foo(k:String, v1:String, v2:String)
  extends Tuple2[String, (String, String)](k, (v1, v2))

编辑:在上面的代码片段中,class 扩展了 Tuple2,这不会产生预期的效果,即 RDD class 和函数不会将其视为元组并允许 PairRDDFunctions,例如作为 mapValues、values、reduceByKey 等

出于多种原因,扩展 TupleN 不是一个好主意,其中一个最好的原因是它已被弃用,在 2.11 上,甚至不可能扩展 TupleN一例class。即使您将 Foo 设为 non-case class,在 2.11 上使用 -deprecation 定义它也会向您显示:"warning: inheritance from class Tuple2 in package scala is deprecated: Tuples will be made final in a future version.".

如果您关心的是使用方便并且您不介意转换为元组的开销(几乎可以忽略不计),您可以使用 [= 提供的语法丰富 RDD[Foo] 21=] 转换如下:

import org.apache.spark.rdd.{ PairRDDFunctions, RDD }

case class Foo(k: String, v1: String, v2: String)

implicit def fooToPairRDDFunctions[K, V]
  (rdd: RDD[Foo]): PairRDDFunctions[String, (String, String)] =
    new PairRDDFunctions(
      rdd.map {
        case Foo(k, v1, v2) => k -> (v1, v2)
      }
    )

然后:

scala> val rdd = sc.parallelize(List(Foo("a", "b", "c"), Foo("d", "e", "f")))
rdd: org.apache.spark.rdd.RDD[Foo] = ParallelCollectionRDD[6] at parallelize at <console>:34

scala> rdd.mapValues(_._1).first
res0: (String, String) = (a,b)

你的 Foo 扩展 Tuple2[String, (String, String)] 的版本不起作用的原因是 RDD.rddToPairRDDFunctionsRDD[Tuple2[K, V]] 为目标,而 RDD 在它的类型参数,所以 RDD[Foo] 不是 RDD[Tuple2[K, V]]。一个更简单的例子可能会使这一点更清楚:

case class Box[A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)

然后:

scala> Box(("a", "b")).pairValue
res0: String = b

scala> Box(new Foo("a", "b")).pairValue
<console>:16: error: value pairValue is not a member of Box[Foo]
       Box(new Foo("a", "b")).pairValue
                              ^

但是如果你让 Box 协变...

case class Box[+A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)

…一切都很好:

scala> Box(("a", "b")).pairValue
res0: String = b

scala> Box(new Foo("a", "b")).pairValue
res1: String = b

您不能使 RDD 协变,因此定义您自己的隐式转换以添加语法是最好的选择。就个人而言,我可能会选择显式进行转换,但这是相对 un-horrible 隐式转换的使用。

不确定我是否答对了你的问题,但假设你有一个案例 class

import org.apache.spark.rdd.RDD

case class DataFormat(id: Int, name: String, value: Double)
val data: Seq[(Int, String, Double)] = Seq(
   (1, "Joe", 0.1),
   (2, "Mike", 0.3)
)
val rdd: RDD[DataFormat] = (
    sc.parallelize(data).map(x=>DataFormat(x._1, x._2, x._3))
)

// Print all data
rdd.foreach(println)

// Print only names
rdd.map(x=>x.name).foreach(println)