Spark Kafka Streaming 作业因 InvalidClassException 而失败

Spark Kafka Streaming job failing due to InvalidClassException

我是 运行 在 Spark 2、CDH 5.9 中使用 Kafka 客户端 0.8 的流媒体作业。简单的目的是将信息持久化在 Impala 中,逐条记录。

我无法摆脱这个错误,因为我不知道它来自哪里:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2

Direct Kafka Stream 由

创建
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
  "group.id" -> "myconsumergroup",
  "auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream  = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)

并由以下人员处理:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()

directKafkaStream.foreachRDD { rdd =>
  val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]

  val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")

    deviceEnriched.show(false)
    spark.sql("use my_database")
      deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}

streamingContext.start()
streamingContext.awaitTermination()

简短回答:消息是使用 commons-lang3 JAR 版本序列化的,该版本与您在 Spark 中使用的 JAR 不兼容

长答案:如果您刚刚用 Google 搜索该错误消息,然后搜索 Apache Commons 源代码,您会发现...

  • this post 深入探讨 Java "class incompatible" 序列化问题,一般来说
  • FastDateFormat 的源代码指出 serialVersionUID = 1L 直到 V3.1 but switching to serialVersionUID = 2L with V3.2(因为当时二进制结构已更改)

顺便说一句,我刚刚检查过,CDH 5.9 在 V3.1 中附带了 commons-lang3(对于 Hive、Impala、Sentry、Hive- in-Oozie、Sqoop-in-Oozie)和 V3.3.2(对于 Spark-in-Oozie)和 V3.4(对于 Sqoop)而 Spark 本身根本不需要它。去图吧。
由于 CDH 尚未随 Spark 2 一起提供,我猜您要么下载了 "beta" 包裹,要么下载了 Apache 版本——我检查过,Apache 版本 (V2.0.2) 随 commons-lang3 V3.3.2

我的 2 美分:只需在您的 Spark 2 命令行中强制 --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar,看看是否足以解决您的问题。

Edit 额外支付 2 美分,确保您的 "custom" JAR 优先于YARN 类路径,--conf spark.yarn.user.classpath.first=true