在hadoop中合并小文件

Merging small files in hadoop

我在 HDFS 中有一个目录(Final Dir),其中每分钟加载一些文件(例如:10 mb)。 一段时间后,我想将所有小文件组合成一个大文件(例如:100 mb)。但是用户不断地将文件推送到 Final Dir。这是一个连续的过程。

所以我第一次需要将前 10 个文件组合成一个大文件(例如:large.txt)并将文件保存到 Finaldir。

现在我的问题是如何获取除前 10 个文件之外的后 10 个文件?

能不能帮帮我

@Andrew 向您指出了 6 年前在面向批处理的世界中合适的解决方案。
但现在是 2016 年,您有微批处理数据流 运行ning 并且需要 非阻塞 解决方案。

我就是这样做的:

  • 创建一个具有 3 个分区的 EXTERNAL table,映射到 3 个目录 例如new_datareorghistory
  • 将新文件输入 new_data
  • 执行一个作业来 运行 批量压缩,并且 运行 它周期性地

现在批量压缩逻辑:

  1. 确保在压缩 运行ning 时不会执行任何 SELECT 查询,否则它会 return 重复
  2. select所有适合压缩的文件(定义你自己的 criteria) 将它们new_data 目录移动到 reorg
  3. 合并所有这些reorg文件的内容,到history目录中的一个新文件中(随意GZip它在运行中,Hive 将识别 .gz 扩展)
  4. 删除 reorg
  5. 中的文件

所以这基本上是 2010 年的旧故事,除了您现有的数据流可以继续将新文件转储到 new_data 中,同时压缩在单独的目录中安全地 运行ning。如果压缩作业崩溃,您可以安全地调查/清理/恢复压缩,而不会影响数据流。


顺便说一句,我不太喜欢基于 "Hadoop Streaming" 工作的 2010 解决方案——一方面,"streaming" 现在的含义非常不同;另一方面,"Hadoop streaming" 在过去很有用,但现在已不复存在;在紧握的手上 [*] 您可以使用 Hive 查询非常简单地完成它,例如

INSERT INTO TABLE blahblah PARTITION (stage='history')
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg'
;

在该查询之前有几个 SET some.property = somevalue,您可以定义将在结果文件上应用的压缩编解码器,您想要多少个文件(或更准确地说,有多大)您希望文件是 - Hive 将 运行 相应地合并),等等

查看 hive.merge.mapfileshive.merge.mapredfiles 下的 https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties(如果您使用 TEZ,则查看 hive.merge.tezfiles)和 hive.merge.smallfiles.avgsize,然后是 hive.exec.compress.outputmapreduce.output.fileoutputformat.compress.codec -- 加上 hive.hadoop.supports.splittable.combineinputformat 以减少地图容器的数量,因为您的输入文件非常小。


[*] 非常古老的 SF 参考资料 :-)

还有一个替代方法,这仍然是@Andrew 在他的评论中指出的遗留方法,但是有额外的步骤将您的输入文件夹作为缓冲区接收小文件,及时将它们推送到 tmp 目录时尚并合并它们并将结果推回输入。

第 1 步:创建一个 tmp 目录

hadoop fs -mkdir tmp

第2步:在某个时间点将所有小文件移动到tmp目录

hadoop fs -mv input/*.txt tmp

步骤 3 - 在 hadoop-streaming jar 的帮助下合并小文件

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/user/abc/input" \
                   -output "/user/abc/output" \
                   -mapper cat \
                   -reducer cat

第 4 步 - 将输出移动到输入文件夹

hadoop fs -mv output/part-00000 input/large_file.txt

第 5 步 - 删除输出

 hadoop fs -rm -R output/

第 6 步 - 从 tmp

中删除所有文件
hadoop fs -rm tmp/*.txt

从第 2 步到第 6 步创建一个 shell 脚本,并定期将其安排到 运行 以定期合并较小的文件(可能根据您的需要每分钟)

为合并小文件安排 cron 作业的步骤

第 1 步:在上述步骤(2 到 6)

的帮助下创建 shell 脚本 /home/abc/mergejob.sh

重要提示:需要在脚本中指定hadoop的绝对路径才能被cron理解

#!/bin/bash
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv input/*.txt tmp
wait
/home/abc/hadoop-2.6.0/bin/hadoop jar /home/abc/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/user/abc/input" \
                   -output "/user/abc/output" \
                   -mapper cat \
                   -reducer cat
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -mv output/part-00000 input/large_file.txt
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm -R output/
wait
/home/abc/hadoop-2.6.0/bin/hadoop fs -rm tmp/*.txt

第 2 步:使用 cron 表达式 运行 将脚本安排为每分钟

a) 通过选择编辑器编辑 crontab

>crontab -e

b) 在末尾添加以下行并退出编辑器

* * * * * /bin/bash /home/abc/mergejob.sh > /dev/null 2>&1

合并作业将安排为每分钟 运行。

希望这对您有所帮助。