NiFi 替换文本过程,尝试用属性值替换第 n 列最终出错
NiFi Replace Text process, trying to replace nth column with an attribute value ends up in error
使用替换文本处理器将 44 列文件中的几列替换为逗号 (,) 作为分隔符。
一个流文件只有一行 44 个字段。
在替换文本处理器中,
我需要用属性更改流文件中的第 3 列。
所以我把它分成 4 组,并用一个属性单独替换第二组的数据。
如果我这样做,处理器会挂起。如何用特定属性或字符串替换第 n 列?
您使用的是什么版本的 NiFi?从 NiFi 1.3.0 开始,您可以使用 "record-aware" 处理器,例如 UpdateRecord. You would configure a CSVReader,可能是通过从 header 行推断字符串字段或为字段提供您自己的 Avro 模式。假设所需 column/field 的名称是 "fname"。在 UpdateRecord 中,您可以将 Replacement Strategy 设置为 "Literal" 并添加名为“/fname”的 user-defined 属性,值为“${filename}”。这应该允许您更新 CSV 文件 in-place,而无需拆分行或处理正则表达式来解析行。
注意:如果使用Apache NiFi 1.3.0+版本,Matt的方式更好
我的建议是使用 ExecuteScript
处理器并使用 Groovy 来执行此操作。我相信你最终可以制作一个正则表达式来匹配你正在寻找的东西,但正如你所注意到的,性能不会很好,如果有更大的流文件进来,你可能会崩溃堆。
在Groovy(或Python/Ruby/etc)中,这将是一个简单的字符串替换操作,如下所示:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream ->
def elements = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split(",")
// Rather than hardcoding, you could make the column index also read from a flowfile attribute to make this more generic
elements[2] = flowfile.getAttribute("myAttributeName")
def outputString = elements.join(",")
outputStream.write(outputString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
使用替换文本处理器将 44 列文件中的几列替换为逗号 (,) 作为分隔符。
一个流文件只有一行 44 个字段。
在替换文本处理器中,
我需要用属性更改流文件中的第 3 列。 所以我把它分成 4 组,并用一个属性单独替换第二组的数据。
如果我这样做,处理器会挂起。如何用特定属性或字符串替换第 n 列?
您使用的是什么版本的 NiFi?从 NiFi 1.3.0 开始,您可以使用 "record-aware" 处理器,例如 UpdateRecord. You would configure a CSVReader,可能是通过从 header 行推断字符串字段或为字段提供您自己的 Avro 模式。假设所需 column/field 的名称是 "fname"。在 UpdateRecord 中,您可以将 Replacement Strategy 设置为 "Literal" 并添加名为“/fname”的 user-defined 属性,值为“${filename}”。这应该允许您更新 CSV 文件 in-place,而无需拆分行或处理正则表达式来解析行。
注意:如果使用Apache NiFi 1.3.0+版本,Matt的方式更好
我的建议是使用 ExecuteScript
处理器并使用 Groovy 来执行此操作。我相信你最终可以制作一个正则表达式来匹配你正在寻找的东西,但正如你所注意到的,性能不会很好,如果有更大的流文件进来,你可能会崩溃堆。
在Groovy(或Python/Ruby/etc)中,这将是一个简单的字符串替换操作,如下所示:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream ->
def elements = IOUtils.toString(inputStream, StandardCharsets.UTF_8).split(",")
// Rather than hardcoding, you could make the column index also read from a flowfile attribute to make this more generic
elements[2] = flowfile.getAttribute("myAttributeName")
def outputString = elements.join(",")
outputStream.write(outputString.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)