使用 JDBC 使用自定义类型数组将 CSV 复制到 Postgres
CSV copy to Postgres with array of custom type using JDBC
我在我的数据库中定义了一个自定义类型
CREATE TYPE address AS (ip inet, port int);
以及在数组中使用此类型的 table:
CREATE TABLE my_table (
addresses address[] NULL
)
我有一个包含以下内容的示例 CSV 文件
{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}
然后我使用以下代码片段执行复制:
Class.forName("org.postgresql.Driver");
String input = loadCsvFromFile();
Reader reader = new StringReader(input);
Connection connection = DriverManager.getConnection(
"jdbc:postgresql://db_host:5432/db_name", "user",
"password");
CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
String copyCommand = "COPY my_table (addresses) " +
"FROM STDIN WITH (" +
"DELIMITER '\t', " +
"FORMAT csv, " +
"NULL '\N', " +
"ESCAPE '\"', " +
"QUOTE '\"')";
copyManager.copyIn(copyCommand, reader);
执行该程序会产生以下异常:
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
Detail: Unexpected end of input.
Where: COPY only_address, line 1, column addresses: "{(10.10.10.1,80),(10.10.10.2,443)}"
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)
我尝试了输入中括号的不同组合,但似乎无法使 COPY 工作。我有什么地方可能出错的想法吗?
请参阅 https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ 以了解具有 JUnit 测试的项目,它可以满足您的需求。
基本上,您希望能够将逗号用于两件事:分隔数组项和分隔类型字段,但您不希望 CSV 解析将逗号解释为字段描述符。
所以
- 您想告诉 CSV 解析器将整行视为一个字符串,一个字段,您可以通过将其括在单引号中并告诉 CSV 解析器这一点来实现,并且
- 您希望 PG 字段解析器考虑将每个数组项类型实例括在双引号中。
代码:
copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);
DML 示例 1:
COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''
CSV 示例 1:
'{"(10.0.0.1,1)","(10.0.0.2,2)"}'
'{"(10.10.10.1,80)","(10.10.10.2,443)"}'
'{"(10.10.10.3,8080)","(10.10.10.4,4040)"}'
DML 示例 2,转义双引号:
COPY my_table (addresses) FROM STDIN WITH CSV
CSV 示例 2,转义双引号:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"
完整的 JUnit 测试 class:
package io.mikael.poc;
import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;
import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import static java.nio.charset.StandardCharsets.UTF_8;
public class CopyTest {
private Reader reader;
private Connection connection;
private CopyManager copyManager;
private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";
private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses address[] NULL)";
private String loadCsvFromFile(final String fileName) throws IOException {
try (InputStream is = getClass().getResourceAsStream(fileName)) {
return CharStreams.toString(new InputStreamReader(is, UTF_8));
}
}
@ClassRule
public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");
@BeforeClass
public static void beforeClass() throws Exception {
Class.forName("org.postgresql.Driver");
}
@Before
public void before() throws Exception {
String input = loadCsvFromFile("/data_01.csv");
reader = new StringReader(input);
connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
connection.setAutoCommit(false);
connection.beginRequest();
connection.prepareCall(CREATE_TYPE).execute();
connection.prepareCall(CREATE_TABLE).execute();
}
@After
public void after() throws Exception {
connection.rollback();
}
@Test
public void copyTest01() throws Exception {
copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);
final StringWriter writer = new StringWriter();
copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
System.out.printf("roundtrip:%n%s%n", writer.toString());
final ResultSet rs = connection.prepareStatement(
"SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
.executeQuery();
rs.next();
System.out.printf("json:%n%s%n", rs.getString(1));
}
}
测试输出:
roundtrip:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"
json:
[{"addresses":[{"ip":"10.0.0.1","port":1},{"ip":"10.0.0.2","port":2}]},{"addresses":[{"ip":"10.10.10.1","port":80},{"ip":"10.10.10.2","port":443}]},{"addresses":[{"ip":"10.10.10.3","port":8080},{"ip":"10.10.10.4","port":4040}]}]
在CSV格式中,当您指定一个分隔符时,您不能将其用作数据中的字符,除非您将其转义!
使用逗号作为分隔符的 csv 文件示例
一条正确记录:data1, data2
解析结果:[0] => data1 [1] => data2
一个不正确的:data,1, data2
解析结果:[0] => data [1] => 1 [2] => data2
最后,您不需要将文件作为 csv 文件加载,而是作为简单文件加载,因此请将您的方法 loadCsvFromFile();
替换为
public String loadRecordsFromFile(File file) {
LineIterator it = FileUtils.lineIterator(file, "UTF-8");
StringBuilder sb = new StringBuilder();
try {
while (it.hasNext()) {
sb.append(it.nextLine()).append(System.nextLine);
}
}
finally {
LineIterator.closeQuietly(iterator);
}
return sb.toString();
}
不要忘记在你的 pom 文件中添加这个依赖项
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
下载 JAR
1NF
首先,我认为您的 table 设计是错误的,因为它不符合 1NF。每个字段都应该只包含原子属性,但事实并非如此。为什么不 table 像:
CREATE TABLE my_table (
id,
ip inet,
port int
)
其中 id
是您在源文件中的行号,而 ip
/port
是该行中的地址之一?
示例数据:
id | ip | port
-----------------------
1 | 10.10.10.1 | 80
1 | 10.10.10.2 | 443
2 | 10.10.10.3 | 8080
2 | 10.10.10.4 | 4040
...
因此,您将能够在单个地址上查询您的数据库(查找所有关联的地址,return 如果两个地址在同一行上则为真,无论您想要什么...)。
加载数据
但我们假设您知道自己在做什么。这里的主要问题是您的输入数据文件采用特殊格式。它可能是一个单列 CSV 文件,但它会是一个非常退化的 CSV 文件。无论如何,您必须先转换这些行,然后再将它们插入数据库。您有两个选择:
- 你读了输入文件的每一行,然后做了一个
INSERT
(这可能需要一段时间);
- 您将输入文件转换为具有预期格式的文本文件并使用
COPY
。
一个一个插入
第一个选项似乎很简单:对于 csv 文件的第一行 {(10.10.10.1,80),(10.10.10.2,443)}
,您必须 运行 查询:
INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[], 4)
为此,您只需创建一个新字符串:
String value = row.replaceAll("\{", "ARRAY[")
.replaceAll("\}", "]::address[]")
.replaceAll("\(([0-9.]+),", "''");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);
并对输入文件的每一行执行查询(或者为了更好的安全性,使用 prepared statement)。
插入 COPY
我会详细说明第二个选项。您必须在 Java 代码中使用:
copyManager.copyIn(sql, from);
其中复制查询是 COPY FROM STDIN
语句,from
是 reader。声明将是:
COPY my_table (addresses) FROM STDIN WITH (FORMAT text);
要向文案管理员提供数据,您需要像这样的数据(注意引号):
{"(10.10.10.1,80)","(10.10.10.2,443)"}
{"(10.10.10.3,8080)","(10.10.10.4,4040)"}
使用临时文件
以正确格式获取数据的更简单方法是创建一个临时文件。您读取输入文件的每一行并将 (
替换为 "(
,将 )
替换为 )"
。将此处理后的行写入临时文件。然后将此文件的 reader 传递给复制管理员。
即时
有两个线程
您可以使用两个线程:
线程 1 读取输入文件,逐行处理并将它们写入 PipedWriter
.
线程 2 将连接到前一个 PipedWriter
的 PipedReader
传递给复制管理器。
主要的困难是同步线程,使线程 2 在线程 1 开始将数据写入 PipedWriter
之前开始读取 PipedReader
。有关示例,请参阅 this project of mine。
有自定义reader
from
reader 可能是类似(原始版本)的实例:
class DataReader extends Reader {
PushbackReader csvFileReader;
private boolean wasParenthese;
public DataReader(Reader csvFileReader) {
this.csvFileReader = new PushbackReader(csvFileReader, 1);
wasParenthese = false;
}
@Override
public void close() throws IOException {
this.csvFileReader.close();
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
// rely on read()
for (int i = off; i < off + len; i++) {
int c = this.read();
if (c == -1) {
return i-off > 0 ? i-off : -1;
}
cbuf[i] = (char) c;
}
return len;
}
@Override
public int read() throws IOException {
final int c = this.csvFileReader.read();
if (c == '(' && !this.wasParenthese) {
this.wasParenthese = true;
this.csvFileReader.unread('(');
return '"'; // add " before (
} else {
this.wasParenthese = false;
if (c == ')') {
this.csvFileReader.unread('"');
return ')'; // add " after )
} else {
return c;
}
}
}
}
(这是一个幼稚的版本,因为正确的做法是只覆盖 public int read(char[] cbuf, int off, int len)
。但是您应该随后处理 cbuf
以添加引号并存储推送的额外字符右边:这有点乏味)。
现在,如果 r
是文件的 reader:
{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}
只需使用:
Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
.getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");
CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));
关于批量加载
如果你正在加载大量数据,不要忘记the basic tips:禁用自动提交,删除索引和约束,并使用TRUNCATE
和ANALYZE
如下:
TRUNCATE my_table;
COPY ...;
ANALYZE my_table;
这将加快加载速度。
我在我的数据库中定义了一个自定义类型
CREATE TYPE address AS (ip inet, port int);
以及在数组中使用此类型的 table:
CREATE TABLE my_table (
addresses address[] NULL
)
我有一个包含以下内容的示例 CSV 文件
{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}
然后我使用以下代码片段执行复制:
Class.forName("org.postgresql.Driver");
String input = loadCsvFromFile();
Reader reader = new StringReader(input);
Connection connection = DriverManager.getConnection(
"jdbc:postgresql://db_host:5432/db_name", "user",
"password");
CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
String copyCommand = "COPY my_table (addresses) " +
"FROM STDIN WITH (" +
"DELIMITER '\t', " +
"FORMAT csv, " +
"NULL '\N', " +
"ESCAPE '\"', " +
"QUOTE '\"')";
copyManager.copyIn(copyCommand, reader);
执行该程序会产生以下异常:
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
Detail: Unexpected end of input.
Where: COPY only_address, line 1, column addresses: "{(10.10.10.1,80),(10.10.10.2,443)}"
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)
我尝试了输入中括号的不同组合,但似乎无法使 COPY 工作。我有什么地方可能出错的想法吗?
请参阅 https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ 以了解具有 JUnit 测试的项目,它可以满足您的需求。
基本上,您希望能够将逗号用于两件事:分隔数组项和分隔类型字段,但您不希望 CSV 解析将逗号解释为字段描述符。
所以
- 您想告诉 CSV 解析器将整行视为一个字符串,一个字段,您可以通过将其括在单引号中并告诉 CSV 解析器这一点来实现,并且
- 您希望 PG 字段解析器考虑将每个数组项类型实例括在双引号中。
代码:
copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);
DML 示例 1:
COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''
CSV 示例 1:
'{"(10.0.0.1,1)","(10.0.0.2,2)"}'
'{"(10.10.10.1,80)","(10.10.10.2,443)"}'
'{"(10.10.10.3,8080)","(10.10.10.4,4040)"}'
DML 示例 2,转义双引号:
COPY my_table (addresses) FROM STDIN WITH CSV
CSV 示例 2,转义双引号:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"
完整的 JUnit 测试 class:
package io.mikael.poc;
import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;
import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import static java.nio.charset.StandardCharsets.UTF_8;
public class CopyTest {
private Reader reader;
private Connection connection;
private CopyManager copyManager;
private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";
private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses address[] NULL)";
private String loadCsvFromFile(final String fileName) throws IOException {
try (InputStream is = getClass().getResourceAsStream(fileName)) {
return CharStreams.toString(new InputStreamReader(is, UTF_8));
}
}
@ClassRule
public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");
@BeforeClass
public static void beforeClass() throws Exception {
Class.forName("org.postgresql.Driver");
}
@Before
public void before() throws Exception {
String input = loadCsvFromFile("/data_01.csv");
reader = new StringReader(input);
connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
connection.setAutoCommit(false);
connection.beginRequest();
connection.prepareCall(CREATE_TYPE).execute();
connection.prepareCall(CREATE_TABLE).execute();
}
@After
public void after() throws Exception {
connection.rollback();
}
@Test
public void copyTest01() throws Exception {
copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);
final StringWriter writer = new StringWriter();
copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
System.out.printf("roundtrip:%n%s%n", writer.toString());
final ResultSet rs = connection.prepareStatement(
"SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
.executeQuery();
rs.next();
System.out.printf("json:%n%s%n", rs.getString(1));
}
}
测试输出:
roundtrip:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"
json:
[{"addresses":[{"ip":"10.0.0.1","port":1},{"ip":"10.0.0.2","port":2}]},{"addresses":[{"ip":"10.10.10.1","port":80},{"ip":"10.10.10.2","port":443}]},{"addresses":[{"ip":"10.10.10.3","port":8080},{"ip":"10.10.10.4","port":4040}]}]
在CSV格式中,当您指定一个分隔符时,您不能将其用作数据中的字符,除非您将其转义!
使用逗号作为分隔符的 csv 文件示例
一条正确记录:data1, data2
解析结果:[0] => data1 [1] => data2
一个不正确的:data,1, data2
解析结果:[0] => data [1] => 1 [2] => data2
最后,您不需要将文件作为 csv 文件加载,而是作为简单文件加载,因此请将您的方法 loadCsvFromFile();
替换为
public String loadRecordsFromFile(File file) {
LineIterator it = FileUtils.lineIterator(file, "UTF-8");
StringBuilder sb = new StringBuilder();
try {
while (it.hasNext()) {
sb.append(it.nextLine()).append(System.nextLine);
}
}
finally {
LineIterator.closeQuietly(iterator);
}
return sb.toString();
}
不要忘记在你的 pom 文件中添加这个依赖项
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
下载 JAR
1NF
首先,我认为您的 table 设计是错误的,因为它不符合 1NF。每个字段都应该只包含原子属性,但事实并非如此。为什么不 table 像:
CREATE TABLE my_table (
id,
ip inet,
port int
)
其中 id
是您在源文件中的行号,而 ip
/port
是该行中的地址之一?
示例数据:
id | ip | port
-----------------------
1 | 10.10.10.1 | 80
1 | 10.10.10.2 | 443
2 | 10.10.10.3 | 8080
2 | 10.10.10.4 | 4040
...
因此,您将能够在单个地址上查询您的数据库(查找所有关联的地址,return 如果两个地址在同一行上则为真,无论您想要什么...)。
加载数据
但我们假设您知道自己在做什么。这里的主要问题是您的输入数据文件采用特殊格式。它可能是一个单列 CSV 文件,但它会是一个非常退化的 CSV 文件。无论如何,您必须先转换这些行,然后再将它们插入数据库。您有两个选择:
- 你读了输入文件的每一行,然后做了一个
INSERT
(这可能需要一段时间); - 您将输入文件转换为具有预期格式的文本文件并使用
COPY
。
一个一个插入
第一个选项似乎很简单:对于 csv 文件的第一行 {(10.10.10.1,80),(10.10.10.2,443)}
,您必须 运行 查询:
INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[], 4)
为此,您只需创建一个新字符串:
String value = row.replaceAll("\{", "ARRAY[")
.replaceAll("\}", "]::address[]")
.replaceAll("\(([0-9.]+),", "''");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);
并对输入文件的每一行执行查询(或者为了更好的安全性,使用 prepared statement)。
插入 COPY
我会详细说明第二个选项。您必须在 Java 代码中使用:
copyManager.copyIn(sql, from);
其中复制查询是 COPY FROM STDIN
语句,from
是 reader。声明将是:
COPY my_table (addresses) FROM STDIN WITH (FORMAT text);
要向文案管理员提供数据,您需要像这样的数据(注意引号):
{"(10.10.10.1,80)","(10.10.10.2,443)"}
{"(10.10.10.3,8080)","(10.10.10.4,4040)"}
使用临时文件
以正确格式获取数据的更简单方法是创建一个临时文件。您读取输入文件的每一行并将 (
替换为 "(
,将 )
替换为 )"
。将此处理后的行写入临时文件。然后将此文件的 reader 传递给复制管理员。
即时
有两个线程 您可以使用两个线程:
线程 1 读取输入文件,逐行处理并将它们写入
PipedWriter
.线程 2 将连接到前一个
PipedWriter
的PipedReader
传递给复制管理器。
主要的困难是同步线程,使线程 2 在线程 1 开始将数据写入 PipedWriter
之前开始读取 PipedReader
。有关示例,请参阅 this project of mine。
有自定义reader
from
reader 可能是类似(原始版本)的实例:
class DataReader extends Reader {
PushbackReader csvFileReader;
private boolean wasParenthese;
public DataReader(Reader csvFileReader) {
this.csvFileReader = new PushbackReader(csvFileReader, 1);
wasParenthese = false;
}
@Override
public void close() throws IOException {
this.csvFileReader.close();
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
// rely on read()
for (int i = off; i < off + len; i++) {
int c = this.read();
if (c == -1) {
return i-off > 0 ? i-off : -1;
}
cbuf[i] = (char) c;
}
return len;
}
@Override
public int read() throws IOException {
final int c = this.csvFileReader.read();
if (c == '(' && !this.wasParenthese) {
this.wasParenthese = true;
this.csvFileReader.unread('(');
return '"'; // add " before (
} else {
this.wasParenthese = false;
if (c == ')') {
this.csvFileReader.unread('"');
return ')'; // add " after )
} else {
return c;
}
}
}
}
(这是一个幼稚的版本,因为正确的做法是只覆盖 public int read(char[] cbuf, int off, int len)
。但是您应该随后处理 cbuf
以添加引号并存储推送的额外字符右边:这有点乏味)。
现在,如果 r
是文件的 reader:
{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}
只需使用:
Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
.getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");
CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));
关于批量加载
如果你正在加载大量数据,不要忘记the basic tips:禁用自动提交,删除索引和约束,并使用TRUNCATE
和ANALYZE
如下:
TRUNCATE my_table;
COPY ...;
ANALYZE my_table;
这将加快加载速度。