具有单个分区的 Kafka Streams 在出错时暂停
Kafka Streams with single partition to pause on error
我有一个带有单个分区的 Kafka 代理。要求是执行以下操作:
- 从此分区读取
- 通过调用 REST 转换消息 API
- 将转换后的消息发布到另一个 REST API
- 将响应消息推送到另一个主题
我正在使用 Kafka Streams 使用以下代码实现此目的
StreamsBuilder builder = new StreamsBuilder();`
KStream<Object, Object> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic(), Produced.with(lStringKeySerde, lAvroValueSerde));
return builder.build();
以下是我的配置:
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
if (schemaRegistry != null && schemaRegistry.length > 0) {
streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));
}
streamsConfig.put(this.keySerializerKeyName, keyStringSerializerClassName);
streamsConfig.put(this.valueSerialzerKeyName, valueAVROSerializerClassName);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
我一直在寻找一种机制来在我的 KeyValueMapper 中执行以下操作:
- 如果任何 REST API 挂了,我就会捕获异常
- 我希望相同的偏移量一直循环直到系统恢复或暂停消耗直到系统恢复
我检查了以下 link,但它们似乎没有帮助。
How to run kafka streams effectively with single app instance and single topic partitions?
下面 link 讨论了 KafkaTransactionManager 但那是行不通的我猜 KStream 上面的初始化方式
在此方向上的任何帮助/指示将不胜感激。
你想做的事情不被真正支持。在 Kafka Streams 中暂停消费者是不可能的。
如果您使用 KeyValueMapper
循环,您只能 "halt" 处理,但是,对于这种情况,消费者可能会退出消费者组。对于您的情况,使用单个输入主题分区并且无论如何只能在单个 KafkaStreams
实例中有一个线程,因此,它不会影响该组的任何其他成员(因为有 none) .但是,问题是在线程退出组后提交偏移量将失败。因此,在线程重新加入组后,它将获取一个较旧的偏移量并重新处理一些数据(即,您得到重复的数据处理)。为避免退出消费者组,您可以将 max.poll.interval.ms
配置设置为较高的值(甚至可能 Integer.MAX_VALUE
)——假设您在消费者组中只有一个成员,请设置较高的值应该没问题。
另一种选择可能是将 transform()
与状态存储一起使用。如果您无法进行 REST 调用,则将数据放入存储中并稍后重试。这样消费者就不会退出群组。但是,读取新数据永远不会停止,您需要缓冲存储中的所有数据,直到可以再次调用 REST API。您应该能够在 Transformer
中通过 "sleeping" 减慢读取新数据的速度(以减少需要缓冲的数据量)——您只需要确保不违反 max.poll.interval.ms
配置(默认为 30 秒)。
我有一个带有单个分区的 Kafka 代理。要求是执行以下操作:
- 从此分区读取
- 通过调用 REST 转换消息 API
- 将转换后的消息发布到另一个 REST API
- 将响应消息推送到另一个主题
我正在使用 Kafka Streams 使用以下代码实现此目的
StreamsBuilder builder = new StreamsBuilder();`
KStream<Object, Object> consumerStream = builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic(), Produced.with(lStringKeySerde, lAvroValueSerde));
return builder.build();
以下是我的配置:
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", bootstrapServers));
if (schemaRegistry != null && schemaRegistry.length > 0) {
streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));
}
streamsConfig.put(this.keySerializerKeyName, keyStringSerializerClassName);
streamsConfig.put(this.valueSerialzerKeyName, valueAVROSerializerClassName);
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
我一直在寻找一种机制来在我的 KeyValueMapper 中执行以下操作:
- 如果任何 REST API 挂了,我就会捕获异常
- 我希望相同的偏移量一直循环直到系统恢复或暂停消耗直到系统恢复
我检查了以下 link,但它们似乎没有帮助。
How to run kafka streams effectively with single app instance and single topic partitions?
下面 link 讨论了 KafkaTransactionManager 但那是行不通的我猜 KStream 上面的初始化方式
在此方向上的任何帮助/指示将不胜感激。
你想做的事情不被真正支持。在 Kafka Streams 中暂停消费者是不可能的。
如果您使用 KeyValueMapper
循环,您只能 "halt" 处理,但是,对于这种情况,消费者可能会退出消费者组。对于您的情况,使用单个输入主题分区并且无论如何只能在单个 KafkaStreams
实例中有一个线程,因此,它不会影响该组的任何其他成员(因为有 none) .但是,问题是在线程退出组后提交偏移量将失败。因此,在线程重新加入组后,它将获取一个较旧的偏移量并重新处理一些数据(即,您得到重复的数据处理)。为避免退出消费者组,您可以将 max.poll.interval.ms
配置设置为较高的值(甚至可能 Integer.MAX_VALUE
)——假设您在消费者组中只有一个成员,请设置较高的值应该没问题。
另一种选择可能是将 transform()
与状态存储一起使用。如果您无法进行 REST 调用,则将数据放入存储中并稍后重试。这样消费者就不会退出群组。但是,读取新数据永远不会停止,您需要缓冲存储中的所有数据,直到可以再次调用 REST API。您应该能够在 Transformer
中通过 "sleeping" 减慢读取新数据的速度(以减少需要缓冲的数据量)——您只需要确保不违反 max.poll.interval.ms
配置(默认为 30 秒)。