Spark Kafka Streaming 作业因 InvalidClassException 而失败
Spark Kafka Streaming job failing due to InvalidClassException
我是 运行 在 Spark 2、CDH 5.9 中使用 Kafka 客户端 0.8 的流媒体作业。简单的目的是将信息持久化在 Impala 中,逐条记录。
我无法摆脱这个错误,因为我不知道它来自哪里:
16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1,
local class serialVersionUID = 2
Direct Kafka Stream 由
创建
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
"group.id" -> "myconsumergroup",
"auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)
并由以下人员处理:
val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()
directKafkaStream.foreachRDD { rdd =>
val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]
val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")
deviceEnriched.show(false)
spark.sql("use my_database")
deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}
streamingContext.start()
streamingContext.awaitTermination()
简短回答:消息是使用 commons-lang3
JAR 版本序列化的,该版本与您在 Spark 中使用的 JAR 不兼容。
长答案:如果您刚刚用 Google 搜索该错误消息,然后搜索 Apache Commons 源代码,您会发现...
- this post 深入探讨 Java "class incompatible" 序列化问题,一般来说
FastDateFormat
的源代码指出 serialVersionUID = 1L
直到 V3.1 but switching to serialVersionUID = 2L
with V3.2(因为当时二进制结构已更改)
顺便说一句,我刚刚检查过,CDH 5.9 在 V3.1 中附带了 commons-lang3
(对于 Hive、Impala、Sentry、Hive- in-Oozie、Sqoop-in-Oozie)和 V3.3.2(对于 Spark-in-Oozie)和 V3.4(对于 Sqoop)而 Spark 本身根本不需要它。去图吧。
由于 CDH 尚未随 Spark 2 一起提供,我猜您要么下载了 "beta" 包裹,要么下载了 Apache 版本——我检查过,Apache 版本 (V2.0.2) 随 commons-lang3
V3.3.2
我的 2 美分:只需在您的 Spark 2 命令行中强制 --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar
,看看是否足以解决您的问题。
Edit 额外支付 2 美分,确保您的 "custom" JAR 优先于YARN 类路径,--conf spark.yarn.user.classpath.first=true
我是 运行 在 Spark 2、CDH 5.9 中使用 Kafka 客户端 0.8 的流媒体作业。简单的目的是将信息持久化在 Impala 中,逐条记录。
我无法摆脱这个错误,因为我不知道它来自哪里:
16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1,
local class serialVersionUID = 2
Direct Kafka Stream 由
创建val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
"group.id" -> "myconsumergroup",
"auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)
并由以下人员处理:
val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()
directKafkaStream.foreachRDD { rdd =>
val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]
val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")
deviceEnriched.show(false)
spark.sql("use my_database")
deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}
streamingContext.start()
streamingContext.awaitTermination()
简短回答:消息是使用 commons-lang3
JAR 版本序列化的,该版本与您在 Spark 中使用的 JAR 不兼容。
长答案:如果您刚刚用 Google 搜索该错误消息,然后搜索 Apache Commons 源代码,您会发现...
- this post 深入探讨 Java "class incompatible" 序列化问题,一般来说
FastDateFormat
的源代码指出serialVersionUID = 1L
直到 V3.1 but switching toserialVersionUID = 2L
with V3.2(因为当时二进制结构已更改)
顺便说一句,我刚刚检查过,CDH 5.9 在 V3.1 中附带了 commons-lang3
(对于 Hive、Impala、Sentry、Hive- in-Oozie、Sqoop-in-Oozie)和 V3.3.2(对于 Spark-in-Oozie)和 V3.4(对于 Sqoop)而 Spark 本身根本不需要它。去图吧。
由于 CDH 尚未随 Spark 2 一起提供,我猜您要么下载了 "beta" 包裹,要么下载了 Apache 版本——我检查过,Apache 版本 (V2.0.2) 随 commons-lang3
V3.3.2
我的 2 美分:只需在您的 Spark 2 命令行中强制 --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar
,看看是否足以解决您的问题。
Edit 额外支付 2 美分,确保您的 "custom" JAR 优先于YARN 类路径,--conf spark.yarn.user.classpath.first=true