如何将出处信息关联到 NiFi 中的 ExecuteSQL 结果?
How to associate provenance information to an ExecuteSQL result in NiFi?
我在数据库中列出 table 之后执行一系列查询,基本上是在添加一些智能后进行数据库转储。
当数据从 ExecuteSql
处理器中出来时,它是 Avro 格式。我可以使用 ConvertAvroToJson
转换为 JSON。然后我可以将 JSON 发送到其他地方。太棒了!
但是,我需要在该 JSON 文档中嵌入更多信息。我至少想要:
- 执行查询的 table 的名称,
- 数据库 DSN(没有凭证,可能硬编码到配置中,因为我认为它不能从 NiFi 表达式语言字段访问),
- 执行生成记录的查询,
- 查询集中的记录数(总行数已经是一个属性)。
- 从环境变量或配置文件中提取的任意信息,回退到
UpdateAttributes
处理器中的硬编码,否则
ExecuteSQL
似乎没有提供此信息,但 it does seem to copy attributes from the input flowfile。我可能能够通过管道中较早的 UpdateAttributes
将一些信息放入输入流文件属性中。如果可能的话,我如何将 ExecuteSQL -> ConvertAvroToJSON
的 JSON 输出与 AttributesToJson
处理器输出的属性合并?
一些评论:
1) 您是先使用 ListDatabaseTables 吗?如果是这样,您的流文件将具有属性,包括 table 名称、数据库名称等
2) 此信息当前不可用于处理器,DBCPService API 仅公开 JDBC Connection. Perhaps you could use a scripted processor like ExecuteScript to execute the SQL (I have an example on my blog) 并访问元数据,但我不确定它是否包含您想要的所有信息。该处理器的出处也没有该信息,因为我们没有存储出处事件的传输 URI。我们应该考虑至少让处理器可以使用传输 URI(减去敏感值)。
3) 如果您在 ExecuteSQL 处理器中对 SQL 查询进行硬编码,您也可以预先使用 UpdateAttribute 将其硬编码到属性中。如果流文件包含 SQL 查询,您可以使用 ExtractText 将其放入属性中。我们还应该增强处理器以将查询添加为属性(必须是可选的,因为某些查询可能非常大)。
4) 您是否希望将每条记录与其 "row number" 作为属性进行拆分?在 SplitAvro 之后,您将拥有一个 fragment.index 属性,该属性是与每条记录关联的从零开始的数字。
5) 您可以在 UpdateAttribute 中使用表达式语言,因此结合 Variable Registry you could read values from environment variables or a registry file. You may also be interested in PropertiesFileLookupService.
我在数据库中列出 table 之后执行一系列查询,基本上是在添加一些智能后进行数据库转储。
当数据从 ExecuteSql
处理器中出来时,它是 Avro 格式。我可以使用 ConvertAvroToJson
转换为 JSON。然后我可以将 JSON 发送到其他地方。太棒了!
但是,我需要在该 JSON 文档中嵌入更多信息。我至少想要:
- 执行查询的 table 的名称,
- 数据库 DSN(没有凭证,可能硬编码到配置中,因为我认为它不能从 NiFi 表达式语言字段访问),
- 执行生成记录的查询,
- 查询集中的记录数(总行数已经是一个属性)。
- 从环境变量或配置文件中提取的任意信息,回退到
UpdateAttributes
处理器中的硬编码,否则
ExecuteSQL
似乎没有提供此信息,但 it does seem to copy attributes from the input flowfile。我可能能够通过管道中较早的 UpdateAttributes
将一些信息放入输入流文件属性中。如果可能的话,我如何将 ExecuteSQL -> ConvertAvroToJSON
的 JSON 输出与 AttributesToJson
处理器输出的属性合并?
一些评论:
1) 您是先使用 ListDatabaseTables 吗?如果是这样,您的流文件将具有属性,包括 table 名称、数据库名称等
2) 此信息当前不可用于处理器,DBCPService API 仅公开 JDBC Connection. Perhaps you could use a scripted processor like ExecuteScript to execute the SQL (I have an example on my blog) 并访问元数据,但我不确定它是否包含您想要的所有信息。该处理器的出处也没有该信息,因为我们没有存储出处事件的传输 URI。我们应该考虑至少让处理器可以使用传输 URI(减去敏感值)。
3) 如果您在 ExecuteSQL 处理器中对 SQL 查询进行硬编码,您也可以预先使用 UpdateAttribute 将其硬编码到属性中。如果流文件包含 SQL 查询,您可以使用 ExtractText 将其放入属性中。我们还应该增强处理器以将查询添加为属性(必须是可选的,因为某些查询可能非常大)。
4) 您是否希望将每条记录与其 "row number" 作为属性进行拆分?在 SplitAvro 之后,您将拥有一个 fragment.index 属性,该属性是与每条记录关联的从零开始的数字。
5) 您可以在 UpdateAttribute 中使用表达式语言,因此结合 Variable Registry you could read values from environment variables or a registry file. You may also be interested in PropertiesFileLookupService.