Spark 认为我正在从 Parquet 文件中读取 DataFrame

Spark thinks I'm reading DataFrame from a Parquet file

Spark 2.x 在这里。我的代码:

val query = "SELECT * FROM some_big_table WHERE something > 1"

val df : DataFrame = spark.read
  .option("url",
    s"""jdbc:postgresql://${redshiftInfo.hostnameAndPort}/${redshiftInfo.database}?currentSchema=${redshiftInfo.schema}"""
  )
  .option("user", redshiftInfo.username)
  .option("password", redshiftInfo.password)
  .option("dbtable", query)
  .load()

产生:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:183)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:183)
    at scala.Option.getOrElse(Option.scala:121)

我没有从 Parquet 文件中读取 任何内容,我正在从 Redshift (RDBMS) table 中读取。那么为什么我会收到此错误?

如果您使用通用 load 函数,您还应该包括格式:

// Query has to be subquery 
val query = "(SELECT * FROM some_big_table WHERE something > 1) as tmp"

...
  .format("jdbc")
  .option("dbtable", query)
  .load()

否则 Spark 假定您使用默认格式,在没有特定配置的情况下,它是 Parquet。

也没有什么强迫你使用 dbtable

spark.read.jdbc(
  s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
  query, 
  props 
)

变体也有效。

当然,对于如此简单的查询,所有这些都不需要:

spark.read.jdbc(
  s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
  some_big_table, 
  props 
).where("something > 1")

将以相同的方式工作,如果您想提高性能,您应该考虑并行查询

  • How to improve performance for slow Spark jobs using DataFrame and JDBC connection?

甚至更好,尝试 Redshift connector