Scala Reflection exception during creation of DataSet in Spark

I want to run a Spark job on Spark Jobserver. During execution, I get an exception:

Stack trace:

java.lang.RuntimeException: scala.ScalaReflectionException: class com.some.example.instrument.data.SQLMapping in JavaMirror with org.apache.spark.util.MutableURLClassLoader@55b699ef of type class org.apache.spark.util.MutableURLClassLoader with classpath [file:/app/spark-job-server.jar] and parent being sun.misc.Launcher$AppClassLoader@2e817b38 of type class sun.misc.Launcher$AppClassLoader with classpath [.../classpath jars/] not found.

at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings$$typecreator15.apply(DataRetriever.scala:136)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings.apply(DataRetriever.scala:136)
at com.some.example.instrument.DataRetriever$$anonfun$combineMappings.apply(DataRetriever.scala:135)
at scala.util.Success$$anonfun$map.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In DataRetriever, I convert a simple case class to a Dataset.

Case class definitions:

case class SQLMapping(id: String,
                      it: InstrumentPrivateKey,
                      cc: Option[String],
                      ri: Option[SourceInstrumentId],
                      p: Option[SourceInstrumentId],
                      m: Option[SourceInstrumentId])

case class SourceInstrumentId(instrumentId: Long,
                              providerId: String)

case class InstrumentPrivateKey(instrumentId: Long,
                                providerId: String,
                                clientId: String)

The code that causes the problem:

import session.implicits._

def someFunc(future: Future[ID]): Dataset[SQLMapping] = {
  future.map { f =>
    val seq: Seq[SQLMapping] = getFromEndpoint(f)
    val ds: Dataset[SQLMapping] = seq.toDS()
    ...
  }
}

The job sometimes works, but if I re-run it, it throws the exception.

Update 28.03.2018: I forgot to mention a detail that turned out to be important: the Dataset is constructed inside a Future.

Calling toDS() inside the future causes the ScalaReflectionException.

I decided to construct the Dataset outside of future.map.
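A minimal sketch of that workaround, reusing the `session`, `ID`, and `getFromEndpoint` placeholders from the snippet above (untested; the timeout value is illustrative):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import session.implicits._

def someFunc(future: Future[ID]): Dataset[SQLMapping] = {
  // Only fetch the raw Seq inside the future...
  val seqF: Future[Seq[SQLMapping]] = future.map(f => getFromEndpoint(f))
  val seq: Seq[SQLMapping] = Await.result(seqF, 10.seconds)
  // ...and call toDS() on the calling thread, outside future.map,
  // so the encoder's reflection runs under the expected classloader.
  seq.toDS()
}
```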

You can use this sample job to verify that a Dataset cannot be built inside future.map:

package com.example.sparkapplications

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.jobserver.SparkJob
import spark.jobserver.SparkJobValid
import spark.jobserver.SparkJobValidation

object FutureJob extends SparkJob{
  override def runJob(sc: SparkContext,
                      jobConfig: Config): Any = {
    val session = SparkSession.builder().config(sc.getConf).getOrCreate()
    import session.implicits._
    val f = Future{
      val seq = Seq(
        Dummy("1", 1),
        Dummy("2", 2),
        Dummy("3", 3),
        Dummy("4", 4),
        Dummy("5", 5)
      )

      val ds = seq.toDS

      ds.collect()
    }

    Await.result(f, 10 seconds)
  }

  case class Dummy(id: String, value: Long)
  override def validate(sc: SparkContext,
                        config: Config): SparkJobValidation = SparkJobValid
}
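For comparison, here is a sketch of the same job restructured per the workaround above: only the plain Seq is built inside the Future, and toDS() runs on the job's own thread. It mirrors FutureJob and assumes the same imports; it is untested:

```scala
object FutureJobFixed extends SparkJob {
  override def runJob(sc: SparkContext,
                      jobConfig: Config): Any = {
    val session = SparkSession.builder().config(sc.getConf).getOrCreate()
    import session.implicits._

    // Build only the plain Seq inside the future...
    val f = Future {
      Seq(Dummy("1", 1), Dummy("2", 2), Dummy("3", 3))
    }
    val seq = Await.result(f, 10 seconds)

    // ...and call toDS() outside the future, on the thread whose
    // classloader can resolve the Dummy case class.
    seq.toDS().collect()
  }

  case class Dummy(id: String, value: Long)
  override def validate(sc: SparkContext,
                        config: Config): SparkJobValidation = SparkJobValid
}
```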

I will post an update later on whether the problem persists with Spark 2.3.0, and on what happens when the jar is passed directly via spark-submit.