将镶木地板文件存储到 PostgreSQL 数据库中

Storing parquet file into PostgreSQL Database

我想将 parquet 文件写入 PostgreSQL。我正在使用 Spark 并使用 Spark Dataframe 的 write.jdbc 函数编写文件。对于 long、decimal 或 text 等 parquet 列类型,一切正常。问题在于像 Map 这样的复杂类型。我想在我的 PostgreSQL 中将地图存储为 json。因为我知道 PostgreSQL 可以自动将文本数据类型转换为 json(使用强制转换操作),所以我将映射转储为 json 字符串。

但是 spark 程序抱怨说我们试图将 "character varying" 数据类型插入到 "json" 类型的列中。这清楚地表明 PostgreSQL 不会自动将 "character varying" 转换为 JSON。

我继续登录我的数据库并手动尝试将 JSON 字符串插入到 table 的 JSON 数据类型列中并且成功了。

我的问题是为什么我的 spark 程序抱怨转换操作?

我正在使用 Spark 版本 1.6.1、PostgreSQL 4.3 和 JDBC 42.1.1

这是代码片段

url = "jdbc:postgresql://host_name:host_port/db_name"
data_frame.write.jdbc(url, table_name, properties={"user": user, "password": password})

错误堆栈跟踪:

Hint: You will need to rewrite or cast the expression.
  Position: 66  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
    at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1325)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1350)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:458)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:791)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1547)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:215)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:277)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:276)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: column "value" is of type json but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 66
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2476)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2189)
    ... 18 more

现在已经很晚了,但这里是所有迷失灵魂的答案。

您需要将 "stringtype" 参数传递给 JDBC。它指定绑定通过 setString() 设置的 PreparedStatement 参数时使用的类型。默认情况下,它是 varchar,它强制该参数是 varchar 并防止任何转换操作(在我的例子中 JSON 字符串到 JSON)。如果我们指定,stringtype=="unspecified",那么它将留给数据库来决定参数是哪种类型。就我而言,它有助于 Postgres 轻松地将字符串转换为 JSON 的方式。

文档:https://jdbc.postgresql.org/documentation/head/connect.html

您在使用 aws 服务吗?如果是,则使用创建 table 的 aws glue 抓取您的文件。创建一个将此数据(table)作为输入目录和输出 select aws rds jdbc 连接并选择所需数据库的粘合作业。 运行 作业和您的 paraquet 文件数据将加载到 postgres table。