spring cloud stream kafka - fetches bulks and misses heartbeats
I am looking at a Spring Boot service that reads messages from Apache Kafka, requests the records indicated by each message from another service via HTTP, processes them, saves some data to a database, and publishes the result to another topic.
This is done via

@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
This is done in several services and usually works just fine. The only property set is
spring.cloud.stream.binder.consumer.concurrency=20
The topic itself has 20 partitions, so that should fit.
When monitoring the reads from Kafka, we found that the throughput is very low and the behavior is odd:
The app reads up to 500 messages at once, then produces nothing for 1-2 minutes. During that time the consumer repeatedly logs that it is "missing heartbeats, because the partition was rebalanced", that it is "reassigning partitions", and occasionally even throws an exception saying it "failed to commit, because the polling interval has elapsed".
We concluded that this means the consumer fetches 500 messages, takes ages to process them all, misses its time window, and therefore cannot commit any of the 500 messages to the broker - which then reassigns the partitions and resends the same messages.
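The suspected timing constraint can be stated as a quick back-of-the-envelope check. This is only a sketch with illustrative numbers (the per-record time is an assumption, not a measurement); the Kafka defaults shown are the real ones:

```java
// Sketch: a consumer must finish a whole polled batch within max.poll.interval.ms,
// otherwise the broker evicts it from the group and rebalances the partitions.
public class PollBudget {

    public static void main(String[] args) {
        int maxPollRecords = 500;         // Kafka default for max.poll.records
        long maxPollIntervalMs = 300_000; // Kafka default for max.poll.interval.ms (5 min)
        long perRecordMs = 800;           // assumed: HTTP call + DB write per record

        long batchMs = maxPollRecords * perRecordMs; // time to work through one full batch
        System.out.println("batch takes " + batchMs + " ms, budget is " + maxPollIntervalMs + " ms");
        System.out.println("exceeds budget: " + (batchMs > maxPollIntervalMs));
    }
}
```

With these numbers a full batch takes 400,000 ms against a 300,000 ms budget, which matches the observed "failed to commit, because the polling interval has elapsed" loop.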
After browsing threads and documentation, I found the "max.poll.records" property, but the advice on where to set it is conflicting.
Some say to set it under

spring.cloud.stream.bindings.consumer.<input>.configuration
Others say
spring.cloud.stream.kafka.binders.consumer-properties
I tried setting both to 1, but the service's behavior did not change.
How do I correctly handle the case where the consumer cannot keep up with the required polling interval under the default settings?
Generic yaml:
spring.cloud.stream.default.group=${spring.application.name}
Service yaml:
spring:
  clould:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
        someOutput:
          destination: outTopic
        someInput:
          destination: inTopic
          consumer:
            concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this
          someInput:
            configuarion:
              max.poll.records: 20 # ConsumerConfig ignores this
            consumer:
              enableDlq: true
              configuarion:
                max.poll.records: 30 # ConsumerConfig ignores this
        binder:
          consumer-properties:
            max.poll.records: 10 # this gets used first
          configuration:
            max.poll.records: 40 # this get used when the first one is not present
"Ignores this" always means that, if no other property is set, the ConsumerConfiguration keeps its max.poll.records at the default of 500.
Edit: we got a bit closer:
The problem was related to Spring Retry being set up with an exponentialBackoffStrategy - and a series of errors that effectively halted the application.
What I don't get is: we forced 200 errors by publishing malformed messages to the topic in question, which made the app read 200 messages, take ages (with the old retry configuration), and then commit all 200 at once.
What sense does that make, if we have

max.poll.records: 1
concurrency: 1
ackEachRecord = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
It is

spring.cloud.stream.kafka.bindings.<input>.consumer.configuration.max.poll.records
See the documentation...
Kafka Consumer Properties
The following properties are available for Kafka consumers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer.
...
configuration
Map with a key/value pair containing generic Kafka consumer properties.
Default: Empty map.
...
You can also increase max.poll.interval.ms.
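Applied to the question's YAML layout, the binding-level setting would look roughly like this (a sketch; someInput is the binding name from the question, and the values are only illustrative):

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          someInput:
            consumer:
              configuration:
                max.poll.records: 20          # smaller batches per poll
                max.poll.interval.ms: 600000  # more time to process each batch
```

Note that the `configuration` map takes generic Kafka consumer properties, so both keys go under the same prefix.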
EDIT
I just tested it with 2.1.0.RELEASE - it works as I described:
No settings
2019-03-01 08:47:59.560 INFO 44698 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 500
...
Boot default
spring.kafka.consumer.properties.max.poll.records=42
2019-03-01 08:49:49.197 INFO 45044 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 42
...
Binder default #1
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43
2019-03-01 08:52:11.469 INFO 45842 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 43
...
Binder default #2
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
2019-03-01 08:54:06.211 INFO 46252 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 43
...
Binding default
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
2019-03-01 09:02:26.004 INFO 47833 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 44
...
Specific binding
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
2019-03-01 09:05:01.452 INFO 48330 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 45
...
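Taken together, the runs above establish a precedence order for max.poll.records, from lowest to highest:

```properties
# lowest -> highest precedence (values taken from the tests above)
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
# with all four set, the consumer ends up with max.poll.records = 45
```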
EDIT2
Here is the complete test application. I just created a new app at http://start.spring.io and selected 'Kafka' and 'Cloud Stream'.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {

    public static void main(String[] args) {
        SpringApplication.run(So54932453Application.class, args).close();
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
    }

}
and
spring.cloud.stream.bindings.input.group=so54932453
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
and
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.gprussell</groupId>
    <artifactId>so54932453</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so54932453</name>
    <description>Demo</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
</project>