Kafka Streams 会忽略无前缀的 ProducerConfig 属性吗?

Does Kafka Streams ignore unprefixed ProducerConfig properties?

我的 Kafka Streams 配置如下所示:

// Kafka Streams level settings.
Properties c = new Properties();
c.put(StreamsConfig.APPLICATION_ID_CONFIG, config.kafka.applicationId);
c.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafka.bootstrapServers);
...
// Producer settings added under their plain ProducerConfig names,
// i.e. without wrapping the key in StreamsConfig.producerPrefix(...).
c.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.kafka.maxRequestSize);
c.put(ProducerConfig.ACKS_CONFIG, "all");

我偶然发现了这个页面,它让我想起了 StreamsConfig.producerPrefix 方法。我没有将此方法应用于我的配置。这是否意味着上面示例中的 ProducerConfig 设置被忽略了?

快速浏览了一下源代码后,我觉得我的假设是正确的。有人可以确认吗?

谢谢。

幸运的是,我只是虚惊一场:两种类型的属性都会被接受。以下两行具有相同的效果:

// Plain (unprefixed) producer property name.
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// Same producer property, with the key wrapped by StreamsConfig.producerPrefix(...).
properties.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

Kafka Streams 的 getClientPropsWithPrefix 方法会把直接提供的生产者属性(名称属于 ProducerConfig.configNames())与带前缀的属性(PRODUCER_PREFIX)合并在一起。


作为记录,我编写了以下测试类,并在 KafkaProducer#doSend 方法中添加了一个调试断点,然后一路追溯到 StreamsConfig#getProducerConfigs:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration test that runs a pass-through topology (input topic to output topic) on an
 * embedded Kafka cluster while producer settings ({@code acks}, {@code max.request.size})
 * are placed in the Streams configuration under their plain {@link ProducerConfig} names,
 * i.e. without {@code StreamsConfig.producerPrefix(...)}.
 *
 * <p>The assertions only verify that a record is forwarded end to end. Whether the
 * unprefixed settings actually reach the internal producer is not asserted here; it is
 * meant to be inspected with a debugger.
 */
public class ProducerPropertiesPrefixIntegrationTest {

  @ClassRule
  public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

  private static final String INPUT_TOPIC = "input";
  private static final String OUTPUT_TOPIC = "output";

  /** Maximum time, in milliseconds, to wait for the forwarded record on the output topic. */
  private static final long RECEIVE_TIMEOUT_MS = 10000;

  /**
   * Produces one record to the input topic and expects the same key/value pair to show up
   * on the output topic.
   *
   * @throws Exception if topic creation, producing, or waiting for records fails
   */
  @Test
  public void testPrefix() throws Exception {
    CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
    // Use the constant: the previous literal was misspelled, so the topic that the
    // topology writes to (and the test reads from) was never explicitly created.
    CLUSTER.createTopic(OUTPUT_TOPIC, 3, 1);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), getStreamsConfig());
    try {
      kafkaStreams.start();

      IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC,
          Collections.singleton(KeyValue.pair("test", "value")),
          getProducerConfig(), CLUSTER.time);

      List<ConsumerRecord<String, String>> consumerRecords =
          IntegrationTestUtils.waitUntilMinRecordsReceived(
              getConsumerConfig(), OUTPUT_TOPIC, 1, RECEIVE_TIMEOUT_MS);
      assertThat(consumerRecords).hasSize(1);
      assertThat(consumerRecords.get(0).key()).isEqualTo("test");
      assertThat(consumerRecords.get(0).value()).isEqualTo("value");
    } finally {
      // Always shut the streams instance down, even when an assertion or wait fails.
      kafkaStreams.close(Duration.ofSeconds(5));
    }
  }

  /**
   * Builds the Kafka Streams configuration, including the unprefixed producer settings
   * that are the subject of this test.
   *
   * @return configuration passed to the {@link KafkaStreams} constructor
   */
  private Properties getStreamsConfig() {
    Properties p = new Properties();
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());

    // Deliberately NOT wrapped in StreamsConfig.producerPrefix(...).
    p.put(ProducerConfig.ACKS_CONFIG, "1");
    p.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2048");
    return p;
  }

  /**
   * Builds the configuration of the plain producer that feeds the input topic.
   *
   * @return producer properties with String key/value serializers
   */
  private Properties getProducerConfig() {
    Properties p = new Properties();
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Return the local properties object; the previously returned name was undefined.
    return p;
  }

  /**
   * Builds the configuration of the consumer that verifies the output topic.
   *
   * @return consumer properties with String key/value deserializers
   */
  private Properties getConsumerConfig() {
    Properties c = new Properties();
    c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    c.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    // The group has no committed offsets; read from the beginning so a record that was
    // forwarded before this consumer got its assignment is not missed.
    c.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return c;
  }
}