spark.sql.AnalysisException: Text data source does not support binary data type
I have a Spark Structured Streaming job that reads from a Kafka topic and writes to an S3 bucket. I'm running Spark 2.4.7 on AWS. The core exception is org.apache.spark.sql.AnalysisException: Text data source does not support binary data type.;
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", topic)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.keystore.location",keystore)
.option("kafka.ssl.keystore.password",pw)
.option("kafka.ssl.key.password",pw)
.option("startingOffsets","earliest")
.option("failOnDataLoss","false")
.load()
df.selectExpr("CAST(value AS STRING)")
.as[(String)]
// Write data from a DataFrame to S3 using a topic specified in the data
val ds = df
.writeStream
.format("text")
.option("checkpointLocation","/tmp/checkpoint")
.option("path", output_path)
.outputMode("append")
.start()
Full exception:
21/01/19 14:39:23 ERROR MicroBatchExecution: Query [id = 72525f8b-8fd0-4bd6-949d-7d8b0678271c, runId = 2e88f59a-13af-4f01-9836-454ff9039fc2] terminated with error
org.apache.spark.sql.AnalysisException: Text data source does not support binary data type.;
at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema.apply(DataSourceUtils.scala:83)
at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema.apply(DataSourceUtils.scala:81)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:81)
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyWriteSchema(DataSourceUtils.scala:36)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:100)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery(SQLExecution.scala:83)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$$anonfun$apply.apply(SQLExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:93)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)
Note that, as your code is currently written, the part
df.selectExpr("CAST(value AS STRING)")
  .as[(String)]
has no effect at all, because its result is never used. That said, your DataFrame df still has the original schema of the Kafka source, namely:
Column | Type
---|---
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | timestamp
timestampType | int
headers (optional) | array
More details are given in the Spark + Kafka integration guide. As you can see, the key and value columns are both binary, and that type is not supported by format("text") in 2.4.7.
What you need to do is write out the result of df.selectExpr("CAST(value AS STRING)").as[(String)] instead of the original df. Also, don't forget to call ds.awaitTermination() at the end of your code.
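A minimal sketch of the corrected write path, assuming the same placeholders as in the question (broker, topic, output_path); the SSL options from the question are omitted here for brevity:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()

// Cast the binary Kafka value column to a string and keep the result
val stringDf = df.selectExpr("CAST(value AS STRING)")

// Write the casted column, not the original df with its binary columns
val ds = stringDf
  .writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", output_path)
  .outputMode("append")
  .start()

// Block until the streaming query terminates
ds.awaitTermination()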
Support for binary files as a data source was only added in Spark 3.0 and later; see Binary File Data Source.
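For reference, a minimal sketch of that binary-file source on Spark 3.0+ (the input path is a placeholder); the resulting DataFrame exposes path, modificationTime, length and content columns:
// Read whole files as binary records (Spark 3.0+ only, read-only source)
val binaryDf = spark.read
  .format("binaryFile")
  .load("s3://my-bucket/raw-files/")

binaryDf.printSchema()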