NiFi 中的命名约定

Naming convention in NiFi

我有一个流程GetFile->ConvertRecord->splittext->PutdatabaseRecord。我发送到 GetFile 的 csv 文件包含以下字段:

ID  TIME                      M00B01  M00B02  M00B03
1   2018-09-27 10:44:23.972   3242    35      335
2   2018-09-21 11:44:23.972   323     24      978

我的数据库table在MYSQL的骨架如下:

Create table test(ID INT,TIME DATETIME(3),MxB01 INT,MxB02 INT,MxB03 INT);

注意:我已将 headers 的名称替换为 MxB00、MxB01 等。

我的 convertRecord 处理器有错误,因为我以 CSVReader 读取并以 CSVSetWritter 写入。我附上两者的配置供您参考。

问题是它读取 CSV 文件,但由于 header 名称的更改,它使所有其他字段为空白(我更改了 header 名称,因为我必须编写 headers 的名称为 MxB00,以便匹配 MySQL table 中定义的 headers。我得到了 ID 和 Time 的值,因为我没有更改 CSVWritter 中那些字段的 Header 名称和 MySQL table 定义。所以我得到了这些值,但是对于其他值,我得到了空白因为名称更改导致它变得混乱。

我该如何解决这个问题?任何帮助都非常appreciated.Thank你!

如你所愿为输出 CSV 文件header创建自定义 header 然后配置 CSV reader 控制器服务如下图。

配置:

因为我们使用模式访问策略作为模式文本并将模式指定为

{
"type": "record",
"name": "SQLSchema",
"fields" : [
{"name": "ID", "type": ["null","int"]},
{"name": "TIME", "type": ["null","string"]},
{"name": "MxB01", "type": ["null","int"]},
{"name": "MxB02", "type": ["null","int"]},
{"name": "MxB03", "type": ["null","int"]}
]
}

并且我们将第一行 csv 数据视为 Header 并且 忽略 CSV header 列名称 ,因此输出流文件将具有我们上面定义的模式。

CsvWriter 配置:

由于我们正在继承模式写入策略,因此输出流文件将具有与我们在 reader.

中指定的内容相同的 header

此外我不确定你为什么在 ConvertRecord 之后使用 SplitText 处理器作为 PutDatabaseRecord 处理器,旨在一次处理大量记录。

即使您可以使用上面提到的 CsvReader 控制器服务 配置 PutDatabaseRecord 处理器,那么您的流程将是:

流量:

GetFile -> PutDatabaseRecord

注:

因为我没有在时间戳字段中使用 Avro 逻辑类型,如果您使用的是逻辑类型,那么请相应地更改控制器服务配置。

尝试GetFile->ReplaceText-->ConvertRecord->splittext->PutdatabaseRecord。

配置:

搜索值:输入headers, 重置值:新 headers, 替换策略:文字替换, 评估模式;全文