保存到 cassandra 时有时会出现 NullPointerException

Getting sometimes NullPointerException while saving into cassandra

我有以下方法可以写入 cassandra 一段时间,它可以很好地保存数据。 当我再次 运行 时,有时它会抛出 NullPointerException 不知道这里出了什么问题......你能帮帮我吗?

'
  @throws(classOf[IOException])
  def writeDfToCassandra(o_model_family:DataFrame , keyspace:String, columnFamilyName: String) = {
    logger.info(s"writeDfToCassandra")

    o_model_family.write.format("org.apache.spark.sql.cassandra")
    .options(Map( "table" -> columnFamilyName, "keyspace" -> keyspace ))
    .mode(SaveMode.Append)
    .save()
  }

'
18/10/29 05:23:56 ERROR BMValsProcessor: java.lang.NullPointerException
    at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
    at java.util.regex.Matcher.reset(Matcher.java:309)
    at java.util.regex.Matcher.<init>(Matcher.java:229)
    at java.util.regex.Pattern.matcher(Pattern.java:1093)
    at scala.util.matching.Regex.findFirstIn(Regex.scala:388)
    at org.apache.spark.util.Utils$$anonfun$redact$$anonfun$apply.apply(Utils.scala:2698)
    at org.apache.spark.util.Utils$$anonfun$redact$$anonfun$apply.apply(Utils.scala:2698)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.util.Utils$$anonfun$redact.apply(Utils.scala:2698)
    at org.apache.spark.util.Utils$$anonfun$redact.apply(Utils.scala:2696)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.util.Utils$.redact(Utils.scala:2696)
    at org.apache.spark.util.Utils$.redact(Utils.scala:2663)
    at org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions.apply(SQLConf.scala:1650)
    at org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions.apply(SQLConf.scala:1650)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:1650)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:178)
    at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:556)
    at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:480)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun.apply(QueryExecution.scala:198)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun.apply(QueryExecution.scala:198)
    at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:198)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
    at com.snp.utils.DbUtils$.writeDfToCassandra(DbUtils.scala:47)

奇怪的是,这在 Spark Utils 的 "redact" 函数中失败了。这用于可能传递给 Spark 的选项,以从 UI 等中删除敏感数据。我无法想象为什么空键名会在您的 SqlConf 中弹出(因为我相信您只能有空字符串)但我会在那里检查。执行方法时可能是 conf 的突变?