自定义nifi处理器-流文件的写入

custom nifi processor - writing of flow file

我想创建一个自定义 NiFi 处理器,它可以读取 ESRi ASCII grid files 和 return CSV 就像每个文件的一些元数据和地理参考用户数据的表示WKT 格式。

遗憾的是,解析结果没有作为更新的流文件写回。

https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107 是我在 NiFi 中的尝试。

遗憾的是,只有原始文件被 returned。转换后的输出不会持久化。

尝试调整它以手动序列化某些 CSV 字符串时,例如:

  val lineSep = System.getProperty("line.separator")
  val csvResult = result.map(p => p.productIterator.map{
    case Some(value) => value
    case None => ""
    case rest => rest
  }.mkString(";")).mkString(lineSep)

  var output = session.write(flowFile, new OutputStreamCallback() {
    @throws[IOException]
    def process(outputStream: OutputStream): Unit = {
      IOUtils.write(csvResult, outputStream, "UTF-8")
    }
  })

仍然没有写任何flowflies。上面的问题仍然存在,或者我得到 outputStream 的 Stream not closed 异常。

一定是少了一点点,但我好像找不到少了的一点。

每个更改流文件的会话方法 session.write() returns 一个新版本的文件,你必须传输这个新版本。

如果您在 converterIngester() 函数中更改文件,则必须 return 将此新版本传递给调用者函数以转移到关系。