Spring 启动 Kafka 请求-回复场景
Spring boot Kafka request-reply scenario
我正在实施 request/reply 场景的 POC,以便使用 Kafka 移动基于事件的微服务堆栈。
spring中有 2 个选项。
我想知道哪个更好用。 ReplyingKafkaTemplate
或 cloud-stream
首先是 ReplyingKafkaTemplate
,它可以很容易地配置为有专门的渠道来回复每个实例的主题。
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes()));
消费者不需要知道回复主题名称,只需收听一个主题和 returns 给定的回复主题。
@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
.....
}
第二个选项是使用 StreamListener
、spring-integration
和 IntegrationFlows
的组合。应配置网关并过滤回复主题。
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(START)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID ,instanceUUID))
.channel(Channels.REQUEST)
.get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
return IntegrationFlows.from(GatewayChannels.REPLY)
.filter(Message.class, message -> Channels.INSTANCE_ID.equals(message.getHeaders().get("instanceId")) )
.channel(FILTER)
.get();
}
建立回复
@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) {
必须指定回复渠道。所以收到的回复主题是根据 instanceID 过滤的,这是一种解决方法(可能会使网络膨胀)。另一方面,通过添加
启用 DLQ 方案
consumer:
enableDlq: true
使用 spring 云流在与 RabbitMQ 和其他功能的互操作性方面看起来很有前途,但并未立即正式支持请求回复场景。 Issue 还是open,没有被rejected 也。 (https://github.com/spring-cloud/spring-cloud-stream/issues/1800)
欢迎提出任何建议。
Spring Cloud Stream 不适用于 request/reply;可以做,不简单,还得写代码。
使用 @KafkaListener
框架会为您处理一切。
如果你想让它也能与 RabbitMQ 一起工作,你也可以用 @RabbitListener
注释它。
我正在实施 request/reply 场景的 POC,以便使用 Kafka 移动基于事件的微服务堆栈。
spring中有 2 个选项。
我想知道哪个更好用。 ReplyingKafkaTemplate
或 cloud-stream
首先是 ReplyingKafkaTemplate
,它可以很容易地配置为有专门的渠道来回复每个实例的主题。
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes()));
消费者不需要知道回复主题名称,只需收听一个主题和 returns 给定的回复主题。
@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
.....
}
第二个选项是使用 StreamListener
、spring-integration
和 IntegrationFlows
的组合。应配置网关并过滤回复主题。
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(START)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID ,instanceUUID))
.channel(Channels.REQUEST)
.get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
return IntegrationFlows.from(GatewayChannels.REPLY)
.filter(Message.class, message -> Channels.INSTANCE_ID.equals(message.getHeaders().get("instanceId")) )
.channel(FILTER)
.get();
}
建立回复
@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) {
必须指定回复渠道。所以收到的回复主题是根据 instanceID 过滤的,这是一种解决方法(可能会使网络膨胀)。另一方面,通过添加
启用 DLQ 方案 consumer:
enableDlq: true
使用 spring 云流在与 RabbitMQ 和其他功能的互操作性方面看起来很有前途,但并未立即正式支持请求回复场景。 Issue 还是open,没有被rejected 也。 (https://github.com/spring-cloud/spring-cloud-stream/issues/1800)
欢迎提出任何建议。
Spring Cloud Stream 不适用于 request/reply;可以做,不简单,还得写代码。
使用 @KafkaListener
框架会为您处理一切。
如果你想让它也能与 RabbitMQ 一起工作,你也可以用 @RabbitListener
注释它。