如果 Kafka 和 Zookeeper 不是 运行,为什么 Maven Clean Install 不会完成?

Why won't Maven Clean Install complete if Kafka and Zookeeper are not running?

为了学习 Apache Kafka,我开发了一个 Spring Boot 应用程序,如果我将 POST 请求发送到调用 KafkaTemplate 发送方法的控制器。我是 运行ning Ubuntu 19.04,并在本地成功设置和安装了 KafkaZookeeper。一切正常。

问题发生在我关闭 ZookeeperKafka 时。如果我这样做,那么在启动时,我的应用程序的 Kafka AdminClient 会定期尝试查找兄弟,但会将此消息发送到控制台

Connection to node -1 could not be established. Broker may not be available.

我实施了此处建议的修复 and here 。但是,如果我 运行 一个 maven clean install 那么如果 ZookeeperKafka 不是 运行,则构建永远不会完成。这是为什么?有没有一种方法可以配置应用程序,使其在启动时检查 Kafka 可用性并在服务不可用时优雅地处理?

这是调用 KafkaTemplate

的服务 class
@Autowired
public PingMessageServiceImpl(KafkaTemplate kafkaTemplate, KafkaTopicConfiguration kafkaTopicConfiguration) {
    this.kafkaTemplate = kafkaTemplate;
    this.kafkaTopicConfiguration = kafkaTopicConfiguration;
}

@Override
public void sendMessage(String message) {
    log.info(String.format("Received following ping message %s", message));

    if (!isValidPingRequest(message)) {
        log.warn("Received invalid ping request");
        throw new InvalidPingRequestException();
    }
    log.info(String.format("Sending message=[%s]", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(kafkaTopicConfiguration.getPingTopic(), message);
    future.addCallback(buildListenableFutureCallback(message));
}

private boolean isValidPingRequest(String message) {
    return "ping".equalsIgnoreCase(message);
}

private ListenableFutureCallback<SendResult<String, String>> buildListenableFutureCallback(String message) {
    return new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info(String.format("Sent message=[%s] with offset=[%d]", message, result.getRecordMetadata().offset()));
        }
        @Override
        public void onFailure(Throwable ex) {
            log.info(String.format("Unable to send message=[%s] due to %s", message, ex.getMessage()));
        }
    };
}

这是我用来从属性文件中提取 Kafka 配置属性的配置 class

@NotNull(message = "bootstrapAddress cannot be null")
@NotBlank(message = "bootstrapAddress cannot be blank")
private String bootstrapAddress;

@NotNull(message = "pingTopic cannot be null")
@NotBlank(message = "pingTopic cannot be blank")
private String pingTopic;

@NotNull(message = "reconnectBackoffMs cannot be null")
@NotBlank(message = "reconnectBackoffMs cannot be blank")
@Value("${kafka.reconnect.backoff.ms}")
private String reconnectBackoffMs;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configurations = new HashMap<>();
    configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configurations.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
    return new KafkaAdmin(configurations);
}

@Bean
public NewTopic pingTopic() {
    return new NewTopic(pingTopic, 1, (short) 1);
}

@PostConstruct
private void displayOnStartup() {
    log.info(String.format("bootstrapAddress is %s", bootstrapAddress));
    log.info(String.format("reconnectBackoffMs is %s", reconnectBackoffMs));
}

如果在加载 ApplicationContext spring kafka bean 时进行任何 Spring-boot 集成测试,例如 KafakTemplateKafkaAdmin 将尝试连接 kafka 服务器具有 ymlproperties 文件

中指定的属性

因此,为了避免这种情况,您可以使用 ,这样 kafka bean 将在测试执行期间连接到嵌入式服务器。

或者简单地说,您可以在集成测试用例中使用 @MockBean 注释模拟 KafakTemplateKafkaAdmin bean