Spring Cloud Stream partitionKeyExpression 计算错误
Spring Cloud Stream partitionKeyExpression wrong calculation
我有一个使用 Kafka 的 Spring 基于 Cloud Stream 的微服务。
我创建了一个有 4 个分区的 kafka 主题。
我在我的 yml 中配置了以下内容:
spring:
cloud:
stream:
bindings:
SYNC_TABLE:
content-type: application/json
partitionKeyExpression: payload.partitionKey
partitionCount: 4
destination: ${envTopicPrefix}.LEGACY_TABLE
在我的代码中,我的消息 class 包含(在其超级 class 中)partitionKey 变量:
@Data
@EqualsAndHashCode(callSuper=true)
@ToString(callSuper=true)
public class TransactionResponse extends GeneralOutputMessage{
}
@Data
@ToString
public class GeneralOutputMessage {
private String operationType;
private List<String> affectedFields;
private Object data;
private String eventId;
private String eventName;
private String partitionKey;
}
我正在将 TransactionsResponse 对象作为消息发送:
final TransactionResponse transactionResponse = handler.handleEvent(event);
if (transactionResponse != null) {
outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build());
log.info("Message Sent: {}", transactionResponse);
}
我的期望是 spring 云流将获取密钥 payload.partitionKey,计算其 hashCode() % 4,并将事件发送到该分区。
然而,逻辑完全是随机的。这里有几个例子:
Math.abs("111615631".hashCode()%4) = 1。但是,消息被发送到分区号 3。
Math.abs("110019882".hashCode()%4) = 2。但是,消息被发送到分区号 0。
Math.abs("943152574".hashCode()%4) = 0。此消息 确实 发送到分区号 0.
Math.abs("943198862".hashCode()%4) = 0。然而,这个消息被发送到分区号2。
我正在使用 Dalston.SR1 发行版。
我在这里错过了什么?
谢谢。
更新:
刚刚尝试使用相同的 partitionKey(但消息正文略有不同)发送相同的事件。即使分区键相同,消息也会发送到两个不同的分区。看起来 Spring Cloud Stream 完全忽略了 partitionKeyExpression。
这是我的错误,我忘记在 yml 中添加 producer: 部分:
spring:
cloud:
stream:
bindings:
SYNC_TABLE:
content-type: application/json
producer:
partitionKeyExpression: payload.partitionKey
partitionCount: 4
destination: ${envTopicPrefix}.LEGACY_TABLE
我有一个使用 Kafka 的 Spring 基于 Cloud Stream 的微服务。
我创建了一个有 4 个分区的 kafka 主题。
我在我的 yml 中配置了以下内容:
spring:
cloud:
stream:
bindings:
SYNC_TABLE:
content-type: application/json
partitionKeyExpression: payload.partitionKey
partitionCount: 4
destination: ${envTopicPrefix}.LEGACY_TABLE
在我的代码中,我的消息 class 包含(在其超级 class 中)partitionKey 变量:
@Data
@EqualsAndHashCode(callSuper=true)
@ToString(callSuper=true)
public class TransactionResponse extends GeneralOutputMessage{
}
@Data
@ToString
public class GeneralOutputMessage {
private String operationType;
private List<String> affectedFields;
private Object data;
private String eventId;
private String eventName;
private String partitionKey;
}
我正在将 TransactionsResponse 对象作为消息发送:
final TransactionResponse transactionResponse = handler.handleEvent(event);
if (transactionResponse != null) {
outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build());
log.info("Message Sent: {}", transactionResponse);
}
我的期望是 spring 云流将获取密钥 payload.partitionKey,计算其 hashCode() % 4,并将事件发送到该分区。
然而,逻辑完全是随机的。这里有几个例子:
Math.abs("111615631".hashCode()%4) = 1。但是,消息被发送到分区号 3。
Math.abs("110019882".hashCode()%4) = 2。但是,消息被发送到分区号 0。
Math.abs("943152574".hashCode()%4) = 0。此消息 确实 发送到分区号 0.
Math.abs("943198862".hashCode()%4) = 0。然而,这个消息被发送到分区号2。
我正在使用 Dalston.SR1 发行版。
我在这里错过了什么?
谢谢。
更新:
刚刚尝试使用相同的 partitionKey(但消息正文略有不同)发送相同的事件。即使分区键相同,消息也会发送到两个不同的分区。看起来 Spring Cloud Stream 完全忽略了 partitionKeyExpression。
这是我的错误,我忘记在 yml 中添加 producer: 部分:
spring:
cloud:
stream:
bindings:
SYNC_TABLE:
content-type: application/json
producer:
partitionKeyExpression: payload.partitionKey
partitionCount: 4
destination: ${envTopicPrefix}.LEGACY_TABLE