在回调中发送记录时如何修复 spring-kafka 中的 'Failed to update metadata after xxx ms'
How to fix 'Failed to update metadata after xxx ms' in spring-kafka when send record in callback
spring-kafka无法在回调中发送记录
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
log.error("log error...");
}
@Override
public void onSuccess(SendResult<String, String> result) {
kafkaTemplate.send("anotherTopic", "key", "data");
}
});
当我在 onSuccess() 中调用 kafkaTemplate.send() 时,Kafka 抛出 'failed to update metadata',这不是预期的
您似乎无法在回调线程上执行生产者操作 - kafka-producer-network-thread
- 生产者代码中可能存在一些死锁 - 等待获取将使用同一线程的元数据,因此超时。
您可能需要第二个 KafkaTemaplate
(和生产者工厂,因为默认工厂总是 returns 同一个生产者)。
或者只是在不同的线程上执行第二次发送...
@SpringBootApplication
public class So54492871Application {
private static final ExecutorService exec = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
SpringApplication.run(So54492871Application.class, args);
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54492871-1", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54492871-2", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");
future.addCallback(result -> {
System.out.println(Thread.currentThread().getName() + ":" + result);
exec.execute(() -> {
ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");
future2.addCallback(result2 -> {
System.out.println(Thread.currentThread().getName() + ":" + result2);
}, ex -> {
System.out.println(ex.getMessage());
});
});
}, ex -> {
System.out.println(ex.getMessage());
});
System.in.read();
};
}
}
spring-kafka无法在回调中发送记录
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
log.error("log error...");
}
@Override
public void onSuccess(SendResult<String, String> result) {
kafkaTemplate.send("anotherTopic", "key", "data");
}
});
当我在 onSuccess() 中调用 kafkaTemplate.send() 时,Kafka 抛出 'failed to update metadata',这不是预期的
您似乎无法在回调线程上执行生产者操作 - kafka-producer-network-thread
- 生产者代码中可能存在一些死锁 - 等待获取将使用同一线程的元数据,因此超时。
您可能需要第二个 KafkaTemaplate
(和生产者工厂,因为默认工厂总是 returns 同一个生产者)。
或者只是在不同的线程上执行第二次发送...
@SpringBootApplication
public class So54492871Application {
private static final ExecutorService exec = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
SpringApplication.run(So54492871Application.class, args);
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54492871-1", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54492871-2", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");
future.addCallback(result -> {
System.out.println(Thread.currentThread().getName() + ":" + result);
exec.execute(() -> {
ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");
future2.addCallback(result2 -> {
System.out.println(Thread.currentThread().getName() + ":" + result2);
}, ex -> {
System.out.println(ex.getMessage());
});
});
}, ex -> {
System.out.println(ex.getMessage());
});
System.in.read();
};
}
}