spark.sql.AnalysisException: Text data source does not support binary data type

I have a Spark Structured Streaming job that reads from a Kafka topic and writes to an S3 bucket. I am running Spark 2.4.7 on AWS. The underlying exception is org.apache.spark.sql.AnalysisException: Text data source does not support binary data type.

val df = spark
  .readStream 
  .format("kafka") 
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topic) 
  .option("kafka.security.protocol", "SSL") 
  .option("kafka.ssl.keystore.location",keystore) 
  .option("kafka.ssl.keystore.password",pw) 
  .option("kafka.ssl.key.password",pw) 
  .option("startingOffsets","earliest")
  .option("failOnDataLoss","false")
  .load()
df.selectExpr("CAST(value AS STRING)")
  .as[(String)]
    
    
// Write data from the DataFrame to S3 as text
val ds = df
  .writeStream 
  .format("text")  
  .option("checkpointLocation","/tmp/checkpoint") 
  .option("path", output_path) 
  .outputMode("append") 
  .start()

Full exception:

21/01/19 14:39:23 ERROR MicroBatchExecution: Query [id = 72525f8b-8fd0-4bd6-949d-7d8b0678271c, runId = 2e88f59a-13af-4f01-9836-454ff9039fc2] terminated with error
org.apache.spark.sql.AnalysisException: Text data source does not support binary data type.;
        at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema.apply(DataSourceUtils.scala:83)
        at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema.apply(DataSourceUtils.scala:81)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
        at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:81)
        at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyWriteSchema(DataSourceUtils.scala:36)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:100)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$$anonfun$apply.apply(MicroBatchExecution.scala:537)
        at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery(SQLExecution.scala:83)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$$anonfun$apply.apply(SQLExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
        at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:93)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)

Note that, as your code is currently written, the part

df.selectExpr("CAST(value AS STRING)")
  .as[(String)]

has no effect at all, because its result is never assigned to anything. That said, your DataFrame df still has the original schema of a Kafka source, namely:

Column              Type
key                 binary
value               binary
topic               string
partition           int
offset              long
timestamp           timestamp
timestampType       int
headers (optional)  array

More details are given in the Spark + Kafka Integration Guide. As you can see, the key and value columns are both of binary type, and that type is not supported by format("text") in Spark 2.4.7.
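A quick, optional sanity check (not part of the original code) is to print the schema of df; it lists key and value as binary:

df.printSchema()
// prints roughly:
// root
//  |-- key: binary
//  |-- value: binary
//  |-- topic: string
//  |-- partition: integer
//  |-- offset: long
//  |-- timestamp: timestamp
//  |-- timestampType: integer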

What you need to do is write the result of df.selectExpr("CAST(value AS STRING)").as[(String)] to the sink instead of the raw df. Also, don't forget to call ds.awaitTermination() at the end of your code.
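For illustration, a minimal sketch of the corrected write path (it reuses the variables spark, df, and output_path from the question and assumes import spark.implicits._ is in scope for the String encoder):

val stringDs = df
  .selectExpr("CAST(value AS STRING)")      // cast the binary value column to string
  .as[String]                               // a single string column is supported by format("text")

val ds = stringDs
  .writeStream
  .format("text")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", output_path)
  .outputMode("append")
  .start()

ds.awaitTermination()                       // block until the streaming query terminates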

Support for binary files as a data source was only added in Spark 3.0 and above; see Binary File Data Source.
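For completeness, a hedged sketch of that binary file source on Spark 3.0+ (not applicable to the 2.4.7 cluster in the question; the path is a placeholder):

// Spark 3.0+ only: read binary files; the resulting DataFrame has the columns
// path, modificationTime, length and content (binary).
val binaryDf = spark.read.format("binaryFile").load("s3://your-bucket/your-path")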