如何在@KafkaListeners 中将 groupId 设置为 null
How to set groupId to null in @KafkaListeners
关于:
我正在尝试通过@KafkaListener 阅读紧凑的主题。
我希望每个消费者每次都能阅读整个主题。
我无法为每个消费者生成一个唯一的 groupId。所以我想使用一个空的groupid。
我尝试配置容器和消费者以将 groupId 设置为 null,但均无效。
这是我的容器配置:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// Set ackMode to manual and never commit, we are reading from the beginning each time
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
// Remove groupId, we are consuming all partitions here
factory.getContainerProperties().setGroupId(null);
// Enable idle event in order to detect when init phase is over
factory.getContainerProperties().setIdleEventInterval(1000L);
也试过强制消费者配置:
Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
// Override group id property to force "null"
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
当我将容器 groupId 设置为 null 时,将使用带有侦听器 ID 的默认值。
当我强制消费者使用空 groupId 属性 时,出现错误:
在消费者配置、容器属性或@KafkaListener 注释中找不到 group.id; a group.id 使用群组管理时需要
您不能使用 null group.id
。
group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
如果你想每次都从头读起,你可以在容器工厂中添加一个ConsumerAwareRebalanceListener
或者让你的监听器实现ConsumerSeekAware
.
在任何一种情况下,当调用 onPartitionsAssigned
时,从每个 topic/partition 开始查找。
I can not generate an unique groupId each for each consumers.
您可以使用 SpEL 表达式生成 UUID。
编辑
您可以手动分配topics/partitions,然后group.id可以为null。
@SpringBootApplication
public class So56114299Application {
public static void main(String[] args) {
SpringApplication.run(So56114299Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56114299", 10, (short) 0);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "so56114299",
partitions = "#{@finder.partitions('so56114299')}"))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
System.out.println(key + ":" + payload);
}
@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
return new PartitionFinder(consumerFactory);
}
public static class PartitionFinder {
public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
private final ConsumerFactory<String, String> consumerFactory;
public String[] partitions(String topic) {
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(pi -> "" + pi.partition())
.toArray(String[]::new);
}
}
}
}
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual
关于:
我正在尝试通过@KafkaListener 阅读紧凑的主题。 我希望每个消费者每次都能阅读整个主题。
我无法为每个消费者生成一个唯一的 groupId。所以我想使用一个空的groupid。
我尝试配置容器和消费者以将 groupId 设置为 null,但均无效。
这是我的容器配置:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// Set ackMode to manual and never commit, we are reading from the beginning each time
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
// Remove groupId, we are consuming all partitions here
factory.getContainerProperties().setGroupId(null);
// Enable idle event in order to detect when init phase is over
factory.getContainerProperties().setIdleEventInterval(1000L);
也试过强制消费者配置:
Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
// Override group id property to force "null"
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
当我将容器 groupId 设置为 null 时,将使用带有侦听器 ID 的默认值。
当我强制消费者使用空 groupId 属性 时,出现错误: 在消费者配置、容器属性或@KafkaListener 注释中找不到 group.id; a group.id 使用群组管理时需要
您不能使用 null group.id
。
group.id
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
如果你想每次都从头读起,你可以在容器工厂中添加一个ConsumerAwareRebalanceListener
或者让你的监听器实现ConsumerSeekAware
.
在任何一种情况下,当调用 onPartitionsAssigned
时,从每个 topic/partition 开始查找。
I can not generate an unique groupId each for each consumers.
您可以使用 SpEL 表达式生成 UUID。
编辑
您可以手动分配topics/partitions,然后group.id可以为null。
@SpringBootApplication
public class So56114299Application {
public static void main(String[] args) {
SpringApplication.run(So56114299Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56114299", 10, (short) 0);
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "so56114299",
partitions = "#{@finder.partitions('so56114299')}"))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
System.out.println(key + ":" + payload);
}
@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
return new PartitionFinder(consumerFactory);
}
public static class PartitionFinder {
public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
private final ConsumerFactory<String, String> consumerFactory;
public String[] partitions(String topic) {
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(pi -> "" + pi.partition())
.toArray(String[]::new);
}
}
}
}
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual