在 Spark 中导入隐含函数不起作用

import of implicits in Spark not working

我正在尝试创建特征以将数据从配置单元 table 加载到类型化数据集中。这是代码:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

trait PartitionedHiveTableLoader[T] {
  def context: String
  def table: String
  def returnEntity: Row => T
  def load(sparkSession: SparkSession, token: String): Dataset[T] = {
    import sparkSession.implicits._
    sparkSession.sql(s"SELECT * from $context.$table where d = $token").
      map(returnEntity(_))
  }
  def load(sparkSession: SparkSession, lowBound: String, upperBound: String, includeLow: Boolean = true, includeUpper: Boolean = true): Dataset[T] = {
    import sparkSession.implicits._
    sparkSession.sql(s"SELECT * " +
      s"from $context.$table " +
      s"where d >${if(includeLow)"=" else ""} $lowBound " +
      s"and d<${if(includeUpper)"=" else ""} $upperBound").
      map(returnEntity(_))
  }
}

然后这个特征与对象一起使用如下:

import org.apache.spark.sql.Row

object FreeUsersRightsLoader extends {} with PartitionedHiveTableLoader[FreeUsersRightsEntity] {
  def context: String = "analytics"
  def table: String =  "free_users_rights"
  def returnEntity: Row => FreeUsersRightsEntity = x => FreeUsersRightsDataset(x)
}

但是当我用 mvn package 编译它时,出现以下错误:

error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

但我在每个方法中都导入了 spark.implicits ... 有人知道问题出在哪里吗?

类型 T 的隐式 Encoder 必须在编译时对您正在使用的方法可用。

当您导入 import sparkSession.implicits._ 时,您实际上为许多 已知 常见类型(例如 String、Long、Arrays、case classes 等),但是 - Tunknownunbound,所以它可能是 anything ,并且没有任何 class 具有内置编码器 - 因此此导入没有用。

要解决此问题 - 您应该将 隐式编码器参数 添加到方法签名中:

def load(sparkSession: SparkSession, token: String)(implicit enc: Encoder[T]): Dataset[T] = {
  sparkSession.sql(s"SELECT * from $context.$table where d = $token").
    map(returnEntity(_))
}

def load(sparkSession: SparkSession,
         lowBound: String,
         upperBound: String,
         includeLow: Boolean = true,
         includeUpper: Boolean = true)(implicit enc: Encoder[T]): Dataset[T] = {
  sparkSession.sql(s"SELECT * " +
    s"from $context.$table " +
    s"where d >${if(includeLow)"=" else ""} $lowBound " +
    s"and d<${if(includeUpper)"=" else ""} $upperBound").
    map(returnEntity(_))
}

然后,只要这些方法被调用 ,您就需要内置的隐式可用 - 其中类型 T 已知为 FreeUsersRightsEntity(我假设是这些内置 classes 之一,例如一个包含基元和集合的案例 class):

import spark.implicits._

FreeUsersRightsLoader.load(spark, "token")