自定义nifi处理器-流文件的写入
custom nifi processor - writing of flow file
我想创建一个自定义 NiFi
处理器,它可以读取 ESRi ASCII grid files
和 return CSV
就像每个文件的一些元数据和地理参考用户数据的表示WKT 格式。
遗憾的是,解析结果没有作为更新的流文件写回。
遗憾的是,只有原始文件被 returned。转换后的输出不会持久化。
尝试调整它以手动序列化某些 CSV 字符串时,例如:
val lineSep = System.getProperty("line.separator")
val csvResult = result.map(p => p.productIterator.map{
case Some(value) => value
case None => ""
case rest => rest
}.mkString(";")).mkString(lineSep)
var output = session.write(flowFile, new OutputStreamCallback() {
@throws[IOException]
def process(outputStream: OutputStream): Unit = {
IOUtils.write(csvResult, outputStream, "UTF-8")
}
})
仍然没有写任何flowflies。上面的问题仍然存在,或者我得到 outputStream 的 Stream not closed 异常。
一定是少了一点点,但我好像找不到少了的一点。
每个更改流文件的会话方法 session.write() returns 一个新版本的文件,你必须传输这个新版本。
如果您在 converterIngester() 函数中更改文件,则必须 return 将此新版本传递给调用者函数以转移到关系。
我想创建一个自定义 NiFi
处理器,它可以读取 ESRi ASCII grid files
和 return CSV
就像每个文件的一些元数据和地理参考用户数据的表示WKT 格式。
遗憾的是,解析结果没有作为更新的流文件写回。
遗憾的是,只有原始文件被 returned。转换后的输出不会持久化。
尝试调整它以手动序列化某些 CSV 字符串时,例如:
val lineSep = System.getProperty("line.separator")
val csvResult = result.map(p => p.productIterator.map{
case Some(value) => value
case None => ""
case rest => rest
}.mkString(";")).mkString(lineSep)
var output = session.write(flowFile, new OutputStreamCallback() {
@throws[IOException]
def process(outputStream: OutputStream): Unit = {
IOUtils.write(csvResult, outputStream, "UTF-8")
}
})
仍然没有写任何flowflies。上面的问题仍然存在,或者我得到 outputStream 的 Stream not closed 异常。
一定是少了一点点,但我好像找不到少了的一点。
每个更改流文件的会话方法 session.write() returns 一个新版本的文件,你必须传输这个新版本。
如果您在 converterIngester() 函数中更改文件,则必须 return 将此新版本传递给调用者函数以转移到关系。