Storm-kafka: set startOffsetTime to kafka.api.OffsetRequest.LatestTime in an Apache Flux YAML topology
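I'm working on a topology using Apache Flux. Currently, Storm consumes messages from the beginning of the topic, but I want it to consume only the latest messages from Kafka.
I'm writing the topology in a YAML file.
This is what my spoutConfig looks like: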
```yaml
- id: "stringScheme"
  className: "org.apache.storm.kafka.StringScheme"

- id: "stringMultiScheme"
  className: "org.apache.storm.spout.SchemeAsMultiScheme"
  constructorArgs:
    - ref: "stringScheme"

- id: "zkHosts"
  className: "org.apache.storm.kafka.ZkHosts"
  constructorArgs:
    - "172.25.33.191:2181"

- id: "spoutConfig"
  className: "org.apache.storm.kafka.SpoutConfig"
  constructorArgs:
    - ref: "zkHosts"
    - "blockdata"
    - ""
    - "myId"
  properties:
    - name: "scheme"
      ref: "stringMultiScheme"
    - name: "ignoreZkOffsets"
      value: true
    - name: "startOffsetTime"
      ref: "XXXXXXXXX"
```
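Now I'm stuck. How do I set startOffsetTime to the right function so that the spout only fetches the latest messages from Kafka?
I've already tried ref: "LatestTime", but no matter what I put there, I get this error:

```
java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value
```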
I believe Flux can call static factory methods, so you can define the offset as its own component:

```yaml
- id: "startingOffsetTime"
  className: "kafka.api.OffsetRequest"
  factory: "LatestTime"
```
Then reference it in your SpoutConfig definition, e.g.:

```yaml
properties:
  - name: "startOffsetTime"
    ref: "startingOffsetTime"
```
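Putting the two fragments together with the config from the question, the components section might look roughly like the sketch below. It assumes the usual Flux layout where these definitions sit under a top-level `components:` key, and, like the answer itself, it is untested. The error in the question presumably comes from `ref:` pointing at an id that no component defines, so the property resolves to null. Here the factory component gets its own id and is declared before `spoutConfig`, on the assumption that Flux builds components in the order they appear.

```yaml
components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"

  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"

  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"

  # Built by calling the static method kafka.api.OffsetRequest.LatestTime()
  - id: "startingOffsetTime"
    className: "kafka.api.OffsetRequest"
    factory: "LatestTime"

  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts"
      - "blockdata"
      - ""
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      - name: "ignoreZkOffsets"
        value: true
      # Refers to the factory-built component above by its id
      - name: "startOffsetTime"
        ref: "startingOffsetTime"
```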
I haven't tested this, but I think it should work. The ability to call static factory methods was merged a while ago (https://issues.apache.org/jira/browse/STORM-2796), but it seems to be missing from the documentation. I've raised an issue to update the docs: https://issues.apache.org/jira/browse/STORM-3086.
If you want to see an example of this feature, take a look at https://github.com/apache/storm/blob/master/flux/flux-core/src/test/resources/configs/config-methods-test.yaml#L38
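If the factory approach is not available in your Flux version, a possible fallback is to set the number directly. This option is an assumption and is not part of the answer. In the Scala-based Kafka client that storm-kafka uses, `kafka.api.OffsetRequest.LatestTime` is the constant `-1L`, and `EarliestTime` is `-2L`. The question already sets `ignoreZkOffsets` with a plain `value:`, so a literal should take the same path. Whether Flux accepts a YAML integer for the `long` field on your version is worth checking.

```yaml
properties:
  # Hypothetical alternative: -1 is the value returned by kafka.api.OffsetRequest.LatestTime()
  - name: "startOffsetTime"
    value: -1
```

The factory form is more self-documenting. The literal avoids depending on factory support in Flux.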