How to read from Kafka and print out records to console in Structured Streaming in pyspark?
I am using Spark 2.4.3, Scala 2.11.8, Java 1.8, and submitting the job with spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 data_stream.py.
Below is the code; it throws the exception shown further down:
import pyspark.sql.functions as psf
from pyspark.sql.types import StructType, StructField, StringType

# Read the Kafka topic as a streaming source
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "service-calls") \
    .option("startingOffsets", "earliest") \
    .load()

df.printSchema()

schema = StructType([
    StructField("crime_id", StringType(), True),
    StructField("original_crime_type_name", StringType(), True),
    StructField("report_date", StringType(), True),
    StructField("call_date", StringType(), True),
    StructField("offense_date", StringType(), True),
    StructField("call_time", StringType(), True),
    StructField("call_date_time", StringType(), True),
    StructField("disposition", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("agency_id", StringType(), True),
    StructField("address_type", StringType(), True),
    StructField("common_location", StringType(), True)
])

# Cast the Kafka value to a string and parse it as JSON
kafka_df_string = df.selectExpr("CAST(value AS STRING)")
service_table = kafka_df_string.select(psf.from_json(psf.col('value'), schema).alias("SERVICE_CALLS"))
distinct_table = service_table.select(psf.col('crime_id'))

query = distinct_table.writeStream.format("console").start()
query.awaitTermination()
When I run the code, it gives the following error:
Traceback (most recent call last):
File "/Users/PycharmProjects/data-streaming-project/streaming/data_stream.py", line 55, in <module>
run_spark_job(spark)
File "/Users/PycharmProjects/data-streaming-project/streaming/data_stream.py", line 39, in run_spark_job
distinct_table = service_table.select(psf.col('crime_id'))
File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1202, in select
File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/Users/dev/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: u"cannot resolve '`crime_id`' given input columns: [SERVICE_CALLS];;\n'Project ['crime_id]\n+- AnalysisBarrier\n +- Project [jsontostructs(StructField(crime_id,StringType,true), StructField(original_crime_type_name,StringType,true), StructField(report_date,StringType,true), StructField(call_date,StringType,true), StructField(offense_date,StringType,true), StructField(call_time,StringType,true), StructField(call_date_time,StringType,true), StructField(disposition,StringType,true), StructField(address,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(agency_id,StringType,true), StructField(address_type,StringType,true), StructField(common_location,StringType,true), value#21, Some(America/Los_Angeles)) AS SERVICE_CALLS#23]\n +- Project [cast(value#8 as string) AS value#21]\n +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@5c2220e0, kafka, Map(startingOffsets -> earliest, subscribe -> service-calls, kafka.bootstrap.servers -> localhost:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6faadbc2,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> service-calls, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n"
Any help is appreciated!
EDIT
Now that I have appended .select("SERVICE_CALLS.*") to the code, I am getting a lot of errors on the awaitTermination line. The amended lines (sketched below) and the errors follow.
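The relevant part of the pipeline now looks roughly like this (only the appended .select("SERVICE_CALLS.*") changed; the rest of the code is as posted above):

service_table = kafka_df_string \
    .select(psf.from_json(psf.col('value'), schema).alias("SERVICE_CALLS")) \
    .select("SERVICE_CALLS.*")
distinct_table = service_table.select(psf.col('crime_id'))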
pyspark.sql.utils.StreamingQueryException: u'org.apache.spark.sql.kafka010.KafkaMicroBatchReader.createDataReaderFactories()Ljava/util/List;\n=== Streaming Query ===\nIdentifier: [id = 6c33d7c9-3f9c-428d-aece-66f94315545a, runId = a6a41d93-b470-4636-b292-172ab13c36c7]\nCurrent Committed Offsets: {}\nCurrent Available Offsets: {KafkaV2[Subscribe[service-calls]]: {"service-calls":{"0":1414}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [crime_id#25]\n+- Project [SERVICE_CALLS#23.crime_id AS crime_id#25, SERVICE_CALLS#23.original_crime_type_name AS original_crime_type_name#26, SERVICE_CALLS#23.report_date AS report_date#27, SERVICE_CALLS#23.call_date AS call_date#28, SERVICE_CALLS#23.offense_date AS offense_date#29, SERVICE_CALLS#23.call_time AS call_time#30, SERVICE_CALLS#23.call_date_time AS call_date_time#31, SERVICE_CALLS#23.disposition AS disposition#32, SERVICE_CALLS#23.address AS address#33, SERVICE_CALLS#23.city AS city#34, SERVICE_CALLS#23.state AS state#35, SERVICE_CALLS#23.agency_id AS agency_id#36, SERVICE_CALLS#23.address_type AS address_type#37, SERVICE_CALLS#23.common_location AS common_location#38]\n +- Project [jsontostructs(StructField(crime_id,StringType,true), StructField(original_crime_type_name,StringType,true), StructField(report_date,StringType,true), StructField(call_date,StringType,true), StructField(offense_date,StringType,true), StructField(call_time,StringType,true), StructField(call_date_time,StringType,true), StructField(disposition,StringType,true), StructField(address,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(agency_id,StringType,true), StructField(address_type,StringType,true), StructField(common_location,StringType,true), value#21, Some(America/Los_Angeles)) AS SERVICE_CALLS#23]\n +- Project [cast(value#8 as string) AS value#21]\n +- StreamingExecutionRelation KafkaV2[Subscribe[service-calls]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]\n'
and
java.lang.AbstractMethodError: org.apache.spark.sql.kafka010.KafkaMicroBatchReader.createDataReaderFactories()Ljava/util/List;
There are actually a couple of things going on here. One is a typo, the other is more serious.
The service_table dataframe has only one column, SERVICE_CALLS, after you do kafka_df_string.select(psf.from_json(psf.col('value'), schema).alias("SERVICE_CALLS")), so you cannot do service_table.select(psf.col('crime_id')): the crime_id column does not exist at the top level, it is nested inside the SERVICE_CALLS struct. That one was easy, wasn't it? :) A sketch of the fix is below.
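A minimal sketch of the fix, assuming the rest of the pipeline stays as posted: either flatten the struct first (as you did in your edit) or reference the nested field directly.

# Option 1: flatten the struct so crime_id becomes a top-level column
distinct_table = service_table.select("SERVICE_CALLS.*").select(psf.col('crime_id'))

# Option 2: reference the nested field directly
distinct_table = service_table.select(psf.col('SERVICE_CALLS.crime_id').alias('crime_id'))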
The more serious problem is that your spark-submit comes from the /Users/dev/spark-2.3.0-bin-hadoop2.7 directory (i.e. Spark 2.3.0), while --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 requests 2.4.3 as the Spark version. They simply don't match, hence the exception:
java.lang.AbstractMethodError: org.apache.spark.sql.kafka010.KafkaMicroBatchReader.createDataReaderFactories()Ljava/util/List;
Use --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 (for Spark 2.3.0) to match your spark-submit, and you should be fine. A sketch of the full command follows.
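Assuming the rest of your original command is unchanged, the submission would look roughly like this:

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
    data_stream.py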