Kafka Streams 会忽略无前缀的 ProducerConfig 属性吗?
Does Kafka Streams ignore unprefixed ProducerConfig properties?
我的 Kafka Streams 配置如下所示:
Properties c = new Properties();
c.put(StreamsConfig.APPLICATION_ID_CONFIG, config.kafka.applicationId);
c.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafka.bootstrapServers);
...
c.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.kafka.maxRequestSize);
c.put(ProducerConfig.ACKS_CONFIG, "all");
我偶然发现了 this page，并想起了 StreamsConfig.producerPrefix
方法。我没有将此方法应用于我的配置。这是否意味着上面示例中的 ProducerConfig 设置被忽略了?
A quick look at the code 让我觉得我的假设是正确的。有人可以确认吗?
谢谢。
幸运的是我无缘无故地惊慌失措:两种类型的属性都被接受了。以下两行具有相同的效果:
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
getClientPropsWithPrefix
method of Kafka Streams 结合了提供的生产者属性 (ProducerConfig.configNames()
) 和前缀属性 (PRODUCER_PREFIX
)。
作为记录,我编写了以下测试 class 并在 KafkaProducer#doSend
方法中添加了一个调试断点并追溯到 StreamsConfig#getProducerConfigs
:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.ClassRule;
import org.junit.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Integration test verifying that producer properties supplied WITHOUT the
 * {@code StreamsConfig.producerPrefix} prefix (e.g. plain {@code ProducerConfig.ACKS_CONFIG})
 * are still picked up by Kafka Streams and forwarded to the internal producer.
 *
 * <p>Flow: a record is produced to {@code input}, piped through a trivial topology
 * to {@code output}, and consumed back; the debug breakpoint in
 * {@code KafkaProducer#doSend} confirms the unprefixed settings reached the producer.
 */
public class ProducerPropertiesPrefixIntegrationTest {

    // Single-broker embedded cluster shared by all tests in this class.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    private static final String INPUT_TOPIC = "input";
    private static final String OUTPUT_TOPIC = "output";

    @Test
    public void testPrefix() throws Exception {
        CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
        // Fixed: was createTopic("ouput", ...) — a typo that left OUTPUT_TOPIC
        // implicitly auto-created instead of explicitly created with 3 partitions.
        CLUSTER.createTopic(OUTPUT_TOPIC, 3, 1);

        // Trivial pass-through topology: input -> output.
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        // The properties under test: unprefixed producer configs.
        p.put(ProducerConfig.ACKS_CONFIG, "1");
        p.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2048");

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), p);
        kafkaStreams.start();
        try {
            IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC,
                    Collections.singleton(KeyValue.pair("test", "value")),
                    getProducerConfig(), CLUSTER.time);

            List<ConsumerRecord<String, String>> consumerRecords =
                    IntegrationTestUtils.waitUntilMinRecordsReceived(
                            getConsumerConfig(), OUTPUT_TOPIC, 1, 10000);

            assertThat(consumerRecords).hasSize(1);
            assertThat(consumerRecords.get(0).key()).isEqualTo("test");
            assertThat(consumerRecords.get(0).value()).isEqualTo("value");
        } finally {
            // Ensure the Streams instance is closed even if an assertion fails.
            kafkaStreams.close(Duration.ofSeconds(5));
        }
    }

    /** Producer config for the test driver that seeds the input topic. */
    private Properties getProducerConfig() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Fixed: was `return producerProps;` — an undefined variable (compile error).
        return p;
    }

    /** Consumer config for the test driver that reads back from the output topic. */
    private Properties getConsumerConfig() {
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return c;
    }
}
我的 Kafka Streams 配置如下所示:
Properties c = new Properties();
c.put(StreamsConfig.APPLICATION_ID_CONFIG, config.kafka.applicationId);
c.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafka.bootstrapServers);
...
c.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.kafka.maxRequestSize);
c.put(ProducerConfig.ACKS_CONFIG, "all");
我偶然发现了 this page，并想起了 StreamsConfig.producerPrefix
方法。我没有将此方法应用于我的配置。这是否意味着上面示例中的 ProducerConfig 设置被忽略了?
A quick look at the code 让我觉得我的假设是正确的。有人可以确认吗?
谢谢。
幸运的是我无缘无故地惊慌失措:两种类型的属性都被接受了。以下两行具有相同的效果:
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
getClientPropsWithPrefix
method of Kafka Streams 结合了提供的生产者属性 (ProducerConfig.configNames()
) 和前缀属性 (PRODUCER_PREFIX
)。
作为记录,我编写了以下测试 class 并在 KafkaProducer#doSend
方法中添加了一个调试断点并追溯到 StreamsConfig#getProducerConfigs
:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.ClassRule;
import org.junit.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Integration test verifying that producer properties supplied WITHOUT the
 * {@code StreamsConfig.producerPrefix} prefix (e.g. plain {@code ProducerConfig.ACKS_CONFIG})
 * are still picked up by Kafka Streams and forwarded to the internal producer.
 *
 * <p>Flow: a record is produced to {@code input}, piped through a trivial topology
 * to {@code output}, and consumed back; the debug breakpoint in
 * {@code KafkaProducer#doSend} confirms the unprefixed settings reached the producer.
 */
public class ProducerPropertiesPrefixIntegrationTest {

    // Single-broker embedded cluster shared by all tests in this class.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    private static final String INPUT_TOPIC = "input";
    private static final String OUTPUT_TOPIC = "output";

    @Test
    public void testPrefix() throws Exception {
        CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
        // Fixed: was createTopic("ouput", ...) — a typo that left OUTPUT_TOPIC
        // implicitly auto-created instead of explicitly created with 3 partitions.
        CLUSTER.createTopic(OUTPUT_TOPIC, 3, 1);

        // Trivial pass-through topology: input -> output.
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        // The properties under test: unprefixed producer configs.
        p.put(ProducerConfig.ACKS_CONFIG, "1");
        p.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "2048");

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), p);
        kafkaStreams.start();
        try {
            IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC,
                    Collections.singleton(KeyValue.pair("test", "value")),
                    getProducerConfig(), CLUSTER.time);

            List<ConsumerRecord<String, String>> consumerRecords =
                    IntegrationTestUtils.waitUntilMinRecordsReceived(
                            getConsumerConfig(), OUTPUT_TOPIC, 1, 10000);

            assertThat(consumerRecords).hasSize(1);
            assertThat(consumerRecords.get(0).key()).isEqualTo("test");
            assertThat(consumerRecords.get(0).value()).isEqualTo("value");
        } finally {
            // Ensure the Streams instance is closed even if an assertion fails.
            kafkaStreams.close(Duration.ofSeconds(5));
        }
    }

    /** Producer config for the test driver that seeds the input topic. */
    private Properties getProducerConfig() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Fixed: was `return producerProps;` — an undefined variable (compile error).
        return p;
    }

    /** Consumer config for the test driver that reads back from the output topic. */
    private Properties getConsumerConfig() {
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return c;
    }
}