如果主题不存在,Kafka 生产者会挂起
Kafka producer hangs if topic doesn't exist
我是 Kafka 的新手,正在尝试实现一个简单的生产者,将数据发送到主题。
如果topic不存在,我想异常处理。
private Producer<UUID, Object> producer = createProducer();
private static Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"mybootstrapserveraddress");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ADAPTER");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
1000);
props.put(ProducerConfig.RETRIES_CONFIG,
1);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
1000);
return new KafkaProducer<>(props);
}
public void send(Event event, String topic){
try {
UUID key = UUID.randomUUID();
producer.send(new ProducerRecord<>(topic, key , event), (rm, ex) -> {
if (ex != null) {
log.warn("Error sending message with key {}\n{}", new Object[]{key, ex.getMessage()});
} else {
log.info( "Partition for key-value {} is {}", new Object[]{key, rm.partition()});
}
});
} catch (Exception e) {
log.error("Failed to send message ",e);
} finally {
producer.flush();
}
}
但是,如果主题不存在,则继续轮询消息。 ProducerConfig 的超时和重试将被忽略。
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 6 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 7 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 8 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
我不想通过 AdminClient 检查主题是否存在。
Kafka 文档 https://kafka.apache.org/documentation/#producerconfigs 没有帮助。
有办法解决这个问题吗?
当主题不存在时,获取元数据的重试应该在 60 秒后结束,默认情况下最后会引发超时异常。
与此相关的生产者配置参数是 max.block.ms
(默认 60000)。
据我所知,除了减少此超时或使用 AdminClient(这是您不想做的事情)之外,没有其他方法可以更早地获得反馈。
如果发布消息有任何问题,Kafka 将抛出 MetadataNotUpdated
异常(它会阻塞 send
方法)。超时可使用以下配置:max.block.ms
。但是,请确保未禁用自动主题创建。
我是 Kafka 的新手,正在尝试实现一个简单的生产者,将数据发送到主题。 如果topic不存在,我想异常处理。
private Producer<UUID, Object> producer = createProducer();
private static Producer createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"mybootstrapserveraddress");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ADAPTER");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
UUIDSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
1000);
props.put(ProducerConfig.RETRIES_CONFIG,
1);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
1000);
return new KafkaProducer<>(props);
}
public void send(Event event, String topic){
try {
UUID key = UUID.randomUUID();
producer.send(new ProducerRecord<>(topic, key , event), (rm, ex) -> {
if (ex != null) {
log.warn("Error sending message with key {}\n{}", new Object[]{key, ex.getMessage()});
} else {
log.info( "Partition for key-value {} is {}", new Object[]{key, rm.partition()});
}
});
} catch (Exception e) {
log.error("Failed to send message ",e);
} finally {
producer.flush();
}
}
但是,如果主题不存在,则继续轮询消息。 ProducerConfig 的超时和重试将被忽略。
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 6 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 7 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | ADAPTER] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=ADAPTER] Error while fetching metadata with correlation id 8 : {my-example-topic2=UNKNOWN_TOPIC_OR_PARTITION}
我不想通过 AdminClient 检查主题是否存在。 Kafka 文档 https://kafka.apache.org/documentation/#producerconfigs 没有帮助。
有办法解决这个问题吗?
当主题不存在时,获取元数据的重试应该在 60 秒后结束,默认情况下最后会引发超时异常。
与此相关的生产者配置参数是 max.block.ms
(默认 60000)。
据我所知,除了减少此超时或使用 AdminClient(这是您不想做的事情)之外,没有其他方法可以更早地获得反馈。
如果发布消息有任何问题,Kafka 将抛出 MetadataNotUpdated
异常(它会阻塞 send
方法)。超时可使用以下配置:max.block.ms
。但是,请确保未禁用自动主题创建。