对网关的调用在成功时永远不会 return 给调用者

Calls to gateway result never return to caller when successful

我正在使用 Spring 集成 DSL 并且有一个简单的网关:

@MessagingGateway(name = "eventGateway", defaultRequestChannel = "inputChannel")
public interface EventProcessorGateway {
  @Gateway(requestChannel="inputChannel")
  public void processEvent(Message message)
}

我的 spring 集成流程定义为:

@Bean MessageChannel inputChannel() { return new DirectChannel(); }

@Bean MessageChannel errorChannel() { return new DirectChannel(); }

@Bean MessageChannel retryGatewayChannel() { return new DirectChannel(); }

@Bean MessageChannel jsonChannel() { return new DirectChannel(); }

@Bean
public IntegrationFlow postEvents() {
    return IntegrationFlows.from(inputChannel())
            .route("headers.contentType", m -> m.channelMapping(MediaType.APPLICATION_JSON_VALUE, "json")
            )
            .get();
}

@Bean
public IntegrationFlow retryGateway() {
    return IntegrationFlows.from("json")
        .gateway(retryGatewayChannel(), e -> e.advice(retryAdvice()))
        .get();
}

@Bean
public IntegrationFlow transformJsonEvents() {
    return IntegrationFlows
            .from(retryGatewayChannel())
            .transform(new JsonTransformer())
            .handle(new JsonHandler())
            .get();
}

JsonTransformer 是一个简单的 AbstractTransformer,它转换 JSON 数据并将其传递给 JsonHandler。

class JsonHandler extends AbstractMessageHandler {
  public void handleMessageInternal(Message message) throws Exception {
    // do stuff, return nothing if success else throw Exception
  }
}

我通过这样的代码调用我的网关:

try {
  Message<List<EventRecord>> message = MessageBuilder.createMessage(eventList, new MessageHeaders(['contentType': contentType]))
  eventProcessorGateway.processEvent(message)
  logSuccess(eventList)
} catch (Exception e) {
  logError(eventList)
}

我希望整个调用和处理是同步的,并且捕获发生的任何错误,以便我可以适当地处理它们。对网关的调用有效,消息通过 Transformer 发送到 Handler,进行处理,如果发生异常,它会冒泡并被捕获并调用 logError()。但是,如果调用成功,则永远不会调用 logSuccess()。就好比在Handler处理完消息后才执行stops/hangs而从不returns。我实际上不需要得到任何回应,我更关心是否有什么事情无法处理。我是否需要将某些内容发送回初始 EventProcessorGateway?

您的问题在这里:

return IntegrationFlows.from("json")
    .gateway(retryGatewayChannel(), e -> e.advice(retryAdvice()))
    .get();

其中 .gateway()request/reply 因为它是主流的一部分。 它类似于 <chain>.

中的 <gateway>

所以,即使你的主流程是单向的,在里面使用 .gateway() 也需要你的子流程的一些回复,但是这个:

        .handle(new JsonHandler())
        .get();

不会那样做。

因为是one-wayMessageHandler.

从另一方面来说,即使你将最后一个设为 request-reply (AbstractReplyProducingMessageHandler),它也无济于事,因为你不知道如何处理该回复中流后gateway。只因为你的主流是 one-way.

你必须重新考虑你的设计并尝试摆脱那个中流网关。我看到你试图用 retryAdvice() 做一些逻辑。 但是如何将它移动到 .handle(new JsonHandler()) 而不是那个错误的 .gateway()