如何使用 Apache Nifi 加入两个 CSV

How to join two CSVs with Apache Nifi

我正在研究 ETL 工具(如 Talend)并调查是否可以使用 Apache Nifi。 Nifi 能否用于执行以下操作:

  1. 选取两个放在本地磁盘上的 CSV 文件
  2. 在公共列上加入 CSV
  3. 将加入的 CSV 写入磁盘

我试过在 Nifi 中设置作业,但看不到如何执行两个单独的 CSV 文件的连接。这个任务在 Apache Nifi 中可行吗?

看起来 QueryDNS processor 可用于使用另一个 CSV 文件来丰富一个 CSV 文件,但对于此用例来说这似乎过于复杂。

这是输入 CSV 的示例,需要在 state_id 上加入:

输入文件

customers.csv

id | name | address      | state_id
---|------|--------------|---------
1  | John | 10 Blue Lane | 100
2  | Bob  | 15 Green St. | 200

states.csv

state_id | state
---------|---------
100      | Alabama
200      | New York

输出文件

output.csv

id | name | address      | state
---|------|--------------|---------
1  | John | 10 Blue Lane | Alabama
2  | Bob  | 15 Green St. | New York

为此遵循的典型模式是将参考集加载到 NiFi 中的地图缓存控制器服务中。在本例中是 states.csv 数据。然后客户数据的实时提要进来,并使用像 ReplaceText or you could even write a custom processor in Groovy. There are a lot of ways to slice this. There is also a JIRA/PR 这样的参考数据来丰富这个参考数据,使这更容易。实时流连接的某些元素最好在 Apache Storm、Spark 和 Flink 等处理系统中完成,但对于您提到的情况,它可以在 NiFi 中完成。

Apache NiFi 更像是一种数据流工具,并不是真正用于执行流数据的任意连接。通常这些类型的操作更适合流处理系统,如 Storm、Flink、Apex 等,或 ETL 工具。

NiFi 擅长的连接类型是丰富查找,其中有固定大小的查找数据集,对于传入数据中的每条记录,您使用查找数据集检索一些值。例如,在您的情况下,可能有一个名为 LookUpState 的处理器,它有一个 属性 "State Data" 指向一个包含所有状态的文件,那么 customers.csv 可能是该处理器的输入.

一位社区成员启动了一个为 NiFi 提供通用查找服务的项目: https://github.com/jfrazee/nifi-lookup-service

workflow

我还尝试使用公共列连接两个 CSV 文件,并使用 nifi 中的查找记录属性成功完成 lookup record config

在这里,我使用 simplecsvlookup 服务作为我的查找服务,我还附加了它的配置 simplecsvlookup configuration

我们首先要学习的是如何使用查找记录属性。在这里,我有两个 csv 文件:

sample.csv: id,msisdn,recharge_amount 1,9048108594,399

new1: msisdn,类型 9048108594,1

输出: id,msisdn,recharge_amount,类型 1,9048108594,399,1

最需要注意的是结果记录路径和key 在这种情况下,密钥是 msisdn(因为这个是两个文件中的公共密钥) 并且,对于结果记录路径,我们应该使用我们需要合并的列名,在本例中为 "type:"

result record path--->> /type
key----->> /msisdn

并且,在查找服务中,给出相应的键和值名称。

它会起作用。