通过 Spring-Kafka 列出 Kafka 主题
List Kafka Topics via Spring-Kafka
我们想通过 spring-kafka 列出所有 Kafka 主题,以获得类似于 kafka 命令的结果:
bin/kafka-topics.sh --list --zookeeper localhost:2181
当运行下面服务中的getTopics()方法时,我们得到org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
配置:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
服务:
@Service
public class TopicServiceKafkaImpl implements TopicService {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Override
public Set<String> getTopics() {
try (Consumer<String, String> consumer =
consumerFactory.createConsumer()) {
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map.keySet();
}
}
Kafka 已启动并且 运行 我们可以从我们的应用程序成功向主题发送消息。
您正在连接到 Zookeeper (2181) 而不是 Kafka(默认情况下为 9092)。
Java kafka 客户端不再直接与 ZK 对话。
kafka-topics --list
是一个 shell 脚本,它只是 kafka.admin.TopicCommand
class 的包装器,您可以在其中找到您正在寻找的方法
或者,您也可以使用AdminClient#listTopics
方法
您可以使用管理客户端列出这样的主题
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());
我们想通过 spring-kafka 列出所有 Kafka 主题,以获得类似于 kafka 命令的结果:
bin/kafka-topics.sh --list --zookeeper localhost:2181
当运行下面服务中的getTopics()方法时,我们得到org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
配置:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
服务:
@Service
public class TopicServiceKafkaImpl implements TopicService {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Override
public Set<String> getTopics() {
try (Consumer<String, String> consumer =
consumerFactory.createConsumer()) {
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map.keySet();
}
}
Kafka 已启动并且 运行 我们可以从我们的应用程序成功向主题发送消息。
您正在连接到 Zookeeper (2181) 而不是 Kafka(默认情况下为 9092)。
Java kafka 客户端不再直接与 ZK 对话。
kafka-topics --list
是一个 shell 脚本,它只是 kafka.admin.TopicCommand
class 的包装器,您可以在其中找到您正在寻找的方法
或者,您也可以使用AdminClient#listTopics
方法
您可以使用管理客户端列出这样的主题
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());