在 Spark 中导入隐含函数不起作用
import of implicits in Spark not working
我正在尝试创建特征以将数据从配置单元 table 加载到类型化数据集中。这是代码:
import org.apache.spark.sql.{Dataset, Row, SparkSession}
trait PartitionedHiveTableLoader[T] {
def context: String
def table: String
def returnEntity: Row => T
def load(sparkSession: SparkSession, token: String): Dataset[T] = {
import sparkSession.implicits._
sparkSession.sql(s"SELECT * from $context.$table where d = $token").
map(returnEntity(_))
}
def load(sparkSession: SparkSession, lowBound: String, upperBound: String, includeLow: Boolean = true, includeUpper: Boolean = true): Dataset[T] = {
import sparkSession.implicits._
sparkSession.sql(s"SELECT * " +
s"from $context.$table " +
s"where d >${if(includeLow)"=" else ""} $lowBound " +
s"and d<${if(includeUpper)"=" else ""} $upperBound").
map(returnEntity(_))
}
}
然后这个特征与对象一起使用如下:
import org.apache.spark.sql.Row
object FreeUsersRightsLoader extends {} with PartitionedHiveTableLoader[FreeUsersRightsEntity] {
def context: String = "analytics"
def table: String = "free_users_rights"
def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
}
但是当我用 mvn package 编译它时,出现以下错误:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
但我在每个方法中都导入了 spark.implicits ...
有人知道问题出在哪里吗?
类型 T
的隐式 Encoder
必须在编译时对您正在使用的方法可用。
当您导入 import sparkSession.implicits._
时,您实际上为许多 已知 常见类型(例如 String、Long、Arrays、case classes 等),但是 - T
是 unknown 和 unbound,所以它可能是 anything ,并且没有任何 class 具有内置编码器 - 因此此导入没有用。
要解决此问题 - 您应该将 隐式编码器参数 添加到方法签名中:
def load(sparkSession: SparkSession, token: String)(implicit enc: Encoder[T]): Dataset[T] = {
sparkSession.sql(s"SELECT * from $context.$table where d = $token").
map(returnEntity(_))
}
def load(sparkSession: SparkSession,
lowBound: String,
upperBound: String,
includeLow: Boolean = true,
includeUpper: Boolean = true)(implicit enc: Encoder[T]): Dataset[T] = {
sparkSession.sql(s"SELECT * " +
s"from $context.$table " +
s"where d >${if(includeLow)"=" else ""} $lowBound " +
s"and d<${if(includeUpper)"=" else ""} $upperBound").
map(returnEntity(_))
}
然后,只要这些方法被调用 ,您就需要内置的隐式可用 - 其中类型 T 已知为 FreeUsersRightsEntity
(我假设是这些内置 classes 之一,例如一个包含基元和集合的案例 class):
import spark.implicits._
FreeUsersRightsLoader.load(spark, "token")
我正在尝试创建特征以将数据从配置单元 table 加载到类型化数据集中。这是代码:
import org.apache.spark.sql.{Dataset, Row, SparkSession}
trait PartitionedHiveTableLoader[T] {
def context: String
def table: String
def returnEntity: Row => T
def load(sparkSession: SparkSession, token: String): Dataset[T] = {
import sparkSession.implicits._
sparkSession.sql(s"SELECT * from $context.$table where d = $token").
map(returnEntity(_))
}
def load(sparkSession: SparkSession, lowBound: String, upperBound: String, includeLow: Boolean = true, includeUpper: Boolean = true): Dataset[T] = {
import sparkSession.implicits._
sparkSession.sql(s"SELECT * " +
s"from $context.$table " +
s"where d >${if(includeLow)"=" else ""} $lowBound " +
s"and d<${if(includeUpper)"=" else ""} $upperBound").
map(returnEntity(_))
}
}
然后这个特征与对象一起使用如下:
import org.apache.spark.sql.Row
object FreeUsersRightsLoader extends {} with PartitionedHiveTableLoader[FreeUsersRightsEntity] {
def context: String = "analytics"
def table: String = "free_users_rights"
def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
}
但是当我用 mvn package 编译它时,出现以下错误:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
但我在每个方法中都导入了 spark.implicits ... 有人知道问题出在哪里吗?
类型 T
的隐式 Encoder
必须在编译时对您正在使用的方法可用。
当您导入 import sparkSession.implicits._
时,您实际上为许多 已知 常见类型(例如 String、Long、Arrays、case classes 等),但是 - T
是 unknown 和 unbound,所以它可能是 anything ,并且没有任何 class 具有内置编码器 - 因此此导入没有用。
要解决此问题 - 您应该将 隐式编码器参数 添加到方法签名中:
def load(sparkSession: SparkSession, token: String)(implicit enc: Encoder[T]): Dataset[T] = {
sparkSession.sql(s"SELECT * from $context.$table where d = $token").
map(returnEntity(_))
}
def load(sparkSession: SparkSession,
lowBound: String,
upperBound: String,
includeLow: Boolean = true,
includeUpper: Boolean = true)(implicit enc: Encoder[T]): Dataset[T] = {
sparkSession.sql(s"SELECT * " +
s"from $context.$table " +
s"where d >${if(includeLow)"=" else ""} $lowBound " +
s"and d<${if(includeUpper)"=" else ""} $upperBound").
map(returnEntity(_))
}
然后,只要这些方法被调用 ,您就需要内置的隐式可用 - 其中类型 T 已知为 FreeUsersRightsEntity
(我假设是这些内置 classes 之一,例如一个包含基元和集合的案例 class):
import spark.implicits._
FreeUsersRightsLoader.load(spark, "token")