使用内部联接合并 2 个大型 csv 文件
Merge 2 large csv files using inner join
我需要非常了解 java 和内存问题的人的建议。我有一个很大的 CSV 文件(每个大约 500mb),我需要仅使用 64mb 的 xmx 将这些文件合并为一个文件。我试过用不同的方法来做,但没有任何效果——总是出现内存异常。我应该怎么做才能让它正常工作?
任务是:
开发一个简单的实现,以合理有效的方式连接两个输入表,并在需要时将两个表存储在 RAM 中。
我的代码可以运行,但它占用大量内存,因此不能容纳 64mb。
public class ImprovedInnerJoin {
public static void main(String[] args) throws IOException {
RandomAccessFile firstFile = new RandomAccessFile("input_A.csv", "r");
FileChannel firstChannel = firstFile.getChannel();
RandomAccessFile secondFile = new RandomAccessFile("input_B.csv", "r");
FileChannel secondChannel = secondFile.getChannel();
RandomAccessFile resultFile = new RandomAccessFile("result2.csv", "rw");
FileChannel resultChannel = resultFile.getChannel().position(0);
ByteBuffer resultBuffer = ByteBuffer.allocate(40);
ByteBuffer firstBuffer = ByteBuffer.allocate(25);
ByteBuffer secondBuffer = ByteBuffer.allocate(25);
while (secondChannel.position() != secondChannel.size()){
Map <String, List<String>>table2Part = new HashMap();
for (int i = 0; i < secondChannel.size(); ++i){
if (secondChannel.read(secondBuffer) == -1)
break;
secondBuffer.rewind();
String[] table2Tuple = (new String(secondBuffer.array(), Charset.defaultCharset())).split(",");
if (!table2Part.containsKey(table2Tuple[0]))
table2Part.put(table2Tuple[0], new ArrayList());
table2Part.get(table2Tuple[0]).add(table2Tuple[1]);
secondBuffer.clear();
}
Set <String> taple2keys = table2Part.keySet();
while (firstChannel.read(firstBuffer) != -1){
firstBuffer.rewind();
String[] table1Tuple = (new String(firstBuffer.array(), Charset.defaultCharset())).split(",");
for (String table2key : taple2keys){
if (table1Tuple[0].equals(table2key)){
for (String value : table2Part.get(table2key)){
String result = table1Tuple[0] + "," + table1Tuple[1].substring(0,14) + "," + value; // 0,14 or result buffer will be overflown
resultBuffer.put(result.getBytes());
resultBuffer.rewind();
while(resultBuffer.hasRemaining()){
resultChannel.write(resultBuffer);
}
resultBuffer.clear();
}
}
}
firstBuffer.clear();
}
firstChannel.position(0);
table2Part.clear();
}
firstChannel.close();
secondChannel.close();
resultChannel.close();
System.out.println("Operation completed.");
}
}
如果中央内存是您的应用程序的限制,但您可以访问持久文件,我会按照 blahfunk 的建议创建一个临时 SQLite 文件到您的 tmp 文件夹,逐块读取每个文件并通过简单的连接合并它们.您可以通过查看 Hibernate, just take a look to what have I found on this Whosebug question: How to create database in Hibernate at runtime?
等库来创建临时 SQLite 数据库
如果您不能执行这样的任务,您剩下的选择是消耗更多 cpu 并仅加载第一个文件的第一行,在第二个文件中搜索具有相同索引的行,缓冲结果并在输出文件中尽可能晚地刷新它们,对第一个文件的每一行重复此操作。
也许您可以流式传输 first file
并将每一行变成 hashcode
并将所有这些 hashcodes
保存在内存中。然后流式传输 second file
并为每一行创建一个 hashcode
。如果 hashcode
在 first file
中,即在内存中,则不要写该行,否则写该行。之后,将 first file
完整地附加到 result file
.
这将有效地创建一个索引来比较您的更新。
一个非常容易实现的外部连接版本是 external hash join
。
它比外部合并排序连接更容易实现,并且只有一个缺点(稍后会详细介绍)。
它是如何工作的?
非常类似于哈希表。
选择一个数字 n
,表示您要将数据分发到多少个文件 ("buckets") 中。
然后执行以下操作:
- 设置
n
文件编写器
- 对于您要加入的每个文件和每一行:
- 获取要加入的密钥的哈希码
- 计算哈希码和
n
的模,这将得到 k
- 将您的 csv 行附加到第
k
个文件编写器
- Flush/Close 所有
n
位作者。
现在您有 n
个文件,希望更小,保证相同的密钥将始终在相同的文件中 。现在,您可以 运行 基于标准 HashMap/HashMultiSet
的每个文件分别加入。
限制
为什么我提到希望 较小的文件?好吧,这取决于密钥及其哈希码的分布。想想最坏的情况,你所有的文件都有完全相同的密钥:你只有一个文件,你没有从分区中获得任何东西。
类似于 倾斜 分布,有时您的一些存储桶文件太大而无法放入您的 RAM。
通常有三种方法可以摆脱这种困境:
- 运行 算法再次具有更大的
n
,因此您有更多的桶分配给
- 只取太大的桶并只对这些文件进行另一个散列分区传递(因此每个文件再次进入
n
个新创建的桶)
- 回退到对大分区文件进行外部合并排序。
有时这三者以不同的组合使用,这称为动态分区。
我需要非常了解 java 和内存问题的人的建议。我有一个很大的 CSV 文件(每个大约 500mb),我需要仅使用 64mb 的 xmx 将这些文件合并为一个文件。我试过用不同的方法来做,但没有任何效果——总是出现内存异常。我应该怎么做才能让它正常工作?
任务是: 开发一个简单的实现,以合理有效的方式连接两个输入表,并在需要时将两个表存储在 RAM 中。
我的代码可以运行,但它占用大量内存,因此不能容纳 64mb。
public class ImprovedInnerJoin {
public static void main(String[] args) throws IOException {
RandomAccessFile firstFile = new RandomAccessFile("input_A.csv", "r");
FileChannel firstChannel = firstFile.getChannel();
RandomAccessFile secondFile = new RandomAccessFile("input_B.csv", "r");
FileChannel secondChannel = secondFile.getChannel();
RandomAccessFile resultFile = new RandomAccessFile("result2.csv", "rw");
FileChannel resultChannel = resultFile.getChannel().position(0);
ByteBuffer resultBuffer = ByteBuffer.allocate(40);
ByteBuffer firstBuffer = ByteBuffer.allocate(25);
ByteBuffer secondBuffer = ByteBuffer.allocate(25);
while (secondChannel.position() != secondChannel.size()){
Map <String, List<String>>table2Part = new HashMap();
for (int i = 0; i < secondChannel.size(); ++i){
if (secondChannel.read(secondBuffer) == -1)
break;
secondBuffer.rewind();
String[] table2Tuple = (new String(secondBuffer.array(), Charset.defaultCharset())).split(",");
if (!table2Part.containsKey(table2Tuple[0]))
table2Part.put(table2Tuple[0], new ArrayList());
table2Part.get(table2Tuple[0]).add(table2Tuple[1]);
secondBuffer.clear();
}
Set <String> taple2keys = table2Part.keySet();
while (firstChannel.read(firstBuffer) != -1){
firstBuffer.rewind();
String[] table1Tuple = (new String(firstBuffer.array(), Charset.defaultCharset())).split(",");
for (String table2key : taple2keys){
if (table1Tuple[0].equals(table2key)){
for (String value : table2Part.get(table2key)){
String result = table1Tuple[0] + "," + table1Tuple[1].substring(0,14) + "," + value; // 0,14 or result buffer will be overflown
resultBuffer.put(result.getBytes());
resultBuffer.rewind();
while(resultBuffer.hasRemaining()){
resultChannel.write(resultBuffer);
}
resultBuffer.clear();
}
}
}
firstBuffer.clear();
}
firstChannel.position(0);
table2Part.clear();
}
firstChannel.close();
secondChannel.close();
resultChannel.close();
System.out.println("Operation completed.");
}
}
如果中央内存是您的应用程序的限制,但您可以访问持久文件,我会按照 blahfunk 的建议创建一个临时 SQLite 文件到您的 tmp 文件夹,逐块读取每个文件并通过简单的连接合并它们.您可以通过查看 Hibernate, just take a look to what have I found on this Whosebug question: How to create database in Hibernate at runtime?
等库来创建临时 SQLite 数据库如果您不能执行这样的任务,您剩下的选择是消耗更多 cpu 并仅加载第一个文件的第一行,在第二个文件中搜索具有相同索引的行,缓冲结果并在输出文件中尽可能晚地刷新它们,对第一个文件的每一行重复此操作。
也许您可以流式传输 first file
并将每一行变成 hashcode
并将所有这些 hashcodes
保存在内存中。然后流式传输 second file
并为每一行创建一个 hashcode
。如果 hashcode
在 first file
中,即在内存中,则不要写该行,否则写该行。之后,将 first file
完整地附加到 result file
.
这将有效地创建一个索引来比较您的更新。
一个非常容易实现的外部连接版本是 external hash join
。
它比外部合并排序连接更容易实现,并且只有一个缺点(稍后会详细介绍)。
它是如何工作的?
非常类似于哈希表。
选择一个数字 n
,表示您要将数据分发到多少个文件 ("buckets") 中。
然后执行以下操作:
- 设置
n
文件编写器 - 对于您要加入的每个文件和每一行:
- 获取要加入的密钥的哈希码
- 计算哈希码和
n
的模,这将得到k
- 将您的 csv 行附加到第
k
个文件编写器
- Flush/Close 所有
n
位作者。
现在您有 n
个文件,希望更小,保证相同的密钥将始终在相同的文件中 。现在,您可以 运行 基于标准 HashMap/HashMultiSet
的每个文件分别加入。
限制
为什么我提到希望 较小的文件?好吧,这取决于密钥及其哈希码的分布。想想最坏的情况,你所有的文件都有完全相同的密钥:你只有一个文件,你没有从分区中获得任何东西。
类似于 倾斜 分布,有时您的一些存储桶文件太大而无法放入您的 RAM。 通常有三种方法可以摆脱这种困境:
- 运行 算法再次具有更大的
n
,因此您有更多的桶分配给 - 只取太大的桶并只对这些文件进行另一个散列分区传递(因此每个文件再次进入
n
个新创建的桶) - 回退到对大分区文件进行外部合并排序。
有时这三者以不同的组合使用,这称为动态分区。