从多个服务器加载数据时避免数据重复

Avoiding Data Duplication when Loading Data from Multiple Servers

我有十几个网络服务器,每个服务器都将数据写入日志文件。在每个小时的开始,使用 cron 脚本 运行 命令将前一小时的数据加载到配置单元中:

hive -e "LOAD DATA LOCAL INPATH 'myfile.log' INTO TABLE my_table PARTITION(dt='2015-08-17-05')"

在某些情况下,命令会失败并以 0 以外的代码退出,在这种情况下,我们的脚本会等待并再次尝试。问题是,在某些失败的情况下,数据加载确实 而不是 失败,即使它显示失败消息。如何确定数据是否已加载?

此类 "failure" 的示例,其中数据 已被 加载:

Loading data to table default.my_table partition (dt=2015-08-17-05) Failed with exception org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter partition. FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask

编辑: 或者,有没有办法查询配置单元中加载的文件名?我可以使用 DESCRIBE 查看文件数。我可以知道他们的名字吗?

我不相信您可以在 Hadoop/Hive 中简单地做到这一点。所以这里是 python 中实现的基础知识:

import subprocess
x=subprocess.check_output([hive -e "select count(*) from my_table where dt='2015-08-17-05'"])
print type(x)
print x

但是您必须花一些时间使用反斜杠才能让 hive -e 使用 python 工作。这可能非常困难。先写一个包含简单查询的文件,然后使用 hive -f filename 可能更容易。然后,打印 subprocess.check_output 的输出以查看输出是如何存储的。您可能需要进行一些正则表达式或类型转换,但我认为它应该作为字符串返回。然后简单地使用一个 if 语句:

if x > 0:
    pass
else:
    hive -e "LOAD DATA LOCAL INPATH 'myfile.log' INTO TABLE my_table PARTITION(dt='2015-08-17-05')"

关于"which files have been loaded in a partition":

  • 如果您使用了 EXTERNAL TABLE 并且刚刚上传了您的原始数据 映射到 LOCATION 的 HDFS 目录中的文件,那么您可以

(a) 只需从命令行在该目录上 运行 a hdfs dfs -ls (或使用等效的 Java API 调用) (b) 运行 一个 Hive 查询,例如 select distinct INPUT__FILE__NAME from (...)

  • 但在你的情况下,你将数据复制到 "managed" table,所以有 无法检索数据沿袭(即使用了哪个日志文件 创建每个托管数据文件)
  • ...除非您添加 明确地 原始文件名 inside 日志文件, 课程(在 "special" header 记录上,或在每条记录的开头 - 这可以用旧 sed 完成)

关于"how to automagically avoid duplication on INSERT":有一种方法,但它需要相当多的re-engineering,并且会花费你处理时间/(额外的地图步骤加上地图连接)/。 ..

  1. 将您的日志文件映射到 EXTERNAL TABLE 以便您可以 运行 INSERT-SELECT查询
  2. 使用 INPUT__FILE__NAME pseudo-column 作为来源
  3. 将原始文件名上传到您的托管 table
  4. 添加一个带有相关 sub-query 的 WHERE NOT EXISTS 子句,这样如果源文件名已经存在于目标中,那么您不再加载任何内容

    INSERT INTO TABLE Target SELECT ColA, ColB, ColC, INPUT__FILE__NAME AS SrcFileName FROM Source src WHERE NOT EXISTS (SELECT DISTINCT 1 FROM Target trg WHERE trg.SrcFileName =src.INPUT__FILE__NAME )

    注意愚蠢的 DISTINCT,它实际上是避免耗尽 Mappers 中的 RAM 所必需的;对于像 Oracle 这样成熟的 DBMS 来说是没有用的,但是 Hive 优化器仍然相当粗糙...