Storm-kafka: set startOffsetTime to kafka.api.OffsetRequest.LatestTime in an Apache Flux YAML topology

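I'm working on a topology using Apache Flux. Currently, Storm fetches messages from the beginning, but I want it to fetch only the latest messages from Kafka.

I'm writing the topology in a YAML file.

This is what my spoutConfig looks like: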

  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      - name: "startOffsetTime"
        ref: "XXXXXXXXX"

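Now I'm stuck. How do I set startOffsetTime to the right value so that only the latest messages are fetched from Kafka?

I have already tried ref: "LatestTime", but no matter what I put there, I get this error: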

java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value

I believe Flux can handle calling static factory methods:

- id: "startingOffsetTime"
  className: "kafka.api.OffsetRequest"
  factory: "LatestTime"

Then use it in your SpoutConfig definition, for example:

properties:
  - name: "startOffsetTime"
    ref: "startingOffsetTime"

I haven't tested this, but I think it should work. The ability to call static factory methods was merged a while ago (https://issues.apache.org/jira/browse/STORM-2796), but it seems to be missing from the documentation. I've raised an issue to update the docs: https://issues.apache.org/jira/browse/STORM-3086

If you want to see an example of this feature, take a look at https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38
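
If the factory approach gives you trouble, a possible alternative is to skip the reference entirely. A ref: looks up another component by its id, which is probably why ref: "LatestTime" ended up as a null value in the error from the question. Since startOffsetTime is a plain long field, you may be able to pass the number directly with value: instead. As far as I know, kafka.api.OffsetRequest.LatestTime() returns -1 and EarliestTime() returns -2 in the old Kafka API that storm-kafka uses. This is an untested sketch that reuses the spoutConfig from the question and assumes Flux accepts a YAML integer for a long field:

```yaml
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"

      - name: "ignoreZkOffsets"
        value: true

      # Assumption: -1 is the value of kafka.api.OffsetRequest.LatestTime()
      # (-2 would correspond to EarliestTime()).
      - name: "startOffsetTime"
        value: -1
```

The literal is less self-documenting than the factory call, so the comment is worth keeping if you go this way.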