How do @Poller-s work in Spring Integration?
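I am building a Spring Integration implementation that uses two PollableChannels:
- a regular channel
- an error channel
Messages are polled from the regular channel and processed. If processing fails (for example, because an external service is unavailable), the message is sent to the error channel. From the error channel it is re-queued onto the regular channel, and the cycle continues until the message is processed successfully.
The idea is to poll the error channel infrequently, to give the processor some time to (hopefully) recover.
I have simulated this workflow in the following test: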
package com.stackoverflow.questions.sipoller;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
@SpringBootTest
class SiPollerApplicationTests {

    private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);
    private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
    private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";
    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds
    private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
    private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
    private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    @Qualifier(QUEUE_CHANNEL_REGULAR)
    private PollableChannel channelRegular;

    @Test
    void testTimingOfMessageProcessing() {
        channelRegular.send(MessageBuilder.withPayload("Test message").build());
        await()
            .atMost(FIVE_MINUTES)
            .with()
            .pollInterval(ONE_HUNDRED_MILLISECONDS)
            .until(
                () -> {
                    if (NUMBER_OF_SUCCESSES.intValue() == 1) {
                        reportGaps();
                        return true;
                    }
                    return false;
                }
            );
    }

    private void reportGaps() {
        List<Long> gaps = IntStream
            .range(1, ATTEMPT_INSTANTS.size())
            .mapToObj(
                i -> Duration
                    .between(
                        ATTEMPT_INSTANTS.get(i - 1),
                        ATTEMPT_INSTANTS.get(i)
                    )
                    .toMillis()
            )
            .collect(Collectors.toList());
        LOG.info("Gaps between attempts (in ms): {}", gaps);
    }

    @Configuration
    @EnableIntegration
    @Import(SiPollerApplicationTestEndpoint.class)
    static class SiPollerApplicationTestConfig {

        @Bean(name = QUEUE_CHANNEL_REGULAR)
        public PollableChannel queueChannelRegular() {
            return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
        }

        @Bean(name = QUEUE_CHANNEL_ERROR)
        public PollableChannel queueChannelError() {
            return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
        }

        @Router(
            inputChannel = QUEUE_CHANNEL_ERROR,
            poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
        )
        public String retryProcessing() {
            return QUEUE_CHANNEL_REGULAR;
        }
    }

    @MessageEndpoint
    static class SiPollerApplicationTestEndpoint {

        @Autowired
        @Qualifier(QUEUE_CHANNEL_ERROR)
        private PollableChannel channelError;

        @ServiceActivator(
            inputChannel = QUEUE_CHANNEL_REGULAR,
            poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
        )
        public void handleMessage(Message<String> message) {
            // Count and time attempts
            int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
            ATTEMPT_INSTANTS.add(Instant.now());
            // First few times - refuse to process message and bounce it into
            // error channel
            if (numberOfAttempts < 5) {
                channelError.send(message);
                return;
            }
            // After that - process message
            NUMBER_OF_SUCCESSES.getAndIncrement();
        }
    }
}
The pom.xml dependencies are:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <version>4.0.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>
Note the configuration of the Pollers:
private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds
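The regular channel should be polled every half second, and the error channel every three seconds.
The test simulates an outage during message processing: the first five attempts to process the message are rejected. The test also records the Instant of every processing attempt. In the end, on my machine, the test outputs: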
Gaps between attempts (in ms): [1, 0, 0, 0, 0]
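In other words, the message is retried almost immediately after each failure.
It seems to me that I fundamentally misunderstand how Pollers work in Spring Integration. So my questions are:
- Why is there such a discrepancy between the poller configuration and the actual polling frequency?
- Does Spring Integration provide a way to implement the pattern I have described?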
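There are two settings that can affect this behavior.
By default, a poller on a QueueChannel drains the queue on each poll; use setMaxMessagesPerPoll(1) to receive only one message per poll.
Also, by default, the receive timeout on the QueueChannel is 1 second (1000 ms), so the next poll may happen sooner than you think; set it to 0 to return immediately when there are no messages in the queue.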
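To illustrate why those two defaults can produce near-zero gaps, here is a minimal plain-Java sketch of a single poll cycle. It is not Spring Integration code: the class name PollCycleModel, the method attemptsInOneCycle, and the background thread standing in for the error poller are all hypothetical, and the model assumes the error poller happens to be blocked in a receive call at the moment a message fails.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of one poll cycle; this is NOT Spring Integration code. */
final class PollCycleModel {

    /**
     * Runs a single poll cycle of the "regular" poller and returns how many
     * processing attempts happened inside that one cycle.
     *
     * @param maxMessagesPerPoll negative means "no limit" (drain the queue)
     * @param receiveTimeoutMs   how long each receive waits for a message
     */
    static int attemptsInOneCycle(int maxMessagesPerPoll, long receiveTimeoutMs) {
        BlockingQueue<String> regular = new LinkedBlockingQueue<>();
        BlockingQueue<String> error = new LinkedBlockingQueue<>();
        AtomicInteger attempts = new AtomicInteger();

        // Stand-in for an error poller that is blocked in receive():
        // it hands every failed message straight back to the regular queue.
        Thread errorPoller = new Thread(() -> {
            try {
                while (true) {
                    regular.put(error.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        errorPoller.setDaemon(true);
        errorPoller.start();

        regular.add("Test message");
        try {
            int received = 0;
            while (maxMessagesPerPoll < 0 || received < maxMessagesPerPoll) {
                String message = regular.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break; // queue stayed empty for the whole timeout: cycle ends
                }
                received++;
                // Same rule as the question's test: the first five attempts fail.
                if (attempts.getAndIncrement() < 5) {
                    error.add(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            errorPoller.interrupt();
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("no limit, 1000 ms timeout: " + attemptsInOneCycle(-1, 1000));
        System.out.println("limit 1, 0 ms timeout:     " + attemptsInOneCycle(1, 0));
    }
}
```

Under this model, the first call counts six attempts inside a single poll cycle, because the bounced message keeps arriving before the receive timeout expires, while the second call counts one. In the question's code the corresponding change would presumably look like `@Poller(fixedRate = POLLER_PERIOD_REGULAR, maxMessagesPerPoll = "1", receiveTimeout = "0")`, assuming a Spring Integration version whose @Poller annotation exposes a receiveTimeout attribute.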