服务激活器映射到 kafka 成功通道作为输入通道,但无法在 kafka 发送成功时执行

service activator mapped to kafka success channel as input channel but fails to execute on kafka send success

所以我已经为 Kafka 出站消息适配器配置了成功和失败通道,所以我可以根据 kafka 发布的结果进行一些 post 处理

@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(TRUE);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
    handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
    handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
    handler.setSendFailureChannel(kafkaFailuresChannel());
    return handler;
} 

然后我将一个服务激活器连接到这个成功通道,它也将成功发送的消息保存到消息存储

@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
    return MessageChannels.direct("kafkaSuccessChannel").get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
    mongoHandler.setMongoConverter(mongoConverter);
    mongoHandler.setLoggingEnabled(TRUE);
    SpelExpressionParser parser = new SpelExpressionParser();
    mongoHandler.setCollectionNameExpression(
            parser.parseExpression(
                    "headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
    return mongoHandler;
}

我希望在发布成功但未发生的情况下调用服务激活器,

@Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    //publish messsge to KAfka TOpic
      ...
    //assert message saved in MongoDB
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
            .containsOnly("JRASERVER-2000");
}

最后一个断言失败,在生产者发布到主题后,在日志中我没有看到对成功通道的任何调用。

正如 Gary 在他的评论中所说,sendSuccessChannel 是在与主 JUnit 运行程序不同的线程上异步调用的。确实是Kafka客户端中Future完成的回调

因此,要确保所有内容在发送到 Kafka 后都落在 MongoDB 中,您需要比普通的 findAll() 更复杂的断言。您需要在一段时间内多次重复这样的调用,以确保其他线程已完成将消息发送到该通道并将文档存储到 MongoDb 集合中的工作。

为此,我可以推荐一个我们在自己的测试中真正使用的 Awaitility 工具:https://github.com/awaitility/awaitility