Spring Cloud Stream partitionKeyExpression 计算错误

Spring Cloud Stream partitionKeyExpression wrong calculation

我有一个使用 Kafka 的 Spring 基于 Cloud Stream 的微服务。

我创建了一个有 4 个分区的 kafka 主题。

我在我的 yml 中配置了以下内容:

spring:
  cloud:
    stream:
      bindings:
        SYNC_TABLE:
          content-type: application/json
          partitionKeyExpression: payload.partitionKey
          partitionCount: 4
          destination: ${envTopicPrefix}.LEGACY_TABLE

在我的代码中,我的消息 class 包含(在其超级 class 中)partitionKey 变量:

@Data
@EqualsAndHashCode(callSuper=true)
@ToString(callSuper=true)
public class TransactionResponse extends GeneralOutputMessage{

}

@Data
@ToString
public class GeneralOutputMessage {

    private String operationType;
    private List<String> affectedFields;
    private Object data;
    private String eventId;
    private String eventName;
    private String partitionKey; 
}

我正在将 TransactionsResponse 对象作为消息发送:

final TransactionResponse transactionResponse = handler.handleEvent(event);
if (transactionResponse != null) {
    outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build());
    log.info("Message Sent: {}", transactionResponse);
}

我的期望是 spring 云流将获取密钥 payload.partitionKey,计算其 hashCode() % 4,并将事件发送到该分区。

然而,逻辑完全是随机的。这里有几个例子:

Math.abs("111615631".hashCode()%4) = 1。但是,消息被发送到分区号 3。

Math.abs("110019882".hashCode()%4) = 2。但是,消息被发送到分区号 0。

Math.abs("943152574".hashCode()%4) = 0。此消息 确实 发送到分区号 0.

Math.abs("943198862".hashCode()%4) = 0。然而,这个消息被发送到分区号2。

我正在使用 Dalston.SR1 发行版。

我在这里错过了什么?

谢谢。

更新:

刚刚尝试使用相同的 partitionKey(但消息正文略有不同)发送相同的事件。即使分区键相同,消息也会发送到两个不同的分区。看起来 Spring Cloud Stream 完全忽略了 partitionKeyExpression。

这是我的错误,我忘记在 yml 中添加 producer: 部分:

spring:
  cloud:
    stream:
      bindings:
        SYNC_TABLE:
          content-type: application/json
          producer:
            partitionKeyExpression: payload.partitionKey
            partitionCount: 4
          destination: ${envTopicPrefix}.LEGACY_TABLE