Spark Streaming Exception: java.util.NoSuchElementException: None.get
I am writing SparkStreaming data to HDFS by converting it to a dataframe:
Code
object KafkaSparkHdfs {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF();
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
The folder is created, but no files are written to it.
The program terminates with the following error:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
In my pom I am using the corresponding dependencies:
- spark-core_2.11
- spark-sql_2.11
- spark-streaming_2.11
- spark-streaming-kafka-0-10_2.11
There is one obvious problem here - coalesce(1):
dataframe.coalesce(1)
While reducing the number of files might be tempting in many scenarios, it should be done if and only if the amount of data is low enough for the nodes to handle it (and clearly it isn't here).
Furthermore, let me quote the documentation:
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
The conclusion is that you should adjust the parameter according to the expected amount of data and the desired parallelism. coalesce(1) is therefore rarely useful in practice, especially in a context like streaming, where data properties can change over time.
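As an illustration only, here is a minimal sketch of writing each micro-batch with repartition instead of a hard-coded coalesce(1). The writeBatch helper and its numPartitions and path parameters are made up for this example and would need to be tuned to the actual data volume:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: numPartitions should be derived from the expected batch volume
// and the desired parallelism, not hard-coded to 1.
def writeBatch(df: DataFrame, numPartitions: Int, path: String): Unit = {
  df.repartition(numPartitions)   // adds a shuffle, but keeps the upstream work parallel
    .write
    .mode(SaveMode.Append)
    .save(path)
}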
The error comes from trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is intended mainly for testing purposes and its use is discouraged. The solution to your problem is therefore to make sure the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create a SQLContext, which is fine. However, it is not used when creating the StreamingContext; the SparkConf is used instead.
Looking at the documentation we see:
Create a StreamingContext by providing the configuration necessary for a new SparkContext
In other words, by using the SparkConf as a parameter a new SparkContext is created. There are now two separate contexts.
The easiest solution here is to keep using the same context as before. Change the line that creates the StreamingContext to:
val ssc = new StreamingContext(sc, Seconds(20))
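With that change the SQL and streaming parts share a single driver context, and spark.driver.allowMultipleContexts is no longer needed. A condensed sketch of the corrected setup, reusing the names from the question (shown for illustration only, e.g. inside spark-shell or an enclosing object):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
val sc = new SparkContext(sparkConf)             // the only SparkContext in the application
val sqlContext = new SQLContext(sc)              // built on sc, as in the question
val ssc = new StreamingContext(sc, Seconds(20))  // also built on sc, so no second context is created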
Note: In newer versions of Spark (2.0+), use a SparkSession instead. A new streaming context can then be created with StreamingContext(spark.sparkContext, ...). It can look as follows:
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
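Putting the answers together, here is a condensed sketch of what a Spark 2.x version of the job from the question could look like. The Kafka settings, topic, group id and output path are carried over from the question; this is an illustration of the fix, not a verified drop-in replacement:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkHdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkKafka")
      .getOrCreate()
    import spark.implicits._

    // Reuse the SparkContext owned by the SparkSession; no second context is created.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("fridaydata"), kafkaParams)
    )

    stream.map(record => record.value)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        rdd.toDF("word", "count")      // tuple RDD to DataFrame via spark.implicits._
          .write
          .mode(SaveMode.Append)
          .save("hdfs://localhost:9000/newfile24")
      }

    ssc.start()
    ssc.awaitTermination()
  }
}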