使用 Groovy 覆盖 NiFi 中的 FlowFile

Using Groovy to overwrite a FlowFile in NiFi

我正在尝试做一些相当简单的事情,从传入的 FlowFile 中读取 i9 PDF 表单,将其中的名字和姓氏解析为 JSON,然后输出 JSON传出的流文件。

我没有找到关于如何执行此操作的官方文档,但有人写了几个 cookbooks on doing things in several scripting languages in NiFi here. 这看起来很简单,我很确定我正在做那里写的,但我甚至根本不确定是否正在阅读 PDF。它只是每次将未修改的 PDF 传递给 REL_SUCCESS。

Link to sample PDF

import java.nio.charset.StandardCharsets
import org.apache.pdfbox.io.IOUtils
import org.apache.pdfbox.pdmodel.PDDocument
import org.apache.pdfbox.util.PDFTextStripperByArea
import java.awt.Rectangle
import org.apache.pdfbox.pdmodel.PDPage
import com.google.gson.Gson
import java.nio.charset.StandardCharsets
def flowFile = session.get()
flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        //Load Flowfile contents
        PDDocument document = PDDocument.load(inputStream)
        PDFTextStripperByArea stripper = new PDFTextStripperByArea()
        //Get the first page
        List<PDPage> allPages = document.getDocumentCatalog().getAllPages()
        PDPage page = allPages.get(0)

    //Define the areas to search and add them as search regions
    stripper = new PDFTextStripperByArea()
    Rectangle lname = new Rectangle(25, 226, 240, 15)
    stripper.addRegion("lname", lname)
    Rectangle fname = new Rectangle(276, 226, 240, 15)
    stripper.addRegion("fname", fname)
    //Load the results into a JSON
    def boxMap = [:]
    stripper.setSortByPosition(true)
    stripper.extractRegions(page)
    regions = stripper.getRegions()
    for (String region : regions) {
        String box = stripper.getTextForRegion(region)
        boxMap.put(region, box)
    }
    Gson gson = new Gson()
    //Remove random noise from the output
    json = gson.toJson(boxMap, LinkedHashMap.class)
    json = json.replace('\n', '')
    json = json.replace('\r', '')
    json = json.replace(',"', ',\n"')
    //Overwrite flowfile contents with JSON
    outputStream.write(json.getBytes(StandardCharsets.UTF_8))
    } catch (Exception e){
        System.out.println(e.getMessage())
        session.transfer(flowFile, REL_FAILURE)
    }
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

编辑: 能够通过插入一个 txt 文件来确认 flowFile 对象正在被正确读取。所以问题似乎是 inputStream 永远不会被移交给 PDDocument 或者当它发生时发生了什么。我编辑了代码,尝试先将其读入 File 对象,但这导致了错误:

FlowFileHandlingException: null is not known in this session

EDIT 编辑: 通过移动我的 try/catch 解决了。我似乎不明白它是如何工作的,我上面的代码已经过编辑并且可以正常工作。

session.get可以return为null,所以一定要在if(!flowFile) return后面加一行。还要把 try/catch 放在 session.write 外面,这样你就可以把 session.transfer(flowFile, REL_SUCCESS) 放在 session.write 之后(在 try 里面)和catch可以转失败。

我也无法从代码中看出 PDFTextStripperByArea 如何从传入文档中获取信息。看起来所有文档内容都在 try 中,因此 PDFTextStripper 将无法使用(并且未传入)。

None 这些东西解释了为什么你在 success 关系上得到原始流文件,但也许有一些我没有看到的东西会被上面的变化神奇地修复:)

此外,如果您使用 log.info()log.error() 而不是 System.out.println,您将在 NiFi 日志中看到输出(对于错误,它将 post处理器的公告,如果将鼠标悬停在处理器的右上角(如果存在公告,则为红色方块)上方,您可以看到该消息。