使用内部联接合并 2 个大型 csv 文件

Merge 2 large csv files using inner join

我需要非常了解 java 和内存问题的人的建议。我有一个很大的 CSV 文件(每个大约 500mb),我需要仅使用 64mb 的 xmx 将这些文件合并为一个文件。我试过用不同的方法来做,但没有任何效果——总是出现内存异常。我应该怎么做才能让它正常工作?

任务是: 开发一个简单的实现,以合理有效的方式连接两个输入表,并在需要时将两个表存储在 RAM 中。

我的代码可以运行,但它占用大量内存,因此不能容纳 64mb。

public class ImprovedInnerJoin {
public static void main(String[] args) throws IOException {

    RandomAccessFile firstFile     = new RandomAccessFile("input_A.csv", "r");
    FileChannel      firstChannel = firstFile.getChannel();
    RandomAccessFile secondFile     = new RandomAccessFile("input_B.csv", "r");
    FileChannel      secondChannel = secondFile.getChannel();
    RandomAccessFile resultFile     = new RandomAccessFile("result2.csv", "rw");
    FileChannel      resultChannel = resultFile.getChannel().position(0);

    ByteBuffer resultBuffer = ByteBuffer.allocate(40);
    ByteBuffer firstBuffer = ByteBuffer.allocate(25);
    ByteBuffer secondBuffer = ByteBuffer.allocate(25);

    while (secondChannel.position() != secondChannel.size()){
        Map <String, List<String>>table2Part = new HashMap();
        for (int i = 0; i < secondChannel.size(); ++i){
            if (secondChannel.read(secondBuffer) == -1)
                break;
            secondBuffer.rewind();
            String[] table2Tuple = (new String(secondBuffer.array(), Charset.defaultCharset())).split(",");
            if (!table2Part.containsKey(table2Tuple[0]))
                table2Part.put(table2Tuple[0], new ArrayList());
            table2Part.get(table2Tuple[0]).add(table2Tuple[1]);
            secondBuffer.clear();
        }

        Set <String> taple2keys = table2Part.keySet();
        while (firstChannel.read(firstBuffer) != -1){
            firstBuffer.rewind();
            String[] table1Tuple = (new String(firstBuffer.array(), Charset.defaultCharset())).split(",");
            for (String table2key : taple2keys){
                if (table1Tuple[0].equals(table2key)){
                    for (String value : table2Part.get(table2key)){
                        String result = table1Tuple[0] + "," + table1Tuple[1].substring(0,14) + "," + value; // 0,14 or result buffer will be overflown
                        resultBuffer.put(result.getBytes());
                        resultBuffer.rewind();
                        while(resultBuffer.hasRemaining()){
                            resultChannel.write(resultBuffer);
                        }
                        resultBuffer.clear();
                    }
                }
            }
            firstBuffer.clear();
        }
        firstChannel.position(0);
        table2Part.clear();
    }

    firstChannel.close();
    secondChannel.close();
    resultChannel.close();
    System.out.println("Operation completed.");
}
}

如果中央内存是您的应用程序的限制,但您可以访问持久文件,我会按照 blahfunk 的建议创建一个临时 SQLite 文件到您的 tmp 文件夹,逐块读取每个文件并通过简单的连接合并它们.您可以通过查看 Hibernate, just take a look to what have I found on this Whosebug question: How to create database in Hibernate at runtime?

等库来创建临时 SQLite 数据库

如果您不能执行这样的任务,您剩下的选择是消耗更多 cpu 并仅加载第一个文件的第一行,在第二个文件中搜索具有相同索引的行,缓冲结果并在输出文件中尽可能晚地刷新它们,对第一个文件的每一行重复此操作。

也许您可以流式传输 first file 并将每一行变成 hashcode 并将所有这些 hashcodes 保存在内存中。然后流式传输 second file 并为每一行创建一个 hashcode。如果 hashcodefirst file 中,即在内存中,则不要写该行,否则写该行。之后,将 first file 完整地附加到 result file.

这将有效地创建一个索引来比较您的更新。

一个非常容易实现的外部连接版本是 external hash join。 它比外部合并排序连接更容易实现,并且只有一个缺点(稍后会详细介绍)。

它是如何工作的?

非常类似于哈希表。 选择一个数字 n,表示您要将数据分发到多少个文件 ("buckets") 中。

然后执行以下操作:

  • 设置n 文件编写器
  • 对于您要加入的每个文件和每一行:
    • 获取要加入的密钥的哈希码
    • 计算哈希码和 n 的模,这将得到 k
    • 将您的 csv 行附加到第 k 个文件编写器
  • Flush/Close 所有 n 位作者。

现在您有 n 个文件,希望更小,保证相同的密钥将始终在相同的文件中 。现在,您可以 运行 基于标准 HashMap/HashMultiSet 的每个文件分别加入。

限制

为什么我提到希望 较小的文件?好吧,这取决于密钥及其哈希码的分布。想想最坏的情况,你所有的文件都有完全相同的密钥:你只有一个文件,你没有从分区中获得任何东西。

类似于 倾斜 分布,有时您的一些存储桶文件太大而无法放入您的 RAM。 通常有三种方法可以摆脱这种困境:

  1. 运行 算法再次具有更大的 n,因此您有更多的桶分配给
  2. 只取太大的桶并只对这些文件进行另一个散列分区传递(因此每个文件再次进入 n 个新创建的桶)
  3. 回退到对大分区文件进行外部合并排序。

有时这三者以不同的组合使用,这称为动态分区