如何使用 Apache Nifi 加入两个 CSV
How to join two CSVs with Apache Nifi
我正在研究 ETL 工具(如 Talend)并调查是否可以使用 Apache Nifi。 Nifi 能否用于执行以下操作:
- 选取两个放在本地磁盘上的 CSV 文件
- 在公共列上加入 CSV
- 将加入的 CSV 写入磁盘
我试过在 Nifi 中设置作业,但看不到如何执行两个单独的 CSV 文件的连接。这个任务在 Apache Nifi 中可行吗?
看起来 QueryDNS processor 可用于使用另一个 CSV 文件来丰富一个 CSV 文件,但对于此用例来说这似乎过于复杂。
这是输入 CSV 的示例,需要在 state_id 上加入:
输入文件
customers.csv
id | name | address | state_id
---|------|--------------|---------
1 | John | 10 Blue Lane | 100
2 | Bob | 15 Green St. | 200
states.csv
state_id | state
---------|---------
100 | Alabama
200 | New York
输出文件
output.csv
id | name | address | state
---|------|--------------|---------
1 | John | 10 Blue Lane | Alabama
2 | Bob | 15 Green St. | New York
为此遵循的典型模式是将参考集加载到 NiFi 中的地图缓存控制器服务中。在本例中是 states.csv
数据。然后客户数据的实时提要进来,并使用像 ReplaceText
or you could even write a custom processor in Groovy. There are a lot of ways to slice this. There is also a JIRA/PR 这样的参考数据来丰富这个参考数据,使这更容易。实时流连接的某些元素最好在 Apache Storm、Spark 和 Flink 等处理系统中完成,但对于您提到的情况,它可以在 NiFi 中完成。
Apache NiFi 更像是一种数据流工具,并不是真正用于执行流数据的任意连接。通常这些类型的操作更适合流处理系统,如 Storm、Flink、Apex 等,或 ETL 工具。
NiFi 擅长的连接类型是丰富查找,其中有固定大小的查找数据集,对于传入数据中的每条记录,您使用查找数据集检索一些值。例如,在您的情况下,可能有一个名为 LookUpState 的处理器,它有一个 属性 "State Data" 指向一个包含所有状态的文件,那么 customers.csv 可能是该处理器的输入.
一位社区成员启动了一个为 NiFi 提供通用查找服务的项目:
https://github.com/jfrazee/nifi-lookup-service
workflow
我还尝试使用公共列连接两个 CSV 文件,并使用 nifi 中的查找记录属性成功完成 lookup record config
在这里,我使用 simplecsvlookup
服务作为我的查找服务,我还附加了它的配置 simplecsvlookup configuration
我们首先要学习的是如何使用查找记录属性。在这里,我有两个 csv 文件:
sample.csv:
id,msisdn,recharge_amount
1,9048108594,399
new1:
msisdn,类型
9048108594,1
输出:
id,msisdn,recharge_amount,类型
1,9048108594,399,1
最需要注意的是结果记录路径和key
在这种情况下,密钥是 msisdn(因为这个是两个文件中的公共密钥)
并且,对于结果记录路径,我们应该使用我们需要合并的列名,在本例中为 "type:"
result record path--->> /type
key----->> /msisdn
并且,在查找服务中,给出相应的键和值名称。
它会起作用。
我正在研究 ETL 工具(如 Talend)并调查是否可以使用 Apache Nifi。 Nifi 能否用于执行以下操作:
- 选取两个放在本地磁盘上的 CSV 文件
- 在公共列上加入 CSV
- 将加入的 CSV 写入磁盘
我试过在 Nifi 中设置作业,但看不到如何执行两个单独的 CSV 文件的连接。这个任务在 Apache Nifi 中可行吗?
看起来 QueryDNS processor 可用于使用另一个 CSV 文件来丰富一个 CSV 文件,但对于此用例来说这似乎过于复杂。
这是输入 CSV 的示例,需要在 state_id 上加入:
输入文件
customers.csv
id | name | address | state_id
---|------|--------------|---------
1 | John | 10 Blue Lane | 100
2 | Bob | 15 Green St. | 200
states.csv
state_id | state
---------|---------
100 | Alabama
200 | New York
输出文件
output.csv
id | name | address | state
---|------|--------------|---------
1 | John | 10 Blue Lane | Alabama
2 | Bob | 15 Green St. | New York
为此遵循的典型模式是将参考集加载到 NiFi 中的地图缓存控制器服务中。在本例中是 states.csv
数据。然后客户数据的实时提要进来,并使用像 ReplaceText
or you could even write a custom processor in Groovy. There are a lot of ways to slice this. There is also a JIRA/PR 这样的参考数据来丰富这个参考数据,使这更容易。实时流连接的某些元素最好在 Apache Storm、Spark 和 Flink 等处理系统中完成,但对于您提到的情况,它可以在 NiFi 中完成。
Apache NiFi 更像是一种数据流工具,并不是真正用于执行流数据的任意连接。通常这些类型的操作更适合流处理系统,如 Storm、Flink、Apex 等,或 ETL 工具。
NiFi 擅长的连接类型是丰富查找,其中有固定大小的查找数据集,对于传入数据中的每条记录,您使用查找数据集检索一些值。例如,在您的情况下,可能有一个名为 LookUpState 的处理器,它有一个 属性 "State Data" 指向一个包含所有状态的文件,那么 customers.csv 可能是该处理器的输入.
一位社区成员启动了一个为 NiFi 提供通用查找服务的项目: https://github.com/jfrazee/nifi-lookup-service
workflow
我还尝试使用公共列连接两个 CSV 文件,并使用 nifi 中的查找记录属性成功完成 lookup record config
在这里,我使用 simplecsvlookup
服务作为我的查找服务,我还附加了它的配置 simplecsvlookup configuration
我们首先要学习的是如何使用查找记录属性。在这里,我有两个 csv 文件:
sample.csv: id,msisdn,recharge_amount 1,9048108594,399
new1: msisdn,类型 9048108594,1
输出: id,msisdn,recharge_amount,类型 1,9048108594,399,1
最需要注意的是结果记录路径和key 在这种情况下,密钥是 msisdn(因为这个是两个文件中的公共密钥) 并且,对于结果记录路径,我们应该使用我们需要合并的列名,在本例中为 "type:"
result record path--->> /type
key----->> /msisdn
并且,在查找服务中,给出相应的键和值名称。
它会起作用。