对网关的调用在成功时永远不会 return 给调用者
Calls to gateway result never return to caller when successful
我正在使用 Spring 集成 DSL 并且有一个简单的网关:
@MessagingGateway(name = "eventGateway", defaultRequestChannel = "inputChannel")
public interface EventProcessorGateway {
@Gateway(requestChannel="inputChannel")
public void processEvent(Message message)
}
我的 spring 集成流程定义为:
@Bean MessageChannel inputChannel() { return new DirectChannel(); }
@Bean MessageChannel errorChannel() { return new DirectChannel(); }
@Bean MessageChannel retryGatewayChannel() { return new DirectChannel(); }
@Bean MessageChannel jsonChannel() { return new DirectChannel(); }
@Bean
public IntegrationFlow postEvents() {
return IntegrationFlows.from(inputChannel())
.route("headers.contentType", m -> m.channelMapping(MediaType.APPLICATION_JSON_VALUE, "json")
)
.get();
}
@Bean
public IntegrationFlow retryGateway() {
return IntegrationFlows.from("json")
.gateway(retryGatewayChannel(), e -> e.advice(retryAdvice()))
.get();
}
@Bean
public IntegrationFlow transformJsonEvents() {
return IntegrationFlows
.from(retryGatewayChannel())
.transform(new JsonTransformer())
.handle(new JsonHandler())
.get();
}
JsonTransformer 是一个简单的 AbstractTransformer,它转换 JSON 数据并将其传递给 JsonHandler。
class JsonHandler extends AbstractMessageHandler {
public void handleMessageInternal(Message message) throws Exception {
// do stuff, return nothing if success else throw Exception
}
}
我通过这样的代码调用我的网关:
try {
Message<List<EventRecord>> message = MessageBuilder.createMessage(eventList, new MessageHeaders(['contentType': contentType]))
eventProcessorGateway.processEvent(message)
logSuccess(eventList)
} catch (Exception e) {
logError(eventList)
}
我希望整个调用和处理是同步的,并且捕获发生的任何错误,以便我可以适当地处理它们。对网关的调用有效,消息通过 Transformer 发送到 Handler,进行处理,如果发生异常,它会冒泡并被捕获并调用 logError()。但是,如果调用成功,则永远不会调用 logSuccess()。就好比在Handler处理完消息后才执行stops/hangs而从不returns。我实际上不需要得到任何回应,我更关心是否有什么事情无法处理。我是否需要将某些内容发送回初始 EventProcessorGateway?
您的问题在这里:
return IntegrationFlows.from("json")
.gateway(retryGatewayChannel(), e -> e.advice(retryAdvice()))
.get();
其中 .gateway()
是 request/reply
因为它是主流的一部分。
它类似于 <chain>
.
中的 <gateway>
所以,即使你的主流程是单向的,在里面使用 .gateway()
也需要你的子流程的一些回复,但是这个:
.handle(new JsonHandler())
.get();
不会那样做。
因为是one-way
MessageHandler
.
从另一方面来说,即使你将最后一个设为 request-reply
(AbstractReplyProducingMessageHandler
),它也无济于事,因为你不知道如何处理该回复中流后gateway
。只因为你的主流是 one-way
.
你必须重新考虑你的设计并尝试摆脱那个中流网关。我看到你试图用 retryAdvice()
做一些逻辑。
但是如何将它移动到 .handle(new JsonHandler())
而不是那个错误的 .gateway()
?
我正在使用 Spring 集成 DSL 并且有一个简单的网关:
@MessagingGateway(name = "eventGateway", defaultRequestChannel = "inputChannel")
public interface EventProcessorGateway {
@Gateway(requestChannel="inputChannel")
public void processEvent(Message message)
}
我的 spring 集成流程定义为:
@Bean MessageChannel inputChannel() { return new DirectChannel(); }
@Bean MessageChannel errorChannel() { return new DirectChannel(); }
@Bean MessageChannel retryGatewayChannel() { return new DirectChannel(); }
@Bean MessageChannel jsonChannel() { return new DirectChannel(); }
@Bean
public IntegrationFlow postEvents() {
return IntegrationFlows.from(inputChannel())
.route("headers.contentType", m -> m.channelMapping(MediaType.APPLICATION_JSON_VALUE, "json")
)
.get();
}
@Bean
public IntegrationFlow retryGateway() {
return IntegrationFlows.from("json")
.gateway(retryGatewayChannel(), e -> e.advice(retryAdvice()))
.get();
}
@Bean
public IntegrationFlow transformJsonEvents() {
return IntegrationFlows
.from(retryGatewayChannel())
.transform(new JsonTransformer())
.handle(new JsonHandler())
.get();
}
JsonTransformer 是一个简单的 AbstractTransformer,它转换 JSON 数据并将其传递给 JsonHandler。
class JsonHandler extends AbstractMessageHandler {
public void handleMessageInternal(Message message) throws Exception {
// do stuff, return nothing if success else throw Exception
}
}
我通过这样的代码调用我的网关:
try {
Message<List<EventRecord>> message = MessageBuilder.createMessage(eventList, new MessageHeaders(['contentType': contentType]))
eventProcessorGateway.processEvent(message)
logSuccess(eventList)
} catch (Exception e) {
logError(eventList)
}
我希望整个调用和处理是同步的,并且捕获发生的任何错误,以便我可以适当地处理它们。对网关的调用有效,消息通过 Transformer 发送到 Handler,进行处理,如果发生异常,它会冒泡并被捕获并调用 logError()。但是,如果调用成功,则永远不会调用 logSuccess()。就好比在Handler处理完消息后才执行stops/hangs而从不returns。我实际上不需要得到任何回应,我更关心是否有什么事情无法处理。我是否需要将某些内容发送回初始 EventProcessorGateway?
您的问题在这里:
return IntegrationFlows.from("json")
.gateway(retryGatewayChannel(), e -> e.advice(retryAdvice()))
.get();
其中 .gateway()
是 request/reply
因为它是主流的一部分。
它类似于 <chain>
.
<gateway>
所以,即使你的主流程是单向的,在里面使用 .gateway()
也需要你的子流程的一些回复,但是这个:
.handle(new JsonHandler())
.get();
不会那样做。
因为是one-way
MessageHandler
.
从另一方面来说,即使你将最后一个设为 request-reply
(AbstractReplyProducingMessageHandler
),它也无济于事,因为你不知道如何处理该回复中流后gateway
。只因为你的主流是 one-way
.
你必须重新考虑你的设计并尝试摆脱那个中流网关。我看到你试图用 retryAdvice()
做一些逻辑。
但是如何将它移动到 .handle(new JsonHandler())
而不是那个错误的 .gateway()
?