Spring-kafka and Kafka 0.10
I am currently trying to receive messages with Kafka and spring-kafka.
But I am having trouble running multiple consumers for the same topic, and I have a couple of questions:

1 - My consumers tend to disconnect after a while and fail to reconnect

My consumers frequently log the following warning:
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.poc.crawler.kafka.KafkaListener : Consuming {"some-stuff": "yes"} from topic [job15]
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Start of crawling
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Url has already been treated ==> skipping
2017-09-06 15:32:35.054 WARN 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {job15-3=OffsetAndMetadata{offset=11547, metadata=''}, job15-2=OffsetAndMetadata{offset=15550, metadata=''}} failed for group group-3: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [job15-3, job15-2] for group group-3
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] s.k.l.ConcurrentMessageListenerContainer : partitions revoked:[job15-3, job15-2]
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group group-3
This causes the consumer to stop and wait for several seconds.
As the message suggests, I increased the consumer's session.timeout.ms to 30000. I still get the warning.
As you can see in the logs provided, the disconnection happens right after the record finishes processing.
So... well before the 30 seconds elapse.
2 - Two consumer applications often receive the same message

While looking at my consumer logs, I noticed they tend to process the same messages. I know Kafka is at-least-once, but I never expected to run into this many duplicates.
Fortunately I use redis, but I have probably misunderstood some tuning/property I need to set.
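Since Kafka only guarantees at-least-once delivery, duplicates after a rebalance are expected, and the consumer side has to make processing idempotent. A minimal sketch of that idea is below; an in-memory set stands in for the redis check mentioned above, and the class and method names are hypothetical:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of idempotent message handling under at-least-once delivery.
// In production the "seen" check would live in redis (e.g. SET key NX)
// so it is shared across application instances; a local set is used
// here only to keep the example self-contained.
class DedupProcessor {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true if the message was processed, false if it was a duplicate. */
    public boolean process(String messageKey) {
        // add() returns false when the key is already present,
        // so redelivered records are skipped instead of reprocessed.
        if (!seen.add(messageKey)) {
            return false;
        }
        // ... actual work (e.g. crawling the url) would go here ...
        return true;
    }
}
```

With this guard in place, a record redelivered after a rebalance is detected by its key and skipped rather than handled twice.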
Code

Note: I am using ConcurrentMessageListenerContainer with auto-commit=true, but run it with 1 thread. I just start several instances of the same application, because the services used by the consumer are not thread-safe.
KafkaContext.java
@Slf4j
@Configuration
@EnableConfigurationProperties(value = KafkaConfig.class)
class KafkaContext {

    @Bean(destroyMethod = "stop")
    public ConcurrentMessageListenerContainer<Integer, Next> kafkaInListener(IKafkaListener listener, KafkaConfig config) {
        final ContainerProperties containerProperties =
                new ContainerProperties(config.getIn().getTopic());
        containerProperties.setMessageListener(listener);
        final DefaultKafkaConsumerFactory<Integer, Next> defaultKafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfigs(config));
        final ConcurrentMessageListenerContainer<Integer, Next> messageListenerContainer =
                new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties);
        messageListenerContainer.setConcurrency(config.getConcurrency());
        messageListenerContainer.setAutoStartup(false);
        return messageListenerContainer;
    }

    private Map<String, Object> consumerConfigs(KafkaConfig config) {
        final String kafkaHost = config.getHost() + ":" + config.getPort();
        log.info("Crawler_Worker connecting to kafka at {} with consumerGroup {}", kafkaHost, config.getIn().getGroupId());
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getIn().getGroupId());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonNextSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
        return props;
    }
}
The listener
@Slf4j
@Component
class KafkaListener implements IKafkaListener {

    private final ICrawlingService crawlingService;

    @Autowired
    public KafkaListener(ICrawlingService crawlingService) {
        this.crawlingService = crawlingService;
    }

    @Override
    public void onMessage(ConsumerRecord<Integer, Next> consumerRecord) {
        log.info("Consuming {} from topic [{}]", JSONObject.wrap(consumerRecord.value()), consumerRecord.topic());
        crawlingService.apply(consumerRecord.value());
    }
}
The main problem here is that your consumer group keeps rebalancing. You are right about increasing session.timeout.ms, but I do not see that setting in your configuration. Try removing:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
and setting:
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
You can increase MAX_POLL_RECORDS_CONFIG for better throughput when communicating with the broker. However, since you process messages in only one thread, it is safer to keep this value low.
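Putting the suggestion together, the resulting consumer properties might look like the sketch below. Plain string config names are used so the snippet stands alone; they are the same settings the ConsumerConfig constants in the question resolve to, and the host/group values are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the consumer properties after applying the suggested changes.
class SuggestedConsumerConfig {
    static Map<String, Object> consumerConfigs(String kafkaHost, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", kafkaHost);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", true);
        // max.poll.interval.ms is deliberately NOT set: lowering it to 30000
        // was what made slow poll loops trigger rebalances, so it is left
        // at the broker-client default (300000 ms in Kafka 0.10.1+).
        // Fewer records per poll keeps each poll loop short...
        props.put("max.poll.records", 10);
        // ...and a larger session timeout gives the consumer more slack.
        props.put("session.timeout.ms", 30000);
        return props;
    }
}
```

With max.poll.records at 10 and a single processing thread, each poll() batch finishes well within the poll interval, so the group should stop rebalancing and the duplicate deliveries caused by revoked partitions should drop accordingly.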