发布者从出站适配器确认后如何将 basicAck 发送到入站适配器

How to send basicAck to inbound adapter after publisher confirm from outbound adapter

我们有一个接收事件通知的入站通道适配器。消费者标准的复杂性限制了我们使用简单路由密钥分发消息的能力,因此应用程序使用拆分器通过直接交换将该消息发送到感兴趣的订阅者队列。

我们想在我们的出站通道适配器上使用发布者确认,以确保交付到客户端队列。我们希望等待发布者确认 ack 原始消息,如果未能收到发布者确认或者 ack==false 我们希望取消来自入站通道适配器的原始消息。

我假设这将在 Rabbit 模板的 confirm-callback 中完成,但我不确定如何完成。 (或者如果可能的话)

<rabbit:connection-factory id="rabbitConnectionFactory" 
        host="${amqpHost}"
        username="${amqpUsername}"
        password="${amqpPassword}"
        virtual-host="${amqpVirtualHost}" 
        publisher-confirms="true" />

<rabbit:template id="rabbitTemplate" 
        connection-factory="rabbitConnectionFactory"
        confirm-callback="PublisherConfirms"    />

<int-amqp:inbound-channel-adapter channel="notificationsFromRabbit" 
        queue-names="#{'${productNotificationQueue}' + '${queueSuffix}'}"
        connection-factory="rabbitConnectionFactory"
        mapped-request-headers="*"
        message-converter="productNotificationMessageConverter"  />

<int:chain input-channel="notificationsFromRabbit" output-channel="notificationsToClients">
        <int:service-activator ref="NotificationRouter" 
                      method="addRecipientsHeaders" />
        <int:splitter ref="NotificationRouter" 
                      method="groupMessages" />
        <int:object-to-json-transformer />
</int:chain>

<int-amqp:outbound-channel-adapter channel="notificationsToClients"
        amqp-template="rabbitTemplate"
        exchange-name="${servicesClientsExchange}"
        routing-key=""
        mapped-request-headers="*"  />

目前我们是 acking groupMessages 方法中的消息,方法是将 Channel 和 Delivery 标记作为参数传递。但是,如果代理从不发送带有 ack=falsereturn 或 returns,那么 nack 来自入站通道适配器的消息为时已晚。

我是否需要一个 bean 来保留 Map<Channel, Long> 的频道和交付标签以在 confirm-callback 中访问,或者是否有其他方式?

当我收到发布者确认时,来自入站通道适配器的通道是否会关闭?

只要挂起消费者线程,直到所有acks/nacks都收到,就可以为所欲为了。

如果您将 notificationsFromRabbit 设为 publish-subscribe 频道,您可以在暂停线程的地方添加另一个订阅者 (service-activator);等待所有 acks/nacks 并采取您想要的行动。

编辑:

您也可以使用 Spring Integration to manage the acks for you,它会将它们作为消息从出站适配器发出(而不是您自己使用回调)。

编辑 2:

然后您可以在相关数据中使用拆分器的序列 size/sequence 编号 headers,以便在收到所有确认后释放消费者。

编辑 3:

像这样的东西应该有用...

在出站适配器上,设置 confirm-correlation-expression="#this"(整个出站消息)。

Class 有两种方法

private final Map<String, BlockingQueue<Boolean> suspenders;

public void suspend(Message<?> original) {
    BlockingQueue<Boolean> bq = new LinkedBlockingQueue();
    String key = someKeyFromOriginal(original);
    suspenders.put(key, bq);
    Boolean result = bq.poll(// some timeout);
    try {
        if (result == null) {
            // timed out
        } 
        else if (!result) {
            // throw some exception to nack the message
        }
    }
    finally {
        suspenders.remove(key);
    }
}

public void ackNack(Message<Message<?>> ackNak) {
    Message<?> theOutbound = ackNak.payload;
    BlockingQueue<Boolean> bq = suspenders.get(someKeyFromOriginal(theOutbound));
    if (bq == null) // late ack/nack; ignore
    else {
        // check the ack/nack header
        // if nack, bq.put(false)
        // else, use another map field, to 
        // keep track of ack count Vs sequenceSize header in 
        // theOutbound; when all acks received, bq.put(true);
    }
}

在第一个方法中挂起消费者线程;将 acks/nacks 从出站适配器路由到第二种方法。

警告:这没有经过测试,只是我的头脑;但应该很接近了。