spring-集成 amqp 拒绝消息(如果未处理)

spring-integration amqp reject message if not procced

我需要解决这个问题。我有两个 amqp 消费者集来获取一条消息。

@Bean
public IntegrationFlow jmsPrimaryFlow() {
    return IntegrationFlows.from(
        Amqp.inboundGateway(
            taskManager().getPrimaryMessageListenerContainer()).errorChannel(errorChannel())
        )
        .channel(taskChannel())
        .get();
}

@Bean
public IntegrationFlow jmsSecondaryFlow() {
    return IntegrationFlows.from(
        Amqp.inboundGateway(
            taskManager().getSecondaryMessageListenerContainer()).errorChannel(errorChannel())
            .autoStartup(false)
        )
        .channel(taskChannel())
        .get();
}

taskChannel 是 queuechannel 但一次只允许一条消息使用,因此没有并行处理。 如果另一条消息的处理时间太长,我如何在超时后拒绝一条消息。 所以这条消息将返回队列以由另一个节点继续处理?我的意思是,这两个消费者预取了两条消息,但一次只能处理一条消息,所以如果第一条预取消息的处理时间过长,如何释放第二条预取消息。

你的问题不清楚。您可以在队列通道上设置容量限制(比如 1)并在网关上设置 sendTimeout。然后,如果队列已满,尝试添加消息将在超时后失败。但是,在这种情况下使用队列通道是危险的——如果服务器出现故障,您可能会丢失消息,因为消息一旦放入队列就会被确认。

如果您改用 RendezvousChannel,生产者将阻塞等待消费者接收消息。

但请记住,如果切换后服务器崩溃,即使是这条消息也可能丢失。