如何使用 spark 将 kafka 主题中的流数据写入 hdfs?
How can I use spark to writeStream data from a kafka topic into hdfs?
几个小时以来,我一直在努力让这段代码正常工作:
val spark = SparkSession.builder()
.appName("Consumer")
.getOrCreate()
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.select("value")
.writeStream
.format(fileFormat)
.option("path", filePath)
.option("checkpointLocation", "/tmp/checkpoint")
.start()
.awaitTermination()
它给出了这个例外:
Logical Plan:
Project [value#8]
+- StreamingExecutionRelation KafkaV2[Subscribe[MyTopic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:189)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:405)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
我不明白发生了什么,我只是想使用 spark streaming 将数据从 kafka 主题写入 HDFS。为什么这么难?我该怎么做?
我得到的批处理版本工作得很好:
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.selectExpr("CAST(value AS String)")
.write
.format(fileFormat)
.save(filePath)
@happy 您在结构化流媒体中遇到了一个已知错误 https://issues.apache.org/jira/browse/SPARK-25257
这是因为磁盘偏移量从未被反序列化,修复将在即将发布的版本中合并
在我将我的 spark 版本更改为 2.3.2
后,一切都开始工作了。
几个小时以来,我一直在努力让这段代码正常工作:
val spark = SparkSession.builder()
.appName("Consumer")
.getOrCreate()
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.select("value")
.writeStream
.format(fileFormat)
.option("path", filePath)
.option("checkpointLocation", "/tmp/checkpoint")
.start()
.awaitTermination()
它给出了这个例外:
Logical Plan:
Project [value#8]
+- StreamingExecutionRelation KafkaV2[Subscribe[MyTopic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:189)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:405)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
我不明白发生了什么,我只是想使用 spark streaming 将数据从 kafka 主题写入 HDFS。为什么这么难?我该怎么做?
我得到的批处理版本工作得很好:
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.selectExpr("CAST(value AS String)")
.write
.format(fileFormat)
.save(filePath)
@happy 您在结构化流媒体中遇到了一个已知错误 https://issues.apache.org/jira/browse/SPARK-25257
这是因为磁盘偏移量从未被反序列化,修复将在即将发布的版本中合并
在我将我的 spark 版本更改为 2.3.2
后,一切都开始工作了。