
Apache Spark: NPE during restoring state from checkpoint

We are building a simple streaming application that joins the incoming DStream with an RDD read from HBase. Sample code:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val indexState = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]).map { case (rowkey, v) => /* some logic */ }

val result = dStream.transform { rdd =>
  rdd.leftOuterJoin(indexState)
}

This works fine, but as soon as we enable checkpointing on the StreamingContext and let the application recover from a previously created checkpoint, it always throws a NullPointerException:

ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.NullPointerException
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
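
For context, checkpointing is enabled and the application is restarted via the usual StreamingContext.getOrCreate pattern; a minimal sketch follows (the checkpoint path, app name, and batch interval are illustrative, not from the original job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // hypothetical path

def createContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("hbase-join-stream")
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // DStream setup (including the HBase leftOuterJoin above) goes here.
  ssc
}

// Cold start: createContext() runs. On restart, Spark instead rebuilds the
// context -- and the whole RDD lineage, including the NewHadoopRDD -- from
// the serialized checkpoint data, which is where the NPE above surfaces.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()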

Has anyone run into the same problem? Versions:

Thanks!

Spark Streaming checkpoints cannot be used to restore a job that has been stopped and re-submitted, at least in 1.6.x: the checkpoint data from the previous run cannot be re-used by the new submission. You have to delete all of the old checkpoint data before submitting the job.
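
A minimal sketch of that cleanup using the Hadoop FileSystem API (checkpointDir is assumed to be the same path passed to ssc.checkpoint, and sc is the job's SparkContext):

import org.apache.hadoop.fs.{FileSystem, Path}

// Resolves the default filesystem (e.g. HDFS) from the job's Hadoop config.
val fs = FileSystem.get(sc.hadoopConfiguration)
val cp = new Path(checkpointDir)
if (fs.exists(cp)) {
  fs.delete(cp, true) // recursive: removes all old checkpoint data
}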

From the Spark Streaming Programming Guide ("Upgrading the code" notes on checkpointing):

[R]estarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors. In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory.
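
The other option the guide mentions, starting the upgraded app with a different checkpoint directory, can be as simple as versioning the path. The version tag below is a hypothetical naming convention, not a Spark feature:

// Bump the tag on every deployment so stale serialized state is never read.
val appVersion = "v2"
val checkpointDir = s"hdfs:///checkpoints/my-streaming-app-$appVersion"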