NiFi 替换文本过程,尝试用属性值替换第 n 列最终出错

NiFi Replace Text process, trying to replace nth column with an attribute value ends up in error

使用替换文本处理器将 44 列文件中的几列替换为逗号 (,) 作为分隔符。

一个流文件只有一行 44 个字段。

在替换文本处理器中,

我需要用属性更改流文件中的第 3 列。 所以我把它分成 4 组,并用一个属性单独替换第二组的数据。

如果我这样做,处理器会挂起。如何用特定属性或字符串替换第 n 列?

您使用的是什么版本的 NiFi?从 NiFi 1.3.0 开始,您可以使用 "record-aware" 处理器,例如 UpdateRecord. You would configure a CSVReader,可能是通过从 header 行推断字符串字段或为字段提供您自己的 Avro 模式。假设所需 column/field 的名称是 "fname"。在 UpdateRecord 中,您可以将 Replacement Strategy 设置为 "Literal" 并添加名为“/fname”的 user-defined 属性,值为“${filename}”。这应该允许您更新 CSV 文件 in-place,而无需拆分行或处理正则表达式来解析行。

注意:如果使用Apache NiFi 1.3.0+版本,Matt的方式更好

我的建议是使用 ExecuteScript 处理器并使用 Groovy 来执行此操作。我相信你最终可以制作一个正则表达式来匹配你正在寻找的东西,但正如你所注意到的,性能不会很好,如果有更大的流文件进来,你可能会崩溃堆。

在Groovy(或Python/Ruby/etc)中,这将是一个简单的字符串替换操作,如下所示:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if(!flowFile) return

flowFile = session.write(flowFile, {inputStream, outputStream ->
    def elements = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split(",")
    // Rather than hardcoding, you could make the column index also read from a flowfile attribute to make this more generic
    elements[2] = flowfile.getAttribute("myAttributeName")
    def outputString = elements.join(",")

   outputStream.write(outputString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)