How do @Poller-s work in Spring Integration?

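I am building a Spring Integration implementation with two PollableChannels:

  1. Regular channel
  2. Error channel

Messages are polled from the regular channel and processed. If an error occurs during processing (for example, an external service is unavailable), the message is sent to the error channel. From the error channel it is re-queued into the regular channel, and the cycle continues until the message is processed successfully.

The idea is to poll the error channel infrequently, to give the processor some time to (hopefully) recover.

I have simulated this workflow in the following test: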

package com.stackoverflow.questions.sipoller;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;

@SpringBootTest
class SiPollerApplicationTests {

    private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);

    private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
    private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

    private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
    private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
    private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    @Qualifier(QUEUE_CHANNEL_REGULAR)
    private PollableChannel channelRegular;

    @Test
    void testTimingOfMessageProcessing() {
        channelRegular.send(MessageBuilder.withPayload("Test message").build());

        await()
                .atMost(FIVE_MINUTES)
                .with()
                .pollInterval(ONE_HUNDRED_MILLISECONDS)
                .until(
                        () -> {
                            if (NUMBER_OF_SUCCESSES.intValue() == 1) {
                                reportGaps();
                                return true;
                            }
                            return false;
                        }
                );
    }

    private void reportGaps() {
        List<Long> gaps = IntStream
                .range(1, ATTEMPT_INSTANTS.size())
                .mapToObj(
                        i -> Duration
                                .between(
                                        ATTEMPT_INSTANTS.get(i - 1),
                                        ATTEMPT_INSTANTS.get(i)
                                )
                                .toMillis()
                )
                .collect(Collectors.toList());
        LOG.info("Gaps between attempts (in ms): {}", gaps);
    }

    @Configuration
    @EnableIntegration
    @Import(SiPollerApplicationTestEndpoint.class)
    static class SiPollerApplicationTestConfig {

        @Bean(name = QUEUE_CHANNEL_REGULAR)
        public PollableChannel queueChannelRegular() {
            return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
        }

        @Bean(name = QUEUE_CHANNEL_ERROR)
        public PollableChannel queueChannelError() {
            return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
        }

        @Router(
                inputChannel = QUEUE_CHANNEL_ERROR,
                poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
        )
        public String retryProcessing() {
            return QUEUE_CHANNEL_REGULAR;
        }
    }

    @MessageEndpoint
    static class SiPollerApplicationTestEndpoint {

        @Autowired
        @Qualifier(QUEUE_CHANNEL_ERROR)
        private PollableChannel channelError;

        @ServiceActivator(
                inputChannel = QUEUE_CHANNEL_REGULAR,
                poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
        )
        public void handleMessage(Message<String> message) {
            // Count and time attempts
            int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
            ATTEMPT_INSTANTS.add(Instant.now());

            // First few times - refuse to process message and bounce it into
            // error channel
            if (numberOfAttempts < 5) {
                channelError.send(message);
                return;
            }

            // After that - process message
            NUMBER_OF_SUCCESSES.getAndIncrement();
        }
    }

}

The pom.xml dependencies are:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.0.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

Note the configuration of the pollers:

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

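The regular channel should be polled every half second, and the error channel every three seconds.

The test simulates an outage during message processing: the first five attempts to process the message are rejected. The test also records the Instant of every processing attempt. Finally, on my machine, the test outputs: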

Gaps between attempts (in ms): [1, 0, 0, 0, 0]

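In other words, the message is retried almost immediately after each failure.

It seems to me that I fundamentally misunderstand how Pollers work in Spring Integration. So my questions are:

  1. Why is there such a discrepancy between the poller configuration and the actual polling frequency?
  2. Does Spring Integration provide a way to implement the pattern I have described?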

There are two settings that can affect this behavior.

QueueChannel pollers drain the queue by default; call setMaxMessagesPerPoll(1) to receive only one message per poll.
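
To make the draining behavior concrete, here is a minimal plain-Java sketch of a single poll cycle. It is a simplified model, not Spring Integration's actual code: the names PollCycleSketch, pollOnce and attemptsInOneCycle are hypothetical, and the round trip through the error channel is collapsed into re-adding the message to the same queue. It only illustrates that an unbounded cycle keeps handling whatever shows up in the queue, while a limit of 1 stops after the first message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class PollCycleSketch {

    /**
     * Models one poll cycle: keep receiving until nothing arrives within
     * receiveTimeoutMs or until maxMessagesPerPoll messages were handled.
     * A non-positive maxMessagesPerPoll means "no limit" in this sketch.
     */
    static int pollOnce(BlockingQueue<String> queue, int maxMessagesPerPoll,
                        long receiveTimeoutMs, Consumer<String> handler) {
        int handled = 0;
        while (maxMessagesPerPoll <= 0 || handled < maxMessagesPerPoll) {
            String message;
            try {
                message = queue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (message == null) {
                break; // queue stayed empty for the whole timeout: cycle ends
            }
            handler.accept(message);
            handled++;
        }
        return handled;
    }

    /** Runs one cycle against a handler that re-queues the message on its first five attempts. */
    static int attemptsInOneCycle(int maxMessagesPerPoll) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Test message");
        AtomicInteger attempts = new AtomicInteger();
        Consumer<String> handler = message -> {
            if (attempts.incrementAndGet() <= 5) {
                queue.add(message); // simulated failure: bounce the message back
            }
        };
        return pollOnce(queue, maxMessagesPerPoll, 0, handler);
    }

    public static void main(String[] args) {
        System.out.println(attemptsInOneCycle(-1)); // prints 6: all retries happen in one cycle
        System.out.println(attemptsInOneCycle(1));  // prints 1: one attempt, then wait for the next cycle
    }
}
```

With no limit, all five failures and the final success land in the same cycle, which is consistent with the near-zero gaps reported in the question.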

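Also, the poller's receive timeout on a QueueChannel defaults to 1 second (1000 ms).

So the first poll may happen sooner than you expect; set the timeout to 0 to return immediately when there are no messages in the queue.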
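
With the annotation style used in the question, both settings could be applied roughly as sketched below. This assumes a Spring Integration version whose @Poller annotation has the receiveTimeout attribute (5.1 or later, as far as I know). The class name RetryRouterSketch is hypothetical, and the Message parameter is an addition that the question's router method does not have.

```java
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.messaging.Message;

@MessageEndpoint
class RetryRouterSketch {

    @Router(
            inputChannel = "queueChannelError",
            poller = @Poller(
                    fixedRate = "3000",        // same period as in the question
                    maxMessagesPerPoll = "1",  // take at most one message per poll
                    receiveTimeout = "0"       // do not block waiting on an empty queue
            )
    )
    public String retryProcessing(Message<?> message) {
        // Send the failed message back to the regular channel
        return "queueChannelRegular";
    }
}
```

Whether the poller on the regular channel needs the same attributes depends on how strictly you want its 0.5 second period to be honored.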