Spark Scala Dataset map works in main but not in a function
I have two Datasets:
implicit val spark: SparkSession = SparkSession
.builder()
.appName("app").master("local[1]")
.config("spark.executor.memory", "1g")
.getOrCreate()
import spark.implicits._
val ds1 = /*read csv file*/.as[caseClass1]
val ds2 = /*read csv file*/.as[caseClass2]
Then I join and map them like this:
val ds3 = ds1
  .joinWith(ds2, ds1("id") === ds2("id"))
  .map { case (left, right) => (left, Option(right)) }
and get the expected result.
The problem is that I am trying to build a RichDataset out of this and a few other helper functions, like so:
object Extentions {
  implicit class RichDataset[T <: Product](leftDs: Dataset[T]) {
    def leftJoinWith[V <: Product](rightDs: Dataset[V], condition: Column)(implicit spark: SparkSession): Dataset[(T, Option[V])] = {
      import spark.implicits._
      leftDs.joinWith(rightDs, condition, "left")
        .map { case (left, right) => (left, Option(right)) }
    }
  }
}
In main, with import Extentions._ in scope, the call to leftJoinWith fails to compile:
Error:(15, 13) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
.map{case(left, right) => (left, Option(right))}
Error:(15, 13) not enough arguments for method map: (implicit evidence: org.apache.spark.sql.Encoder[(T, Option[V])])org.apache.spark.sql.Dataset[(T, Option[V])].
Unspecified value parameter evidence.
.map{case(left, right) => (left, Option(right))}
...but spark.implicits._ is imported inside the function!
If the function returns just the join, without the map, it works both in main and in the function.
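The join-only version compiles because joinWith declares no implicit Encoder parameter: it reuses the encoders already captured by the two input Datasets. map, by contrast, takes an implicit Encoder for its result type, which spark.implicits._ cannot derive for an erased generic T. A minimal pure-Scala sketch of that difference (an analogy using a made-up Evidence typeclass in place of Spark's Encoder, not Spark code):

```scala
// Analogy: "Evidence" plays the role of Spark's Encoder.
trait Evidence[A]
implicit val intEvidence: Evidence[Int] = new Evidence[Int] {}

// Like joinWith: no implicit evidence required for the result.
def joinLike[A](a: A): A = a

// Like map: demands implicit evidence for the result type B.
def mapLike[A, B](a: A)(f: A => B)(implicit ev: Evidence[B]): B = f(a)

// joinLike("x") compiles for any A. mapLike("x")(_.length) compiles only
// because Evidence[Int] is in scope; with an unconstrained generic B it
// would fail with "could not find implicit value", the same shape as
// Spark's "not enough arguments for method map" error above.
```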
scalaVersion := "2.11.8", sparkVersion := "2.2.0"
Thanks in advance!
It works if you add a TypeTag context bound to the generic type parameters (spotted in Spark's own source code):
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Column, Dataset, SparkSession}
object Extentions {
  implicit class RichDataset[T <: Product : TypeTag](leftDs: Dataset[T]) {
    def leftJoinWith[V <: Product : TypeTag](rightDs: Dataset[V], condition: Column)(implicit spark: SparkSession): Dataset[(T, Option[V])] = {
      import spark.implicits._
      leftDs.joinWith(rightDs, condition, "left")
        .map { case (left, right) => (left, Option(right)) }
    }
  }
}
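Why the TypeTag bound fixes it: spark.implicits._ derives encoders for Product types through an implicit that itself requires a TypeTag, and inside a generic method a TypeTag for T exists only if the signature asks for one, so the caller's tag gets passed along. A pure-Scala sketch of that propagation (FakeEncoder is made up for illustration; only TypeTag is real):

```scala
import scala.reflect.runtime.universe.{TypeTag, typeOf}

// Analogy for Spark's Product-encoder derivation: available only
// when a TypeTag for T is in scope.
case class FakeEncoder[T](typeName: String)
implicit def deriveEncoder[T <: Product : TypeTag]: FakeEncoder[T] =
  FakeEncoder(typeOf[T].typeSymbol.name.toString)

case class User(id: Int)

// Without `: TypeTag` on T this would not compile ("could not find
// implicit value"), matching the question's error. With the bound,
// the caller's TypeTag flows in and derivation succeeds.
def encodeGeneric[T <: Product : TypeTag]: FakeEncoder[T] =
  implicitly[FakeEncoder[T]]
```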