如何以 CSV 格式编写窗口聚合?
How to write windowed aggregation in CSV format?
我正在开发一个 Spark Structured Streaming 应用程序,它流式传输 csv 文件并将它们与静态数据连接起来。 join后做了一些聚合
将查询结果以 CSV 格式写入 HDFS 时,出现以下错误:
19/01/09 14:00:30 ERROR MicroBatchExecution: Query [id = 830ca987-b55a-4c03-aa13-f71bc57e47ad, runId = 87cdb029-0022-4f1c-b55e-c2443c9f058a] terminated with error java.lang.UnsupportedOperationException: CSV data source does not support struct<start:timestamp,end:timestamp> data type.
at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType(CSVUtils.scala:127)
at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema.apply(CSVUtils.scala:131)
at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema.apply(CSVUtils.scala:131)
根本原因可能是什么?
以下是我的代码的相关部分:
val spark = SparkSession
.builder
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
.getOrCreate
...
val df_agg_without_time = sqlResultjoin
.withWatermark("event_time", "10 seconds")
.groupBy(
window($"event_time", "10 seconds", "5 seconds"),
$"section",
$"timestamp")
.agg(sum($"total") as "total")
...
finalTable_repo
.writeStream
.outputMode("append")
.partitionBy("xml_data_dt")
.format("csv")
.trigger(Trigger.ProcessingTime("2 seconds"))
.option("path", "hdfs://op/apps/hive/warehouse/area.db/finalTable_repo")
.start
您进行聚合的行 .groupBy(window($"event_time", "10 seconds", "5 seconds"), $"section", $"timestamp")
创建了 CSV 数据源不支持的 struct<start:timestamp,end:timestamp>
数据类型。
只需 df_agg_without_time.printSchema
即可看到该列。
一个解决方案就是将它转换为其他更简单的类型(可能使用 select
或 withColumn
)或者只是 select
出来(即不包含在以下数据框中) .
以下是一个示例批处理(非流式)结构化查询,它显示了流式结构化查询使用的模式(当您创建 df_agg_without_time
时)。
val q = spark
.range(4)
.withColumn("t", current_timestamp)
.groupBy(window($"t", "10 seconds"))
.count
scala> q.printSchema
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)
对于示例流式查询,您可以使用速率数据源。
val q = spark
.readStream
.format("rate")
.load
.groupBy(window($"timestamp", "10 seconds"))
.count
scala> q.printSchema
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)
我正在开发一个 Spark Structured Streaming 应用程序,它流式传输 csv 文件并将它们与静态数据连接起来。 join后做了一些聚合
将查询结果以 CSV 格式写入 HDFS 时,出现以下错误:
19/01/09 14:00:30 ERROR MicroBatchExecution: Query [id = 830ca987-b55a-4c03-aa13-f71bc57e47ad, runId = 87cdb029-0022-4f1c-b55e-c2443c9f058a] terminated with error java.lang.UnsupportedOperationException: CSV data source does not support struct<start:timestamp,end:timestamp> data type.
at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType(CSVUtils.scala:127)
at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema.apply(CSVUtils.scala:131)
at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema.apply(CSVUtils.scala:131)
根本原因可能是什么?
以下是我的代码的相关部分:
val spark = SparkSession
.builder
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
.getOrCreate
...
val df_agg_without_time = sqlResultjoin
.withWatermark("event_time", "10 seconds")
.groupBy(
window($"event_time", "10 seconds", "5 seconds"),
$"section",
$"timestamp")
.agg(sum($"total") as "total")
...
finalTable_repo
.writeStream
.outputMode("append")
.partitionBy("xml_data_dt")
.format("csv")
.trigger(Trigger.ProcessingTime("2 seconds"))
.option("path", "hdfs://op/apps/hive/warehouse/area.db/finalTable_repo")
.start
您进行聚合的行 .groupBy(window($"event_time", "10 seconds", "5 seconds"), $"section", $"timestamp")
创建了 CSV 数据源不支持的 struct<start:timestamp,end:timestamp>
数据类型。
只需 df_agg_without_time.printSchema
即可看到该列。
一个解决方案就是将它转换为其他更简单的类型(可能使用 select
或 withColumn
)或者只是 select
出来(即不包含在以下数据框中) .
以下是一个示例批处理(非流式)结构化查询,它显示了流式结构化查询使用的模式(当您创建 df_agg_without_time
时)。
val q = spark
.range(4)
.withColumn("t", current_timestamp)
.groupBy(window($"t", "10 seconds"))
.count
scala> q.printSchema
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)
对于示例流式查询,您可以使用速率数据源。
val q = spark
.readStream
.format("rate")
.load
.groupBy(window($"timestamp", "10 seconds"))
.count
scala> q.printSchema
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)