使用 NiFi 确定 Oracle INSERT 或 UPDATE 的最佳方法

Best approach to determine Oracle INSERT or UPDATE using NiFi

我有一个 JSON 流文件,我需要确定我是否应该执行插入或更新。诀窍是只更新与 JSON 属性匹配的列。我有一个 ExecuteSQL 正在工作,它 returns executesql.row.count,但是我丢失了我打算用作路由属性的原始 JSON 流文件。我试图让 MergeContent 加入 ExecuteSQL(转储 Avro 输出,我只需要 executesql.row.count 属性)和 JSON 流。在执行 ExecuteSQL 之前我已经设置了 follow:

fragment.count=2

fragment.identifier=${UUID()}

fragment.index=${nextInt()}

或者我可以创建一个 MERGE,如果有一种方法可以遍历匹配 Oracle table?

的 JSON 属性列表

你的 JSON 有多大?如果它很小,您可以考虑使用 ExtractText(匹配整个文档)将 JSON 放入属性中。然后您可以 运行 ExecuteSQL,然后 ReplaceText 将 JSON 放回内容中(覆盖 Avro 结果)。如果您的 JSON 很大,您可以使用 "Put Cache Value In Attribute" 属性 设置 DistributedMapCacheServer and (in a separate flow) run ExecuteSQL and store the value or executesql.row.count into the cache. Then in the JSON flow you can use FetchDistributedMapCache

如果你只需要JSON来使用RouteOnAttribute,也许你可以在ExecuteSQL之前使用EvaluateJsonPath,这样你的条件已经在属性中,你可以替换流文件内容。

如果您想使用 MergeContent,您可以将 fragment.count 设置为 2,而不是使用 UUID() 函数,您可以使用 UpdateAttribute 将 "parent.identifier" 设置为“${uuid}” ,然后 DuplicateFlowFile 创建 2 个副本,然后 UpdateAttribute 将 "fragment.identifier" 设置为 "${parent.identifier}" 并将 "fragment.index" 设置为 "${nextInt():mod(2)}”。这提供了一组可合并的两个流文件,您可以在 fragment.index 上路由为 0 或 1,将一个发送到 ExecuteSQL,一个通过另一个流,在 MergeContent 加入备份。

另一种选择是使用 ConvertJSONToSQL 设置为 "UPDATE",如果失败,将这些流文件路由到另一个设置为 "INSERT" 的 ConvertJSONToSQL 处理器。