Getting an error in Spark Structured Streaming
I am trying to build a POC of Spark Structured Streaming from Kafka using Python; the code is below.
Spark version - 2.3.2
Kafka - 2.11-2.1.0
Hadoop - 2.8.3
from pyspark.sql import SparkSession
import sys

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Broker list and topic name are passed on the command line.
brokers, topic = sys.argv[1:]
print("broker : {} and Topic : {}".format(brokers, topic))

# Read a streaming DataFrame from Kafka.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .load()

# Kafka keys/values arrive as binary, so cast them to strings.
numbericdf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
numbericdf.createOrReplaceTempView("updates")
average = spark.sql("select value from updates")
print(average)  # prints the DataFrame object (its schema), not streaming rows

# Write the stream to the console in append mode.
query = average \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
The following error occurs when running spark-submit.
.\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 spark_struct.py localhost:9092 tempre
19/01/07 13:34:55 ERROR MicroBatchExecution: Query [id = 03fdd202-d795-4f69-ad8d-e712568e3d88, runId = 03dd8e92-ffee-414d-99a1-f43a819630dd] terminated with error
java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$$anonfun$visitMethodInsn.apply(ClosureCleaner.scala:449)
at org.apache.spark.util.FieldAccessFinder$$anon$$anonfun$visitMethodInsn.apply(ClosureCleaner.scala:432)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$$anonfun$foreach.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$$anonfun$foreach.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon.visitMethodInsn(ClosureCleaner.scala:432)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean.apply(ClosureCleaner.scala:262)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean.apply(ClosureCleaner.scala:261)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
This issue was resolved after moving from Java 11 to Java 8.
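For anyone hitting the same trace: a minimal sketch of pinning the job to Java 8 before launching it from Python. The JDK path below is an assumed example location, not from the original setup; adjust it to your machine.

import os
import subprocess

# Assumed example path to a Java 8 JDK; adjust to your own installation.
java8_home = r"C:\Program Files\Java\jdk1.8.0_201"

# spark-submit picks up the JVM from JAVA_HOME, so point it at Java 8.
env = dict(os.environ, JAVA_HOME=java8_home)

# Confirm the version that will be used; it should report 1.8.x, not 11.
subprocess.run([os.path.join(java8_home, "bin", "java"), "-version"],
               env=env, check=True)

# Launch the same job as in the question with the Java 8 environment.
# On Windows the .cmd wrapper is invoked; on Linux/macOS use bin/spark-submit.
subprocess.run(
    [r".\bin\spark-submit.cmd",
     "--packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2",
     "spark_struct.py", "localhost:9092", "tempre"],
    env=env, check=True,
)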