Plain Producer 的自定义分区 |卡夫卡流
Custom Partitioner for Plain Producer | Kafka Streams
我有一个 kafka 流应用程序
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
或
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
这是一个 class 用于将消息分发到不同的分区,即使在 kafka 2.4 版本中使用相同的密钥
RoundRobinPartitioner 有这个实现:
public class RoundRobinPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
public void close() {
}
}
我的分区器由完全相同的代码组成,但分区方法实现不同,我的代码块是:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
return Utils.toPositive(nextValue) % numPartitions;
}
当我这样配置时,消息被分发到不同的分区,在两个实现中,但从不使用某些分区。
我有 50 个分区,分区 14 和 34 从未收到消息。我的分区不是 不可用。他们是可用的。当我将 return 分区方法更改为 14 或 34 时,我所有的消息都会转到该分区。可能是什么问题呢 ?两种实现都没有按预期工作。
编辑 1: 我已经尝试使用普通制作人使用 RoundRobinPartitioner。结果是一样的。生产者不能在分区之间平均生产消息,一些分区永远不会被使用。可能是什么原因 ?它不像是缺少配置。
编辑 2: 我调试了 RoundRobinPartitioner 并在 return 处设置了一个断点。当我只生成 1 条消息时,Producer 会生成两次消息。第一次尝试总是 不成功 并且该消息不会进入任何分区。当我点击继续调试时,ConcurrentMap 的索引增加 1。生产者的第二次尝试成功。
partition() 方法被调用了一些我还没有找到的东西。
编辑 3: 这是否与我没有覆盖的 onNewBatch 方法有关?
编辑 4: 此实现适用于 kafka 客户端 2.2,但不适用于 2.4。分区接口没有 onNewBatch 方法。当 key 为 null 2.2 与 2.4 时,DefaultPartitioner 实现发生了变化。会不会跟stick分区有关?
在kafka 2.4客户端版本中使用UniformStickyPartitioner.class。 RoundRobinPartitioner.class 适用于 kafka 2.2 或更低版本。在 2.4 版本中
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);
应该使用。我认为这与新的 StickPartitioner 有关。
我有一个 kafka 流应用程序
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
或
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
这是一个 class 用于将消息分发到不同的分区,即使在 kafka 2.4 版本中使用相同的密钥
RoundRobinPartitioner 有这个实现:
public class RoundRobinPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
public void close() {
}
}
我的分区器由完全相同的代码组成,但分区方法实现不同,我的代码块是:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
return Utils.toPositive(nextValue) % numPartitions;
}
当我这样配置时,消息被分发到不同的分区,在两个实现中,但从不使用某些分区。
我有 50 个分区,分区 14 和 34 从未收到消息。我的分区不是 不可用。他们是可用的。当我将 return 分区方法更改为 14 或 34 时,我所有的消息都会转到该分区。可能是什么问题呢 ?两种实现都没有按预期工作。
编辑 1: 我已经尝试使用普通制作人使用 RoundRobinPartitioner。结果是一样的。生产者不能在分区之间平均生产消息,一些分区永远不会被使用。可能是什么原因 ?它不像是缺少配置。
编辑 2: 我调试了 RoundRobinPartitioner 并在 return 处设置了一个断点。当我只生成 1 条消息时,Producer 会生成两次消息。第一次尝试总是 不成功 并且该消息不会进入任何分区。当我点击继续调试时,ConcurrentMap 的索引增加 1。生产者的第二次尝试成功。
partition() 方法被调用了一些我还没有找到的东西。
编辑 3: 这是否与我没有覆盖的 onNewBatch 方法有关?
编辑 4: 此实现适用于 kafka 客户端 2.2,但不适用于 2.4。分区接口没有 onNewBatch 方法。当 key 为 null 2.2 与 2.4 时,DefaultPartitioner 实现发生了变化。会不会跟stick分区有关?
在kafka 2.4客户端版本中使用UniformStickyPartitioner.class。 RoundRobinPartitioner.class 适用于 kafka 2.2 或更低版本。在 2.4 版本中
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);
应该使用。我认为这与新的 StickPartitioner 有关。