在Topic的特定分区发送消息
Send message in specific partition of Topic
生产者有什么方法可以将消息发送到代理中主题的特定部分吗?
截至目前,我能够发送具有 2 个分区的主题,但无法控制在特定分区中发送。
Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
private final Source source;
public RsvpsKafkaProducer(Source source) {
this.source = source;
}
public void sendRsvpMessage(WebSocketMessage<?> message) {
System.out.println("sendRsvpMessage");
source.output()
.send(MessageBuilder.withPayload(message.getPayload())
.build(),
SENDING_MESSAGE_TIMEOUT_MS);
}
}
application.properties
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093
spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw
有什么方法可以使用 spring 云流来实现吗?我希望一些消息进入 P1
分区,一些消息进入 meetupTopic
内的 P2
分区。
我还没有尝试过,但从 docs 来看,它看起来可能有效。
spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']
然后在发送消息时添加 header partitionKey
。
MessageBuilder.withPayload(message.getPayload())
.setHeader(KafkaHeaders.PARTITION_ID, 23)
.build()
生产者有什么方法可以将消息发送到代理中主题的特定部分吗?
截至目前,我能够发送具有 2 个分区的主题,但无法控制在特定分区中发送。
Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
private final Source source;
public RsvpsKafkaProducer(Source source) {
this.source = source;
}
public void sendRsvpMessage(WebSocketMessage<?> message) {
System.out.println("sendRsvpMessage");
source.output()
.send(MessageBuilder.withPayload(message.getPayload())
.build(),
SENDING_MESSAGE_TIMEOUT_MS);
}
}
application.properties
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093
spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw
有什么方法可以使用 spring 云流来实现吗?我希望一些消息进入 P1
分区,一些消息进入 meetupTopic
内的 P2
分区。
我还没有尝试过,但从 docs 来看,它看起来可能有效。
spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']
然后在发送消息时添加 header partitionKey
。
MessageBuilder.withPayload(message.getPayload())
.setHeader(KafkaHeaders.PARTITION_ID, 23)
.build()