NiFi SpringContextProcessor 中断流

NiFi SpringContextProcessor interrupts the flow

我在我的 NiFi 流中添加了一个 SpringContextProcessor,它按预期执行并更新了 FlowFile 的内容和属性。但是在 NiFi 的数据来源部分,我看到的不是 SEND/RECEIVE,而是

03/27/2017 11:47:57.164 MDT RECEIVE 42fa1c3f-edde-4cb7-8e73-ce752f7e3d66
03/27/2017 11:47:57.163 MDT DROP    667094a7-8eef-4657-981a-dc9fdc6c4056
03/27/2017 11:47:57.163 MDT SEND    667094a7-8eef-4657-981a-dc9fdc6c4056

看起来原来的邮件正在被删除并被新邮件取代。我没有在其他组件中看到这种行为,即它们似乎都保留了原始流文件 UUID。 Spring 处理器代码的简化版本:

@ServiceActivator(inputChannel = "fromNiFi", outputChannel = "toNiFi")
public Message<byte[]> process1(Message<byte[]> inMessage) {
    String inMessagePayload = new String(inMessage.getPayload());
    String userId = getUserIdFromDb(inMessagePayload);
    String outMessagePayload = inMessagePayload + userId;

    return MessageBuilder.withPayload(outMessagePayload.getBytes())
            .copyHeaders(inMessage.getHeaders())
            .setHeader("userId", userId)
            .build();
}

有没有办法在传出消息中保留原始流文件 UUID?

这可能是我们的疏忽,所以是的,请提出一个 JIRA。 但是,作为一种解决方法,您可以尝试从传入消息 headers 中提取 FlowFile 属性,然后将它们传播回传出消息。

public Message<byte[]> process1(Message<byte[]> inMessage) {
    String myHeader = inMessage.getHeader("someHeader");
    . . .
    return MessageBuilder.withPayload(outMessagePayload.getBytes())
            .copyHeaders(inMessage.getHeaders())
            .setHeader("userId", userId)
            .setHeader("someHeader", myHeader)
            .build();
}