Spring 启动 Kafka 请求-回复场景

Spring boot Kafka request-reply scenario

我正在实施 request/reply 场景的 POC,以便使用 Kafka 移动基于事件的微服务堆栈。

spring中有 2 个选项。 我想知道哪个更好用。 ReplyingKafkaTemplatecloud-stream

首先是 ReplyingKafkaTemplate ,它可以很容易地配置为有专门的渠道来回复每个实例的主题。 record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes())); 消费者不需要知道回复主题名称,只需收听一个主题和 returns 给定的回复主题。

@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
       .....
}

第二个选项是使用 StreamListenerspring-integrationIntegrationFlows 的组合。应配置网关并过滤回复主题。

@MessagingGateway
public interface StreamGateway {
    @Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
    String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() { 
    return IntegrationFlows.from(START)
            .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
            .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID ,instanceUUID)) 
            .channel(Channels.REQUEST)
            .get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
    return IntegrationFlows.from(GatewayChannels.REPLY)
            .filter(Message.class, message -> Channels.INSTANCE_ID.equals(message.getHeaders().get("instanceId")) )
            .channel(FILTER)
            .get();
}

建立回复

@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) { 

必须指定回复渠道。所以收到的回复主题是根据 instanceID 过滤的,这是一种解决方法(可能会使网络膨胀)。另一方面,通过添加

启用 DLQ 方案
              consumer:
                enableDlq: true

使用 spring 云流在与 RabbitMQ 和其他功能的互操作性方面看起来很有前途,但并未立即正式支持请求回复场景。 Issue 还是open,没有被rejected 也。 (https://github.com/spring-cloud/spring-cloud-stream/issues/1800)

欢迎提出任何建议。

Spring Cloud Stream 不适用于 request/reply;可以做,不简单,还得写代码。

使用 @KafkaListener 框架会为您处理一切。

如果你想让它也能与 RabbitMQ 一起工作,你也可以用 @RabbitListener 注释它。