服务激活器映射到 kafka 成功通道作为输入通道,但无法在 kafka 发送成功时执行
service activator mapped to kafka success channel as input channel but fails to execute on kafka send success
所以我已经为 Kafka 出站消息适配器配置了成功和失败通道,所以我可以根据 kafka 发布的结果进行一些 post 处理
@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setHeaderMapper(mapper());
handler.setLoggingEnabled(TRUE);
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression(
"headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
handler.setSendFailureChannel(kafkaFailuresChannel());
return handler;
}
然后我将一个服务激活器连接到这个成功通道,它也将成功发送的消息保存到消息存储
@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
return MessageChannels.direct("kafkaSuccessChannel").get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
mongoHandler.setMongoConverter(mongoConverter);
mongoHandler.setLoggingEnabled(TRUE);
SpelExpressionParser parser = new SpelExpressionParser();
mongoHandler.setCollectionNameExpression(
parser.parseExpression(
"headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
return mongoHandler;
}
我希望在发布成功但未发生的情况下调用服务激活器,
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
//publish messsge to KAfka TOpic
...
//assert message saved in MongoDB
assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
.containsOnly("JRASERVER-2000");
}
最后一个断言失败,在生产者发布到主题后,在日志中我没有看到对成功通道的任何调用。
正如 Gary 在他的评论中所说,sendSuccessChannel
是在与主 JUnit 运行程序不同的线程上异步调用的。确实是Kafka客户端中Future
完成的回调
因此,要确保所有内容在发送到 Kafka 后都落在 MongoDB 中,您需要比普通的 findAll()
更复杂的断言。您需要在一段时间内多次重复这样的调用,以确保其他线程已完成将消息发送到该通道并将文档存储到 MongoDb 集合中的工作。
为此,我可以推荐一个我们在自己的测试中真正使用的 Awaitility 工具:https://github.com/awaitility/awaitility
所以我已经为 Kafka 出站消息适配器配置了成功和失败通道,所以我可以根据 kafka 发布的结果进行一些 post 处理
@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setHeaderMapper(mapper());
handler.setLoggingEnabled(TRUE);
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression(
"headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
handler.setSendFailureChannel(kafkaFailuresChannel());
return handler;
}
然后我将一个服务激活器连接到这个成功通道,它也将成功发送的消息保存到消息存储
@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
return MessageChannels.direct("kafkaSuccessChannel").get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
mongoHandler.setMongoConverter(mongoConverter);
mongoHandler.setLoggingEnabled(TRUE);
SpelExpressionParser parser = new SpelExpressionParser();
mongoHandler.setCollectionNameExpression(
parser.parseExpression(
"headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
return mongoHandler;
}
我希望在发布成功但未发生的情况下调用服务激活器,
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
//publish messsge to KAfka TOpic
...
//assert message saved in MongoDB
assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
.containsOnly("JRASERVER-2000");
}
最后一个断言失败,在生产者发布到主题后,在日志中我没有看到对成功通道的任何调用。
正如 Gary 在他的评论中所说,sendSuccessChannel
是在与主 JUnit 运行程序不同的线程上异步调用的。确实是Kafka客户端中Future
完成的回调
因此,要确保所有内容在发送到 Kafka 后都落在 MongoDB 中,您需要比普通的 findAll()
更复杂的断言。您需要在一段时间内多次重复这样的调用,以确保其他线程已完成将消息发送到该通道并将文档存储到 MongoDb 集合中的工作。
为此,我可以推荐一个我们在自己的测试中真正使用的 Awaitility 工具:https://github.com/awaitility/awaitility