NiFi SpringContextProcessor 中断流
NiFi SpringContextProcessor interrupts the flow
我在我的 NiFi 流中添加了一个 SpringContextProcessor,它按预期执行并更新了 FlowFile 的内容和属性。但是在 NiFi 的数据来源部分,我看到的不是 SEND/RECEIVE,而是
03/27/2017 11:47:57.164 MDT RECEIVE 42fa1c3f-edde-4cb7-8e73-ce752f7e3d66
03/27/2017 11:47:57.163 MDT DROP 667094a7-8eef-4657-981a-dc9fdc6c4056
03/27/2017 11:47:57.163 MDT SEND 667094a7-8eef-4657-981a-dc9fdc6c4056
看起来原来的邮件正在被删除并被新邮件取代。我没有在其他组件中看到这种行为,即它们似乎都保留了原始流文件 UUID。 Spring 处理器代码的简化版本:
@ServiceActivator(inputChannel = "fromNiFi", outputChannel = "toNiFi")
public Message<byte[]> process1(Message<byte[]> inMessage) {
String inMessagePayload = new String(inMessage.getPayload());
String userId = getUserIdFromDb(inMessagePayload);
String outMessagePayload = inMessagePayload + userId;
return MessageBuilder.withPayload(outMessagePayload.getBytes())
.copyHeaders(inMessage.getHeaders())
.setHeader("userId", userId)
.build();
}
有没有办法在传出消息中保留原始流文件 UUID?
这可能是我们的疏忽,所以是的,请提出一个 JIRA。
但是,作为一种解决方法,您可以尝试从传入消息 headers 中提取 FlowFile 属性,然后将它们传播回传出消息。
public Message<byte[]> process1(Message<byte[]> inMessage) {
String myHeader = inMessage.getHeader("someHeader");
. . .
return MessageBuilder.withPayload(outMessagePayload.getBytes())
.copyHeaders(inMessage.getHeaders())
.setHeader("userId", userId)
.setHeader("someHeader", myHeader)
.build();
}
我在我的 NiFi 流中添加了一个 SpringContextProcessor,它按预期执行并更新了 FlowFile 的内容和属性。但是在 NiFi 的数据来源部分,我看到的不是 SEND/RECEIVE,而是
03/27/2017 11:47:57.164 MDT RECEIVE 42fa1c3f-edde-4cb7-8e73-ce752f7e3d66
03/27/2017 11:47:57.163 MDT DROP 667094a7-8eef-4657-981a-dc9fdc6c4056
03/27/2017 11:47:57.163 MDT SEND 667094a7-8eef-4657-981a-dc9fdc6c4056
看起来原来的邮件正在被删除并被新邮件取代。我没有在其他组件中看到这种行为,即它们似乎都保留了原始流文件 UUID。 Spring 处理器代码的简化版本:
@ServiceActivator(inputChannel = "fromNiFi", outputChannel = "toNiFi")
public Message<byte[]> process1(Message<byte[]> inMessage) {
String inMessagePayload = new String(inMessage.getPayload());
String userId = getUserIdFromDb(inMessagePayload);
String outMessagePayload = inMessagePayload + userId;
return MessageBuilder.withPayload(outMessagePayload.getBytes())
.copyHeaders(inMessage.getHeaders())
.setHeader("userId", userId)
.build();
}
有没有办法在传出消息中保留原始流文件 UUID?
这可能是我们的疏忽,所以是的,请提出一个 JIRA。 但是,作为一种解决方法,您可以尝试从传入消息 headers 中提取 FlowFile 属性,然后将它们传播回传出消息。
public Message<byte[]> process1(Message<byte[]> inMessage) {
String myHeader = inMessage.getHeader("someHeader");
. . .
return MessageBuilder.withPayload(outMessagePayload.getBytes())
.copyHeaders(inMessage.getHeaders())
.setHeader("userId", userId)
.setHeader("someHeader", myHeader)
.build();
}