合并具有相同 ID 但不同序列号的行
Combining rows with same ID but different sequence number
我有一个大型(1500 万行)csv 文件,其中包含一些 ID 的多个期限(STRT_DTE 和 EXP_DATE)。我还有一个名为 TRM_NBR 的列,它可以取值 1、2、3 等。顺序 TRM_NBR 表示连续的期限(即具有相同 ID 和 TRM_NBR 的三行1,2,3 是一个连续的句点,而 1,2,1 是两个句点)。
我需要减少 csv 文件,以便每个连续的期限(和 ID)有一行。请参见下面的示例。具体来说,我需要输出序列中 first term 的 ID,STRT_DTE,以及 last term[=] 的 EXP_DTE 27=] 在序列中。 (保留 TRM_NBR 是可选的。)
原始数据:
ID;TRM_NBR;STRT_DTE;EXP_DTE
000020000007;1;2017-08-22 00:00:00.000;2016-09-20 00:00:00.000
000020000009;1;2015-07-23 00:00:00.000;2015-03-24 00:00:00.000
000020000017;1;2014-10-02 00:00:00.000;2014-10-02 00:00:00.000
000020000063;1;2018-11-19 00:00:00.000;2018-11-19 00:00:00.000
000020000063;2;2020-11-19 00:00:00.000;2020-11-19 00:00:00.000
000020000356;1;2020-06-23 00:00:00.000;2020-06-23 00:00:00.000
000020000356;2;2021-05-20 00:00:00.000;2021-05-20 00:00:00.000
000020000356;3;2022-04-21 00:00:00.000;2021-10-21 00:00:00.000
000020000356;2;2014-07-22 00:00:00.000;2014-09-09 00:00:00.000
000020000356;3;2015-07-21 00:00:00.000;2015-07-21 00:00:00.000
000020000356;4;2016-07-12 00:00:00.000;2016-07-12 00:00:00.000
000020000356;5;2017-07-11 00:00:00.000;2017-07-11 00:00:00.000
期望的输出:
ID;TRM_NBR;STRT_DTE;EXP_DTE
000020000007;1;2017-08-22 00:00:00.000;2016-09-20 00:00:00.000
000020000009;1;2015-07-23 00:00:00.000;2015-03-24 00:00:00.000
000020000017;1;2014-10-02 00:00:00.000;2014-10-02 00:00:00.000
000020000063;1;2018-11-19 00:00:00.000;2020-11-19 00:00:00.000
000020000356;1;2020-06-23 00:00:00.000;2021-10-21 00:00:00.000
000020000356;2;2014-07-22 00:00:00.000;2017-07-11 00:00:00.000
连续的句点可以是从 1 到无穷大,可以从 1 开始,也可以不以 1 开始, 但总是以 1 递增(如果有多于一行)。输出文件可以包含相同 ID 的多行(但序列不同)。您可以假设文件的顺序正确。
我可以使用Python、MySQL或Mac终端工具来解决。
MySQL的解决方案。
- 为导入到的源数据创建table:
CREATE TEMPORARY TABLE tmp (
rowno INT AUTO_INCREMENT PRIMARY KEY,
id CHAR(12),
trm_nbr TINYINT,
strt_dte DATETIME(3),
exp_dte DATETIME(3)
) ENGINE = Memory;
- 将您的源数据导入其中:
LOAD DATA INFILE 'X:/folder/filename.CSV'
INTO TABLE tmp
COLUMNS TERMINATED BY ';'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
(id, trm_nbr, strt_dte, exp_dte);
- 处理数据并将其保存到输出文件:
WITH
cte1 AS ( SELECT *,
LAG(id) OVER (ORDER BY rowno) lag_id,
1 + LAG(trm_nbr) OVER (ORDER BY rowno) lag_trm_nbr
FROM tmp ),
cte2 AS ( SELECT *,
SUM(CASE WHEN (id, trm_nbr) = (lag_id, lag_trm_nbr)
THEN 0
ELSE 1
END) OVER (ORDER BY rowno) grp_no
FROM cte1 )
SELECT 'ID', 'TRM_NBR', 'STRT_DTE', 'EXP_DTE'
UNION ALL
SELECT MAX(id) id,
ROW_NUMBER() OVER (PARTITION BY MAX(id) ORDER BY grp_no) trm_nbr,
MIN(strt_dte) strt_dte,
MAX(exp_dte) exp_dte
FROM cte2
GROUP BY grp_no
INTO OUTFILE 'X:/folder/new_filename.CSV'
COLUMNS TERMINATED BY ';'
LINES TERMINATED BY '\r\n';
您当前的 MySQL 帐户必须具有 FILE 权限。
secure_file_priv
会话变量值必须是空字符串或某些 drive:path
(通过 SELECT @@secure_file_priv;
查询检查)。如果它不是空字符串,则查询中的 X:/folder
必须等于此值(在 Windows 系统上,路径值中的反斜杠必须加倍或替换为直接斜杠)。
如果secure_file_priv
会话变量值为NULL,则您根本无法使用此方法。在这种情况下,请尝试让服务器管理员设置此变量并允许与文件系统进行交互。
查询使用 Windows 样式 CSV 的设置,对于 Unix 样式文件使用 LINES TERMINATED BY '\n'
.
如果您的源文件太大(超过 ~100 MB 或超过 max_allowed_packet
会话变量值),则从 table 定义中删除 ENGINE = Memory
。
您不需要删除临时文件 table - 它会在您关闭连接时自动删除。但是你仍然可以明确地放弃它。
Python 基于标准 itertools
和第 3 方 convtools 库的替代方案:
import itertools
from convtools import conversion as c
from convtools.contrib.tables import Table
# read rows into iterable of dicts
rows = Table.from_csv(
"input.csv", header=True, dialect=Table.csv_dialect(delimiter=";")
).into_iter_rows(dict)
# group into sequences of IDs
key_to_rows = itertools.groupby(rows, key=lambda r: r["ID"])
# prepare the converter, which takes groups of rows from the
# previous step and reduces each group to one row.
# This is where code generation happens, consider storing the
# resulting converter somewhere for further reuse.
converter = c.iter(
c.item(1).pipe(
c.aggregate(
{
"ID": c.ReduceFuncs.First(c.item("ID")),
"LAST_TRM_NBR": c.ReduceFuncs.Last(c.item("TRM_NBR")),
"STRT_DTE": c.ReduceFuncs.First(c.item("STRT_DTE")),
"EXP_DTE": c.ReduceFuncs.Last(c.item("EXP_DTE")),
}
)
)
).gen_converter()
# process iterable from itertools
processed_rows = converter(key_to_rows)
# output processed rows to "out.csv" (consider passing same dialect
# as above to .into_csv method
Table.from_rows(processed_rows).into_csv("out.csv")
输出为:
ID,LAST_TRM_NBR,STRT_DTE,EXP_DTE
000020000007,1,2017-08-22 00:00:00.000,2016-09-20 00:00:00.000
000020000009,1,2015-07-23 00:00:00.000,2015-03-24 00:00:00.000
000020000017,1,2014-10-02 00:00:00.000,2014-10-02 00:00:00.000
000020000063,2,2018-11-19 00:00:00.000,2020-11-19 00:00:00.000
000020000356,5,2020-06-23 00:00:00.000,2017-07-11 00:00:00.000
我有一个大型(1500 万行)csv 文件,其中包含一些 ID 的多个期限(STRT_DTE 和 EXP_DATE)。我还有一个名为 TRM_NBR 的列,它可以取值 1、2、3 等。顺序 TRM_NBR 表示连续的期限(即具有相同 ID 和 TRM_NBR 的三行1,2,3 是一个连续的句点,而 1,2,1 是两个句点)。
我需要减少 csv 文件,以便每个连续的期限(和 ID)有一行。请参见下面的示例。具体来说,我需要输出序列中 first term 的 ID,STRT_DTE,以及 last term[=] 的 EXP_DTE 27=] 在序列中。 (保留 TRM_NBR 是可选的。)
原始数据:
ID;TRM_NBR;STRT_DTE;EXP_DTE
000020000007;1;2017-08-22 00:00:00.000;2016-09-20 00:00:00.000
000020000009;1;2015-07-23 00:00:00.000;2015-03-24 00:00:00.000
000020000017;1;2014-10-02 00:00:00.000;2014-10-02 00:00:00.000
000020000063;1;2018-11-19 00:00:00.000;2018-11-19 00:00:00.000
000020000063;2;2020-11-19 00:00:00.000;2020-11-19 00:00:00.000
000020000356;1;2020-06-23 00:00:00.000;2020-06-23 00:00:00.000
000020000356;2;2021-05-20 00:00:00.000;2021-05-20 00:00:00.000
000020000356;3;2022-04-21 00:00:00.000;2021-10-21 00:00:00.000
000020000356;2;2014-07-22 00:00:00.000;2014-09-09 00:00:00.000
000020000356;3;2015-07-21 00:00:00.000;2015-07-21 00:00:00.000
000020000356;4;2016-07-12 00:00:00.000;2016-07-12 00:00:00.000
000020000356;5;2017-07-11 00:00:00.000;2017-07-11 00:00:00.000
期望的输出:
ID;TRM_NBR;STRT_DTE;EXP_DTE
000020000007;1;2017-08-22 00:00:00.000;2016-09-20 00:00:00.000
000020000009;1;2015-07-23 00:00:00.000;2015-03-24 00:00:00.000
000020000017;1;2014-10-02 00:00:00.000;2014-10-02 00:00:00.000
000020000063;1;2018-11-19 00:00:00.000;2020-11-19 00:00:00.000
000020000356;1;2020-06-23 00:00:00.000;2021-10-21 00:00:00.000
000020000356;2;2014-07-22 00:00:00.000;2017-07-11 00:00:00.000
连续的句点可以是从 1 到无穷大,可以从 1 开始,也可以不以 1 开始, 但总是以 1 递增(如果有多于一行)。输出文件可以包含相同 ID 的多行(但序列不同)。您可以假设文件的顺序正确。
我可以使用Python、MySQL或Mac终端工具来解决。
MySQL的解决方案。
- 为导入到的源数据创建table:
CREATE TEMPORARY TABLE tmp (
rowno INT AUTO_INCREMENT PRIMARY KEY,
id CHAR(12),
trm_nbr TINYINT,
strt_dte DATETIME(3),
exp_dte DATETIME(3)
) ENGINE = Memory;
- 将您的源数据导入其中:
LOAD DATA INFILE 'X:/folder/filename.CSV'
INTO TABLE tmp
COLUMNS TERMINATED BY ';'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
(id, trm_nbr, strt_dte, exp_dte);
- 处理数据并将其保存到输出文件:
WITH
cte1 AS ( SELECT *,
LAG(id) OVER (ORDER BY rowno) lag_id,
1 + LAG(trm_nbr) OVER (ORDER BY rowno) lag_trm_nbr
FROM tmp ),
cte2 AS ( SELECT *,
SUM(CASE WHEN (id, trm_nbr) = (lag_id, lag_trm_nbr)
THEN 0
ELSE 1
END) OVER (ORDER BY rowno) grp_no
FROM cte1 )
SELECT 'ID', 'TRM_NBR', 'STRT_DTE', 'EXP_DTE'
UNION ALL
SELECT MAX(id) id,
ROW_NUMBER() OVER (PARTITION BY MAX(id) ORDER BY grp_no) trm_nbr,
MIN(strt_dte) strt_dte,
MAX(exp_dte) exp_dte
FROM cte2
GROUP BY grp_no
INTO OUTFILE 'X:/folder/new_filename.CSV'
COLUMNS TERMINATED BY ';'
LINES TERMINATED BY '\r\n';
您当前的 MySQL 帐户必须具有 FILE 权限。
secure_file_priv
会话变量值必须是空字符串或某些 drive:path
(通过 SELECT @@secure_file_priv;
查询检查)。如果它不是空字符串,则查询中的 X:/folder
必须等于此值(在 Windows 系统上,路径值中的反斜杠必须加倍或替换为直接斜杠)。
如果secure_file_priv
会话变量值为NULL,则您根本无法使用此方法。在这种情况下,请尝试让服务器管理员设置此变量并允许与文件系统进行交互。
查询使用 Windows 样式 CSV 的设置,对于 Unix 样式文件使用 LINES TERMINATED BY '\n'
.
如果您的源文件太大(超过 ~100 MB 或超过 max_allowed_packet
会话变量值),则从 table 定义中删除 ENGINE = Memory
。
您不需要删除临时文件 table - 它会在您关闭连接时自动删除。但是你仍然可以明确地放弃它。
Python 基于标准 itertools
和第 3 方 convtools 库的替代方案:
import itertools
from convtools import conversion as c
from convtools.contrib.tables import Table
# read rows into iterable of dicts
rows = Table.from_csv(
"input.csv", header=True, dialect=Table.csv_dialect(delimiter=";")
).into_iter_rows(dict)
# group into sequences of IDs
key_to_rows = itertools.groupby(rows, key=lambda r: r["ID"])
# prepare the converter, which takes groups of rows from the
# previous step and reduces each group to one row.
# This is where code generation happens, consider storing the
# resulting converter somewhere for further reuse.
converter = c.iter(
c.item(1).pipe(
c.aggregate(
{
"ID": c.ReduceFuncs.First(c.item("ID")),
"LAST_TRM_NBR": c.ReduceFuncs.Last(c.item("TRM_NBR")),
"STRT_DTE": c.ReduceFuncs.First(c.item("STRT_DTE")),
"EXP_DTE": c.ReduceFuncs.Last(c.item("EXP_DTE")),
}
)
)
).gen_converter()
# process iterable from itertools
processed_rows = converter(key_to_rows)
# output processed rows to "out.csv" (consider passing same dialect
# as above to .into_csv method
Table.from_rows(processed_rows).into_csv("out.csv")
输出为:
ID,LAST_TRM_NBR,STRT_DTE,EXP_DTE
000020000007,1,2017-08-22 00:00:00.000,2016-09-20 00:00:00.000
000020000009,1,2015-07-23 00:00:00.000,2015-03-24 00:00:00.000
000020000017,1,2014-10-02 00:00:00.000,2014-10-02 00:00:00.000
000020000063,2,2018-11-19 00:00:00.000,2020-11-19 00:00:00.000
000020000356,5,2020-06-23 00:00:00.000,2017-07-11 00:00:00.000