优先考虑 Kafka 主题
Prioritizing Kafka topic
我需要完整地读取topic1 中的消息,然后再读取topic2 中的消息。我每天都会收到一次关于这些主题的消息。在阅读主题 1 中的所有消息之前,我设法停止阅读主题 2 中的消息,但这只在服务器启动时发生一次。谁能帮我解决这个问题。
ListenerConfig 代码
@EnableKafka
@Configuration
public class ListenerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic1Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setIdleEventInterval(60000L);
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic2Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
}
监听器代码
@Service
public class Listener {
private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
public void receive(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received first='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
public void receiveRel(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received second='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@EventListener()
public void eventHandler(ListenerContainerIdleEvent event) {
LOG.info("Inside event");
this.registry.getListenerContainer("second-listener").start();
}
请帮我解决,因为这个循环应该每天都会发生。完整阅读 topic1 消息,然后从 topic2 读取消息。
Kafka 中的主题是发布记录流的抽象。流自然是无界的,因此它们有起点但没有定义的终点。对于您的情况,首先您需要明确定义 topic1
和 topic2
的末端,以便您可以在需要时 stop/presume 您的消费者。也许您知道每个主题要处理多少条消息,因此您可以使用:position or commmited 停止一个消费者并在那一刻假定另一个消费者。或者,如果您使用的是流式框架,他们通常有一个会话 window,其中框架通过 activity 的会话检测组元素。您也可以更愿意将该逻辑放入应用程序端,这样您就不需要 stop/start 任何消费者线程。
您已经在使用空闲事件侦听器来启动第二个侦听器 - 它也应该停止第一个侦听器。
当第二个监听器空闲时;停止它。
您应该检查事件针对哪个容器,以决定停止哪个容器and/or启动。
然后,使用 TaskScheduler
,安排第一个侦听器的 start()
在您希望它启动的下一个时间。
我需要完整地读取topic1 中的消息,然后再读取topic2 中的消息。我每天都会收到一次关于这些主题的消息。在阅读主题 1 中的所有消息之前,我设法停止阅读主题 2 中的消息,但这只在服务器启动时发生一次。谁能帮我解决这个问题。
ListenerConfig 代码
@EnableKafka
@Configuration
public class ListenerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic1Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setIdleEventInterval(60000L);
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic2Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
}
监听器代码
@Service
public class Listener {
private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
public void receive(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received first='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
public void receiveRel(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received second='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@EventListener()
public void eventHandler(ListenerContainerIdleEvent event) {
LOG.info("Inside event");
this.registry.getListenerContainer("second-listener").start();
}
请帮我解决,因为这个循环应该每天都会发生。完整阅读 topic1 消息,然后从 topic2 读取消息。
Kafka 中的主题是发布记录流的抽象。流自然是无界的,因此它们有起点但没有定义的终点。对于您的情况,首先您需要明确定义 topic1
和 topic2
的末端,以便您可以在需要时 stop/presume 您的消费者。也许您知道每个主题要处理多少条消息,因此您可以使用:position or commmited 停止一个消费者并在那一刻假定另一个消费者。或者,如果您使用的是流式框架,他们通常有一个会话 window,其中框架通过 activity 的会话检测组元素。您也可以更愿意将该逻辑放入应用程序端,这样您就不需要 stop/start 任何消费者线程。
您已经在使用空闲事件侦听器来启动第二个侦听器 - 它也应该停止第一个侦听器。
当第二个监听器空闲时;停止它。
您应该检查事件针对哪个容器,以决定停止哪个容器and/or启动。
然后,使用 TaskScheduler
,安排第一个侦听器的 start()
在您希望它启动的下一个时间。