java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
I have a Scala Spark Streaming application that receives data on the same topic from 3 different Kafka producers.

The Spark Streaming application runs on the machine with host 0.0.0.179, the Kafka server runs on the machine with host 0.0.0.178, and the Kafka producers run on the hosts 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application I get the following error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
I have read many different posts by now, but nobody seems to have found a solution to this problem.

How can I fix my application? Do I have to change some parameter on Kafka (at the moment the num.partition parameter is set to 1)?

Here is the code of my application:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse // or org.json4s.native.JsonMethods.parse, depending on the json4s backend

// Create the context with a 3 second batch size
val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").set("spark.streaming.concurrentJobs", "3").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(3))
case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)
case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "0.0.0.178:9092",
"key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
"value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
"group.id" -> "test_luca",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics1 = Array("topics1")
val s1 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(record => {
implicit val formats = DefaultFormats
parse(record.value).extract[Sensors1]
}
)
s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()
Thanks
Your problem is here:
s1.print()
s1.saveAsTextFiles("results/", "")
Since Spark builds an execution graph for the stream, and you define two streams here:
Read from Kafka -> Print to console
Read from Kafka -> Save to text file
Spark will try to run both of these graphs simultaneously, since they are independent of each other. And because the Kafka integration uses a cached-consumer approach, it effectively ends up trying to use the same consumer for both stream executions.
What you can do is cache the DStream before running the two queries:
val dataFromKafka = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(/* stuff */)
val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
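Another way to keep the Kafka data from being consumed by two jobs at once is to collapse the two outputs into a single foreachRDD. This is a sketch I am adding here, not part of the original answer; it reuses s1 and the results/ path from the question, and the take(10) limit and directory naming are illustrative choices:

s1.foreachRDD { (rdd, time) =>
  // Cache the batch so the two actions below do not both re-read from Kafka.
  rdd.cache()
  // Print a few records to the console (roughly what print() does).
  rdd.take(10).foreach(println)
  // Save the whole batch; the batch time gives each output directory a unique
  // name, similar to what saveAsTextFiles("results/", "") produces.
  rdd.saveAsTextFile(s"results/batch-${time.milliseconds}")
  rdd.unpersist()
}

With a single output operation there is only one streaming job per batch, so the cached KafkaConsumer is only touched from one place. Note that the question also sets spark.streaming.concurrentJobs to 3, which allows several batches to run at the same time; leaving that at its default of 1 is worth trying as well, since concurrent batches can contend for the same cached consumer.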
Using cache worked for me. In my case, print, transform and then print again on a JavaPairDStream gave me that error. I used cache just before the first print and it worked for me. Without the cache, this fails:
s1.print()
s1.saveAsTextFiles("results/", "")
The code below works; I used similar code.
s1.cache();
s1.print();
s1.saveAsTextFiles("results/", "");
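If caching the stream is not enough in your setup, newer releases of the spark-streaming-kafka-0-10 integration also expose a property to disable the consumer cache altogether. Treat the sketch below as an assumption to check against your Spark version's documentation; it mirrors the SparkConf from the question:

// Assumed workaround: turn off the cached Kafka consumer so each task creates
// its own KafkaConsumer, at the cost of some per-task overhead. Verify that
// spark.streaming.kafka.consumer.cache.enabled exists in your Spark release
// before relying on it.
val sparkConf = new SparkConf()
  .setAppName("SparkScript")
  .setMaster("local[4]")
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")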