Spark/Scala: create an empty Dataset using generics in a trait

I have a trait with a type parameter, and one of its methods needs to be able to create an empty, typed Dataset.

trait MyTrait[T] {
    val sparkSession: SparkSession
    val sparkContext = sparkSession.sparkContext

    def createEmptyDataset(): Dataset[T] = {
        import sparkSession.implicits._ // to access the .toDS() function
        // DOESN'T WORK.
        val emptyRDD = sparkContext.parallelize(Seq[T]())
        val accumulator = emptyRDD.toDS()
        ...
    }
}

So far I haven't been able to make this work. The compiler complains `No ClassTag available for T` and `value toDS is not a member of org.apache.spark.rdd.RDD[T]`.

Any help would be appreciated. Thanks!

You have to provide both a `ClassTag[T]` and an `Encoder[T]` in the same scope. For example:

import org.apache.spark.sql.{SparkSession, Dataset, Encoder}
import scala.reflect.ClassTag


trait MyTrait[T] {
    val ct: ClassTag[T]
    val enc: Encoder[T]

    val sparkSession: SparkSession
    val sparkContext = sparkSession.sparkContext

    def createEmptyDataset(): Dataset[T] = {
        val emptyRDD = sparkContext.emptyRDD[T](ct)
        sparkSession.createDataset(emptyRDD)(enc)
    }
}

A concrete implementation:

class Foo extends MyTrait[Int] {
    val sparkSession = SparkSession.builder.getOrCreate()
    import sparkSession.implicits._

    val ct = implicitly[ClassTag[Int]]
    val enc = implicitly[Encoder[Int]]
}
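With both members wired up, the trait method compiles and returns an empty `Dataset[Int]`. A minimal usage sketch, assuming the `Foo` class above and a Spark master configured (e.g. via `spark-submit` or the `spark.master` property):

```scala
// Hypothetical demo object, not part of the original answer.
object Demo {
  def main(args: Array[String]): Unit = {
    val foo = new Foo // getOrCreate() picks up the configured SparkSession
    val ds = foo.createEmptyDataset() // a Dataset[Int] with zero rows
    println(ds.count()) // prints 0: an empty Dataset has no rows
    foo.sparkSession.stop()
  }
}
```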

You can also skip the RDD entirely:

import org.apache.spark.sql.{SparkSession, Dataset, Encoder}

trait MyTrait[T] {
    val enc: Encoder[T]

    val sparkSession: SparkSession

    def createEmptyDataset(): Dataset[T] = {
        sparkSession.emptyDataset[T](enc)
    }
}
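If you'd rather not assign the `enc` member by hand in every subclass, note that an abstract class (unlike a trait, prior to Scala 3) can take implicit constructor parameters, so the Encoder is captured at the subclass definition site. A hypothetical sketch of that alternative:

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Sketch: the implicit Encoder[T] arrives through the constructor,
// so subclasses never mention it explicitly.
abstract class MyBase[T](implicit enc: Encoder[T]) {
  val sparkSession: SparkSession

  def createEmptyDataset(): Dataset[T] =
    sparkSession.emptyDataset[T] // uses the implicit enc in scope
}

// Encoders.scalaInt supplies the Encoder[Int] explicitly here; with
// `import spark.implicits._` in scope it would be resolved implicitly.
class IntBase(val sparkSession: SparkSession)
    extends MyBase[Int]()(Encoders.scalaInt)
```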

See How to declare traits as taking implicit "constructor parameters"?, specifically the answer by Blaisorblade and another one by Alexey Romanov.