Convert Spark Structured Streaming DataFrames to a Pandas DataFrame
I have set up a Spark Structured Streaming application that consumes from a Kafka topic. I need to use some APIs that accept a Pandas DataFrame, but when I try to convert the streaming DataFrame I get the following:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2832)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2809)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
Here is my Python code:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("sparkDf to pandasDf") \
    .getOrCreate()

sparkDf = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafkahost:9092") \
    .option("subscribe", "mytopic") \
    .option("startingOffsets", "earliest") \
    .load()

# This is the line that raises the AnalysisException above
pandas_df = sparkDf.toPandas()

query = sparkDf.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()
Now, I am aware that I am creating another instance of a streaming DataFrame, but no matter where I put start() and awaitTermination(), I run into the same error.

Any ideas?
TL;DR: An operation like this simply cannot work.

"Now I am aware I am creating another instance of a streaming Dataframe"

Well, the problem is that you really aren't. toPandas, called on a DataFrame, creates a plain, local, non-distributed Pandas DataFrame in the memory of the driver node.

Not only does it have nothing to do with Spark, but as an abstraction it is inherently incompatible with Structured Streaming: a Pandas DataFrame represents a fixed set of tuples, whereas Structured Streaming represents an infinite stream of tuples.
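To make the distinction concrete, here is a minimal sketch (reusing the broker and topic names from the question) of a bounded *batch* read of the same topic. Because the offsets are pinned up front, the result is a fixed set of rows, so toPandas() is legal there, provided the data fits in driver memory:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch kafka read").getOrCreate()

# Batch read: startingOffsets/endingOffsets bound the result up front
batch_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafkahost:9092") \
    .option("subscribe", "mytopic") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Legal here: the DataFrame is finite, so it can be collected to the driver
pandas_df = batch_df.selectExpr("CAST(value AS STRING)").toPandas()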
It is not clear what you are trying to achieve here, and chances are it is an XY problem, but if you really need to combine Pandas with Structured Streaming, you can try pandas_udf: the SCALAR and GROUPED_MAP variants are compatible at least with basic, time-based triggers (other variants may be supported as well, although some combinations clearly don't make any sense, and I am not aware of any official compatibility matrix). A sketch of the SCALAR variant follows.
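As an illustration, here is a minimal sketch of a SCALAR pandas_udf applied inside a streaming query, assuming Spark 2.3+ with PyArrow installed; to_upper is a hypothetical placeholder for whatever Pandas-based logic you actually need:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

# SCALAR pandas_udf: receives a pandas.Series per batch of rows and
# must return a pandas.Series of the same length
@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(v):
    # v is a pandas.Series of strings; placeholder transformation
    return v.str.upper()

query = sparkDf \
    .selectExpr("CAST(value AS STRING) AS value") \
    .select(to_upper("value").alias("value_upper")) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()

query.awaitTermination()

The key point is that the Pandas code runs on batches of rows inside each micro-batch, per partition, rather than on a single collected DataFrame, so it never requires the stream to be materialized as a whole.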