Getting error in Spark Structured Streaming

I am trying to create a POC of Spark Structured Streaming with Kafka using Python; the code is below.

Spark version - 2.3.2, Kafka - 2.11-2.1.0, Hadoop - 2.8.3

import sys

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

brokers, topic = sys.argv[1:]
print("broker : {} and Topic : {}".format(brokers, topic))

# Read the Kafka topic as a streaming DataFrame.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", brokers) \
  .option("subscribe", topic) \
  .load()

# Kafka keys/values arrive as binary; cast them to strings.
numeric_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
numeric_df.createOrReplaceTempView("updates")
average = spark.sql("select value from updates")
print(average)  # prints DataFrame[value: string], not the streaming rows

# Write each micro-batch to the console.
query = average \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
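
As an aside, the DataFrame above is named average, but the SQL only projects value. If the end goal is an actual running average of the payloads, a minimal sketch (assuming the Kafka values are numeric strings; the names numeric and running_avg are illustrative, not from the original code) would be:

from pyspark.sql.functions import avg

# Hypothetical variant: cast the payload to a number and aggregate.
numeric = df.selectExpr("CAST(value AS DOUBLE) AS value")
running_avg = numeric.agg(avg("value").alias("avg_value"))

# A streaming aggregation needs "complete" (or "update") output mode,
# not "append".
query = running_avg \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()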

On spark-submit, I get the following error.

.\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 spark_struct.py localhost:9092 tempre

19/01/07 13:34:55 ERROR MicroBatchExecution: Query [id = 03fdd202-d795-4f69-ad8d-e712568e3d88, runId = 03dd8e92-ffee-414d-99a1-f43a819630dd] terminated with error
java.lang.IllegalArgumentException
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
        at org.apache.spark.util.FieldAccessFinder$$anon$$anonfun$visitMethodInsn.apply(ClosureCleaner.scala:449)
        at org.apache.spark.util.FieldAccessFinder$$anon$$anonfun$visitMethodInsn.apply(ClosureCleaner.scala:432)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anon$$anonfun$foreach.apply(HashMap.scala:103)
        at scala.collection.mutable.HashMap$$anon$$anonfun$foreach.apply(HashMap.scala:103)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon.foreach(HashMap.scala:103)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.util.FieldAccessFinder$$anon.visitMethodInsn(ClosureCleaner.scala:432)
        at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean.apply(ClosureCleaner.scala:262)
        at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean.apply(ClosureCleaner.scala:261)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)

The issue was resolved after moving from Java 11 back to Java 8. Spark 2.3.x supports only Java 8: the ASM 5 ClassReader used by Spark's ClosureCleaner cannot parse class files compiled for Java 9 and later, which is exactly the IllegalArgumentException at the top of the trace. Java 11 support did not arrive until Spark 3.0.
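
A small pre-flight check in the driver script can fail fast when the wrong JVM is on the PATH. This is only a sketch: it assumes Python 3.7+ (for subprocess.run with capture_output), that java -version prints its usual banner to stderr, and the helper name java_major_version is made up for illustration.

import re
import subprocess
import sys

def java_major_version():
    # `java -version` writes its banner to stderr.
    banner = subprocess.run(["java", "-version"],
                            capture_output=True, text=True).stderr
    match = re.search(r'version "(\d+)(?:\.(\d+))?', banner)
    if not match:
        raise RuntimeError("cannot parse java version from: " + banner)
    major = int(match.group(1))
    minor = int(match.group(2) or 0)
    # Pre-9 JVMs report "1.8.0_x"; 9 and later report "11.0.x" etc.
    return minor if major == 1 else major

if java_major_version() != 8:
    sys.exit("Spark 2.3.x needs Java 8; found a different JVM on PATH.")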