在Topic的特定分区发送消息

Send message in specific partition of Topic

生产者有什么方法可以将消息发送到代理中主题的特定部分吗?

截至目前,我能够发送具有 2 个分区的主题,但无法控制在特定分区中发送。

Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
  
    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {
        
         System.out.println("sendRsvpMessage");
         source.output()
                .send(MessageBuilder.withPayload(message.getPayload())
                        .build(),
                        SENDING_MESSAGE_TIMEOUT_MS);   
    }
}

application.properties

spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093

spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2

spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw

有什么方法可以使用 spring 云流来实现吗?我希望一些消息进入 P1 分区,一些消息进入 meetupTopic 内的 P2 分区。

我还没有尝试过,但从 docs 来看,它看起来可能有效。

spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']

然后在发送消息时添加 header partitionKey

MessageBuilder.withPayload(message.getPayload())
                        .setHeader(KafkaHeaders.PARTITION_ID, 23)
                        .build()