Spring-Kafka - 如何使用注释为监听器指定分区范围?
Spring-Kafka - How can I specify a range of partitions for Listeners using annotations?
我有3个主题,每个主题有50个分区。我想使用@KafkaListener 来指定不同分区的监听器
我应该为每个主题设置一个监听器吗?
我应该为一个主题设置多个侦听器吗?如果可以,我如何为一个主题指定一系列分区?
此外,在 3 个主题中,有 2 个主题的数据量比第三个主题多得多,那么我是否应该为这两个主题设置更多的监听器才能跟上负载?
以及并发数应该怎么选择?
到目前为止我的代码:
@Configuration
@EnableKafka
public class ConsumerConfig {
// Factory to create the consumer classes
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
//
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public Listener listener() {
return new Listener();
}
}
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets):
// Documentation excerpt: a POJO listener bound to explicit topic/partition
// assignments rather than group management:
//   - topic1: partitions 0 and 1
//   - topic2: partition 0, plus partition 1 starting at initial offset 100
// NOTE: the "..." body is elided in the original snippet; this is not
// compilable as-is.
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
一个监听器可以监听多个主题,但是如果各主题的消息量不同,我建议每个主题使用单独的监听器;否则低流量主题可能得不到应有的处理。
编辑
您可以使用 SpEL 表达式生成分区数组。
例如,两个监听器,一个分配到奇数分区,另一个分配到偶数分区,可以像这样配置……
@SpringBootApplication
public class So53588657Application {

    public static void main(String[] args) {
        SpringApplication.run(So53588657Application.class, args);
    }

    // Provision the demo topic: 50 partitions, replication factor 1.
    @Bean
    public NewTopic topic() {
        return new NewTopic("so53588657", 50, (short) 1);
    }

    // Listener explicitly assigned the odd-numbered partitions (1, 3, 5, ...).
    @KafkaListener(id = "odd", topicPartitions =
            @TopicPartition(topic = "so53588657",
                    partitions = "#{T(com.example.So53588657Application$SplitParts).odds(50)}"))
    public void oddParts(String in) {
        // ...
    }

    // Listener explicitly assigned the even-numbered partitions (0, 2, 4, ...).
    @KafkaListener(id = "even", topicPartitions =
            @TopicPartition(topic = "so53588657",
                    partitions = "#{T(com.example.So53588657Application$SplitParts).evens(50)}"))
    public void evenParts(String in) {
        // ...
    }

    // On startup, print the partitions actually assigned to each container.
    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> registry.getListenerContainers()
                .forEach(c -> c.getAssignedPartitions().forEach(System.out::println));
    }

    /** Helpers invoked from the SpEL expressions in the annotations above. */
    public static class SplitParts {

        /** Odd partition numbers (1, 3, 5, ...) below {@code partitions}. */
        public static String[] odds(int partitions) {
            // BUG FIX: the original used i % 2 == 0 here, which returned the
            // EVEN partitions to the "odd" listener.
            return split(partitions, i -> i % 2 == 1);
        }

        /** Even partition numbers (0, 2, 4, ...) below {@code partitions}. */
        public static String[] evens(int partitions) {
            // BUG FIX: the original used i % 2 == 1 here, which returned the
            // ODD partitions to the "even" listener.
            return split(partitions, i -> i % 2 == 0);
        }

        /** All ints in [0, partitions) matching {@code predicate}, as Strings. */
        private static String[] split(int partitions, IntPredicate predicate) {
            return IntStream.range(0, partitions)
                    .filter(predicate)
                    .mapToObj(String::valueOf)
                    .toArray(String[]::new);
        }
    }
}
或者,您也可以将分区列表作为属性中以逗号分隔的列表提供,并使用
partitions = { "#{'${partition.list}'.split(',')}" })
我有3个主题,每个主题有50个分区。我想使用@KafkaListener 来指定不同分区的监听器
我应该为每个主题设置一个监听器吗?
我应该为一个主题设置多个侦听器吗?如果可以,我如何为一个主题指定一系列分区?
此外,在 3 个主题中,有 2 个主题的数据量比第三个主题多得多,那么我是否应该为这两个主题设置更多的监听器才能跟上负载?
以及并发数应该怎么选择?
到目前为止我的代码:
@Configuration
@EnableKafka
public class ConsumerConfig {
// Factory to create the consumer classes
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
//
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public Listener listener() {
return new Listener();
}
}
You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets):
// Documentation excerpt: a POJO listener bound to explicit topic/partition
// assignments rather than group management:
//   - topic1: partitions 0 and 1
//   - topic2: partition 0, plus partition 1 starting at initial offset 100
// NOTE: the "..." body is elided in the original snippet; this is not
// compilable as-is.
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
一个监听器可以监听多个主题,但是如果各主题的消息量不同,我建议每个主题使用单独的监听器;否则低流量主题可能得不到应有的处理。
编辑
您可以使用 SpEL 表达式生成分区数组。
例如,两个监听器,一个分配到奇数分区,另一个分配到偶数分区,可以像这样配置……
@SpringBootApplication
public class So53588657Application {

    public static void main(String[] args) {
        SpringApplication.run(So53588657Application.class, args);
    }

    // Provision the demo topic: 50 partitions, replication factor 1.
    @Bean
    public NewTopic topic() {
        return new NewTopic("so53588657", 50, (short) 1);
    }

    // Listener explicitly assigned the odd-numbered partitions (1, 3, 5, ...).
    @KafkaListener(id = "odd", topicPartitions =
            @TopicPartition(topic = "so53588657",
                    partitions = "#{T(com.example.So53588657Application$SplitParts).odds(50)}"))
    public void oddParts(String in) {
        // ...
    }

    // Listener explicitly assigned the even-numbered partitions (0, 2, 4, ...).
    @KafkaListener(id = "even", topicPartitions =
            @TopicPartition(topic = "so53588657",
                    partitions = "#{T(com.example.So53588657Application$SplitParts).evens(50)}"))
    public void evenParts(String in) {
        // ...
    }

    // On startup, print the partitions actually assigned to each container.
    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> registry.getListenerContainers()
                .forEach(c -> c.getAssignedPartitions().forEach(System.out::println));
    }

    /** Helpers invoked from the SpEL expressions in the annotations above. */
    public static class SplitParts {

        /** Odd partition numbers (1, 3, 5, ...) below {@code partitions}. */
        public static String[] odds(int partitions) {
            // BUG FIX: the original used i % 2 == 0 here, which returned the
            // EVEN partitions to the "odd" listener.
            return split(partitions, i -> i % 2 == 1);
        }

        /** Even partition numbers (0, 2, 4, ...) below {@code partitions}. */
        public static String[] evens(int partitions) {
            // BUG FIX: the original used i % 2 == 1 here, which returned the
            // ODD partitions to the "even" listener.
            return split(partitions, i -> i % 2 == 0);
        }

        /** All ints in [0, partitions) matching {@code predicate}, as Strings. */
        private static String[] split(int partitions, IntPredicate predicate) {
            return IntStream.range(0, partitions)
                    .filter(predicate)
                    .mapToObj(String::valueOf)
                    .toArray(String[]::new);
        }
    }
}
或者,您也可以将分区列表作为属性中以逗号分隔的列表提供,并使用
partitions = { "#{'${partition.list}'.split(',')}" })