Make an Apache Storm topology use the latest offset from Kafka
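I have a KafkaSpout, 2 bolts that process the data, and 2 bolts that store the processed data in MongoDB.
I'm using Storm Flux to build the topology, and the spout reads data from Kafka. Everything runs fine, but every time I run the topology it processes all the messages in Kafka from the beginning.
Once it has processed all the messages, it doesn't wait for more messages; it crashes.
How can I make the Storm topology process only the latest messages?
Here is my topology .yaml file: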
name: "kafka-topology"
components:
  # MongoDB mapper
  - id: "block-mapper"
    className: "org.apache.storm.mongodb.common.mapper.SimpleMongoMapper"
    configMethods:
      - name: "withFields"
        args: # The following are the tuple fields to map to a MongoDB document
          - ["block"]
  # MongoDB mapper
  - id: "transaction-mapper"
    className: "org.apache.storm.mongodb.common.mapper.SimpleMongoMapper"
    configMethods:
      - name: "withFields"
        args: # The following are the tuple fields to map to a MongoDB document
          - ["transaction"]
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"
  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "172.25.33.191:2181"
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "blockdata"
      # zkRoot
      - ""
      # id
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      - name: "ignoreZkOffsets"
        value: false
config:
  topology.workers: 1
  # ...
# spout definitions
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    constructorArgs:
      - ref: "spoutConfig"
    parallelism: 1
# bolt definitions
bolts:
  - id: "blockprocessing-bolt"
    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
    constructorArgs:
      # command line
      - ["python", "process-bolt.py"]
      # output fields
      - ["block"]
    parallelism: 1
    # ...
  - id: "transprocessing-bolt"
    className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
    constructorArgs:
      # command line
      - ["python", "trans-bolt.py"]
      # output fields
      - ["transaction"]
    parallelism: 1
    # ...
  - id: "mongoBlock-bolt"
    className: "org.apache.storm.mongodb.bolt.MongoInsertBolt"
    constructorArgs:
      - "mongodb://172.25.33.205:27017/testdb"
      - "block"
      - ref: "block-mapper"
    parallelism: 1
    # ...
  - id: "mongoTrans-bolt"
    className: "org.apache.storm.mongodb.bolt.MongoInsertBolt"
    constructorArgs:
      - "mongodb://172.25.33.205:27017/testdb"
      - "transaction"
      - ref: "transaction-mapper"
    parallelism: 1
    # ...
  - id: "log"
    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
    parallelism: 1
    # ...
# stream definitions
# stream definitions define connections between spouts and bolts.
# note that such connections can be cyclical
# custom stream groupings are also supported
streams:
  - name: "kafka --> block-Processing" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "blockprocessing-bolt"
    grouping:
      type: SHUFFLE
  - name: "kafka --> transaction-processing" # name isn't used (placeholder for logging, UI, etc.)
    from: "kafka-spout"
    to: "transprocessing-bolt"
    grouping:
      type: SHUFFLE
  - name: "block --> mongo"
    from: "blockprocessing-bolt"
    to: "mongoBlock-bolt"
    grouping:
      type: SHUFFLE
  - name: "transaction --> mongo"
    from: "transprocessing-bolt"
    to: "mongoTrans-bolt"
    grouping:
      type: SHUFFLE
I tried adding properties to spoutConfig so that it only fetches the latest messages, like this:
- id: "spoutConfig"
  className: "org.apache.storm.kafka.SpoutConfig"
  constructorArgs:
    - ref: "zkHosts"
    - "blockdata"
    - ""
    - "myId"
  properties:
    - name: "scheme"
      ref: "stringMultiScheme"
    - name: "startOffsetTime"
      ref: "EarliestTime"
    - name: "forceFromStart"
      value: false
But no matter what I put in the ref for startOffsetTime, I get this error:
Exception in thread "main" java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value
You need to set startOffsetTime to kafka.api.OffsetRequest.LatestTime(). As you can see at https://github.com/apache/storm/tree/64af629a19a82591dbf3428f7fd6b02f39e0723f/external/storm-kafka#kafkaconfig, the default is to start from the earliest available offset.
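A minimal sketch of what the spoutConfig component might look like with that change. In Flux, ref points at another component by its id, so a static method such as OffsetRequest.LatestTime() can't be referenced that way. This sketch assumes you can pass the literal value instead: in Kafka's client, LatestTime() is -1L and EarliestTime() is -2L, and it assumes Flux assigns the integer literal to the long field. It also uses ignoreZkOffsets in place of forceFromStart, which I believe older storm-kafka releases used for the same purpose; check both against your Storm version.

```yaml
components:
  # ... zkHosts, stringScheme and stringMultiScheme as in the question ...
  - id: "spoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      # brokerHosts
      - ref: "zkHosts"
      # topic
      - "blockdata"
      # zkRoot
      - ""
      # id
      - "myId"
    properties:
      - name: "scheme"
        ref: "stringMultiScheme"
      # Assumption: literal value of kafka.api.OffsetRequest.LatestTime() (-1L).
      # Use -2 for EarliestTime() instead.
      - name: "startOffsetTime"
        value: -1
      # false: resume from offsets already committed in ZooKeeper, and use
      # startOffsetTime only when no committed offset exists yet.
      - name: "ignoreZkOffsets"
        value: false
```

With ignoreZkOffsets set to false, a restart should resume from the offsets the spout last committed in ZooKeeper (under zkRoot + "/" + id), and startOffsetTime only applies when nothing has been stored yet. If you want every deployment to jump straight to the newest messages regardless of stored progress, set ignoreZkOffsets to true.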
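The exception you're getting seems unrelated. It looks like a Curator/ZooKeeper incompatibility.
Edit: I think you're running into this issue: https://issues.apache.org/jira/browse/STORM-2978. Storm 1.2.2 should be out soon; upgrade once it's released.
Edit 2: If you want to work around it without upgrading, edit your topology's pom so that it depends on ZooKeeper 3.4 instead of 3.5.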