fetch-min-size & max-poll-records Spring Kafka configuration does not work as expected

I am developing a Spring Boot application with Spring Kafka. It listens to a single Kafka topic, separates the records by category, creates a JSON file as output, and uploads it to AWS S3.

I receive a large volume of data on the Kafka topic, and I need to make sure the JSON files are chunked appropriately to limit the number of JSON uploads to S3.

Below is the Kafka consumer configuration in my application.yml:

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 
        seconds: 1 
      fetch-min-size: 500000000
      max-poll-records: 50000000
      value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer

I have created a listener that reads the topic continuously.

Even with the above configuration, the records I receive in the console look like this:

   2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 56. No Of measures: 60
   2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 80. No Of measures: 96
   2019-03-27T15:25:56.56+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.560  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 76. No Of measures: 39
   2019-03-27T15:25:56.73+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.732  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 77. No Of measures: 66

Can anyone tell me what I need to configure in application.yml so that records are fetched according to these settings?

I just copied your configuration (except for the max wait - see the syntax I used) and it works fine...

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 1s
      fetch-min-size: 500000000
      max-poll-records: 50000000
2019-03-27 13:43:55.454  INFO 98982 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 1000
    fetch.min.bytes = 500000000
    group.id = newton
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 50000000
    ...
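For reference, the same corrected consumer configuration can also be written in .properties form (a sketch of the equivalent keys; Boot binds fetch-max-wait as a Duration, so 1s maps to fetch.max.wait.ms=1000, as the ConsumerConfig log above confirms):

```properties
# Same consumer settings as the YAML above, in .properties form.
# fetch-max-wait binds to a java.time.Duration, so "1s" -> fetch.max.wait.ms=1000.
spring.kafka.consumer.group-id=newton
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.fetch-max-wait=1s
spring.kafka.consumer.fetch-min-size=500000000
spring.kafka.consumer.max-poll-records=50000000
```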

To set arbitrary properties that are not directly supported as Boot properties, use the ...properties property.

For example:

spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 300000

or

spring:
  kafka:
    consumer:
      properties:
        max:
          poll:
            interval:
              ms: 300000

The documentation says:

The properties supported by auto configuration are shown in Appendix A, Common application properties. Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties. Refer to the Apache Kafka documentation for details.

The first few of these properties apply to all components (producers, consumers, admins, and streams) but can be specified at the component level if you wish to use different values. Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

Only a subset of the properties supported by Kafka are available directly through the KafkaProperties class. If you wish to configure the producer or consumer with additional properties that are not directly supported, use the following properties:

spring.kafka.properties.prop.one=first

spring.kafka.admin.properties.prop.two=second

spring.kafka.consumer.properties.prop.three=third

spring.kafka.producer.properties.prop.four=fourth

spring.kafka.streams.properties.prop.five=fifth
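In application.yml, those same entries from the documentation would look like this (a sketch; prop.one through prop.five are just the documentation's placeholder names):

```yaml
spring:
  kafka:
    properties:
      prop.one: first
    admin:
      properties:
        prop.two: second
    consumer:
      properties:
        prop.three: third
    producer:
      properties:
        prop.four: fourth
    streams:
      properties:
        prop.five: fifth
```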