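How to stop container when all the partitions are paused?
I have a use case where I create dynamic listeners based on an API call. After receiving a certain message, I pause that partition individually. For example, if I receive a message with offset 100, that partition is paused. This is done for every partition.
Once all the partitions are paused, I want to stop the container.
Here is my code: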
public class OffsetBasedMessageListener implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        if (consumerRecord.offset() == 100) {
            // Pauses the partition on the consumer directly, bypassing the container
            consumer.pause(Collections.singleton(new TopicPartition(consumerRecord.topic(),
                    consumerRecord.partition())));
        }
    }

}
Now I have configured an EventListener that is triggered when the container is idle.
@EventListener
public void onIdle(ListenerContainerIdleEvent event) {
    Collection<TopicPartition> collection = event.getContainer(ConcurrentMessageListenerContainer.class).getAssignedPartitions();
    for (TopicPartition topicPartition : collection) {
        System.out.println(
                event.getContainer(ConcurrentMessageListenerContainer.class).isPartitionPaused(topicPartition));
    }
}
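So what I am doing now is checking whether all the partitions are paused, and then I will stop the container. But isPartitionPaused always returns false, even when it should return true.
I am using Spring Boot and Spring Kafka.
Can someone tell me what I am doing wrong? Or is there another way to achieve this?
Thanks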
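You should not pause the consumer directly; the container does not know that you paused it, so isPartitionPaused returns false.
Call pausePartition on the container instead.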
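The point above can be illustrated with a small model. This is a simplified sketch using only the JDK, not Spring Kafka's actual implementation: `ModelConsumer`, `ModelContainer` and the `pauseRequested` set are hypothetical names, and the model assumes the container answers isPartitionPaused only from pauses that were requested through the container itself.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model only; these classes are hypothetical and are not Spring Kafka types.
final class PauseTrackingDemo {

    /** Stand-in for the Kafka consumer: it keeps its own set of paused partitions. */
    static final class ModelConsumer {
        private final Set<String> paused = new HashSet<>();

        void pause(String partition) {
            paused.add(partition);
        }

        boolean isPaused(String partition) {
            return paused.contains(partition);
        }
    }

    /** Stand-in for the listener container that owns the consumer. */
    static final class ModelContainer {
        private final ModelConsumer consumer = new ModelConsumer();
        // Assumption: the container only remembers pauses requested through itself.
        private final Set<String> pauseRequested = new HashSet<>();

        ModelConsumer consumer() {
            return consumer;
        }

        void pausePartition(String partition) {
            pauseRequested.add(partition);
            consumer.pause(partition);
        }

        boolean isPartitionPaused(String partition) {
            return pauseRequested.contains(partition);
        }
    }

    public static void main(String[] args) {
        ModelContainer container = new ModelContainer();
        container.consumer().pause("topic-0");   // direct pause: the container never hears about it
        container.pausePartition("topic-1");     // pause through the container
        System.out.println(container.isPartitionPaused("topic-0")); // false
        System.out.println(container.isPartitionPaused("topic-1")); // true
    }
}
```

In this model the consumer really is paused for "topic-0", but the container has no record of it, which is the same symptom as in the question.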
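Note that, either way, the consumer will not actually pause until all the records from the previous poll have been processed. Set max.poll.records to 1 to pause immediately.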
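The effect of the batch size can be sketched with a small simulation. This is a JDK-only model under stated assumptions, not Kafka or Spring Kafka code: `deliver`, `maxPollRecords` and `pauseAtOffset` are hypothetical names, and the model assumes a single partition whose pause request is only honoured before the next poll.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only; it does not use the Kafka client.
final class PollBatchDemo {

    /**
     * Delivers offsets 0..total-1 in batches of maxPollRecords and requests a pause
     * when pauseAtOffset is seen. The pause is only honoured between polls, so the
     * rest of the current batch is still delivered to the listener.
     */
    static List<Integer> deliver(int total, int maxPollRecords, int pauseAtOffset) {
        List<Integer> delivered = new ArrayList<>();
        boolean pauseRequested = false;
        int next = 0;
        while (next < total && !pauseRequested) {
            int end = Math.min(next + maxPollRecords, total); // one simulated poll
            for (int offset = next; offset < end; offset++) {
                delivered.add(offset);
                if (offset == pauseAtOffset) {
                    pauseRequested = true; // takes effect before the next poll
                }
            }
            next = end;
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(10, 4, 5)); // [0, 1, 2, 3, 4, 5, 6, 7]
        System.out.println(deliver(10, 1, 5)); // [0, 1, 2, 3, 4, 5]
    }
}
```

With a batch size of 4, offsets 6 and 7 are still delivered after the pause request at offset 5; with a batch size of 1, delivery stops right at offset 5.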
EDIT
This works fine for me...
@SpringBootApplication
@RestController
public class So67430862Application {

    private static final Logger LOG = LoggerFactory.getLogger(So67430862Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So67430862Application.class, args);
    }

    Set<ConcurrentMessageListenerContainer<String, String>> containers = ConcurrentHashMap.newKeySet();

    int id;

    @Autowired
    Creator creator;

    @PostMapping(path = "/send/{topic}")
    public void sendFoo(@PathVariable String topic) {
        ConcurrentMessageListenerContainer<String, String> container = this.creator.create(topic);
        container.getContainerProperties().setGroupId("group" + ++id);
        container.getContainerProperties().setMessageListener(new Listener(container));
        this.containers.add(container);
        container.start();
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        ConcurrentMessageListenerContainer<String, String> container =
                event.getContainer(ConcurrentMessageListenerContainer.class);
        if (this.containers.contains(container)) {
            boolean allPaused = container.getAssignedPartitions()
                    .stream()
                    .map(part -> container.isPartitionPaused(part))
                    .allMatch(paused -> paused);
            LOG.info("All paused? {}", allPaused);
            if (allPaused) {
                container.stop(() -> { });
                this.containers.remove(container);
            }
        }
    }

//  @Bean
//  public NewTopic topic() {
//      return TopicBuilder.name("so67430862").partitions(5).replicas(1).build();
//  }
//
//  @Bean
//  public ApplicationRunner runner(KafkaTemplate<String, String> template) {
//      return args -> {
//          IntStream.range(0, 5).forEach(p -> {
//              IntStream.range(0, 10).forEach(i -> template.send("so67430862", p, null, "test"));
//          });
//      };
//  }

}
@Component
class Creator {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    Creator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    ConcurrentMessageListenerContainer<String, String> create(String topic) {
        return this.factory.createContainer(topic);
    }

}
class Listener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    final ConcurrentMessageListenerContainer<String, String> container;

    Listener(ConcurrentMessageListenerContainer<String, String> container) {
        this.container = container;
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        LOG.info(ListenerUtils.recordToString(data, true));
        if (data.offset() == 5) {
            LOG.info("Pausing partition {}", data.partition());
            this.container.pausePartition(new TopicPartition(data.topic(), data.partition()));
        }
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}
spring.kafka.consumer.max-poll-records=1
spring.kafka.listener.idle-event-interval=5000
2021-05-10 11:51:15.792 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@0
2021-05-10 11:51:15.797 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@1
2021-05-10 11:51:15.799 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@2
2021-05-10 11:51:15.801 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@3
2021-05-10 11:51:15.803 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@4
2021-05-10 11:51:15.809 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-0@5
2021-05-10 11:51:15.809 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 0
2021-05-10 11:51:15.814 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@0
2021-05-10 11:51:15.816 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@1
2021-05-10 11:51:15.819 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@2
2021-05-10 11:51:15.823 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@3
2021-05-10 11:51:15.828 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@4
2021-05-10 11:51:15.831 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-1@5
2021-05-10 11:51:15.831 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 1
2021-05-10 11:51:15.835 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@0
2021-05-10 11:51:15.838 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@1
2021-05-10 11:51:15.841 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@2
2021-05-10 11:51:15.844 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@3
2021-05-10 11:51:15.846 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@4
2021-05-10 11:51:15.849 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-4@5
2021-05-10 11:51:15.850 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 4
2021-05-10 11:51:15.854 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@0
2021-05-10 11:51:15.858 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@1
2021-05-10 11:51:15.861 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@2
2021-05-10 11:51:15.864 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@3
2021-05-10 11:51:15.866 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@4
2021-05-10 11:51:15.871 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-2@5
2021-05-10 11:51:15.871 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 2
2021-05-10 11:51:15.875 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@0
2021-05-10 11:51:15.877 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@1
2021-05-10 11:51:15.880 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@2
2021-05-10 11:51:15.883 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@3
2021-05-10 11:51:15.886 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@4
2021-05-10 11:51:15.889 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : so67430862-3@5
2021-05-10 11:51:15.889 INFO 58682 --- [ consumer-0-C-1] com.example.demo.Listener : Pausing partition 3
2021-05-10 11:51:20.893 INFO 58682 --- [ consumer-0-C-1] com.example.demo.So67430862Application : All paused? true
2021-05-10 11:51:20.895 INFO 58682 --- [ consumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group1-1, groupId=group1] Revoke previously assigned partitions so67430862-0, so67430862-1, so67430862-4, so67430862-2, so67430862-3
2021-05-10 11:51:20.895 INFO 58682 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group1: partitions revoked: [so67430862-0, so67430862-1, so67430862-4, so67430862-2, so67430862-3]
2021-05-10 11:51:20.895 INFO 58682 --- [ consumer-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group1-1, groupId=group1] Member consumer-group1-1-be137c2c-2bbf-4235-a143-15d33a0cfe52 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer unsubscribed from all topics
2021-05-10 11:51:20.896 INFO 58682 --- [ consumer-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group1-1, groupId=group1] Unsubscribed all topics or patterns and assigned partitions
2021-05-10 11:51:20.896 INFO 58682 --- [ consumer-0-C-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService
2021-05-10 11:51:20.897 INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2021-05-10 11:51:20.897 INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-05-10 11:51:20.898 INFO 58682 --- [ consumer-0-C-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2021-05-10 11:51:20.901 INFO 58682 --- [ consumer-0-C-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-group1-1 unregistered
2021-05-10 11:51:20.902 INFO 58682 --- [ consumer-0-C-1] essageListenerContainer$ListenerConsumer : group1: Consumer stopped
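The commented-out code created a topic with 5 partitions and 10 records in each partition; the listener pauses each partition at offset 5. As you can see, we did not get the remaining records.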