第一次如何使用 apache kafka 集成部署 storm-core 拓扑?
How to deploy storm-core topology with apache kafka integration for first time?
我想获得关于 apache storm 和 kafka 初始设置的帮助。
我能够将拓扑提交到风暴集群,但在 风暴中出现以下错误 ui。
Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)
我的代码片段如下。
// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot = "/kafka-cluster-1/brokers/topics";
String topicName = "myfirsttopic";
/* ****************************************************************** */
/* Topology configuration variable */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt */
Integer boltParalismHint = 1;
Integer spoutParalismHint = 1;
/* ****************************************************************** */
/* Build kafka consumer spout */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );
// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );
spoutConfig.ignoreZkOffsets = true;
// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
我引用了 document 并将 ignoreZkOffsets 设置为 true。
If you want to force the spout to ignore any consumer state
information stored in ZooKeeper, then you should set the parameter
KafkaConfig.ignoreZkOffsets to true
但是,从日志来看,kafka spout 似乎正在从 Zookeeper 读取偏移量。
由于它是初始设置,我如何才能停止风暴从 Zookeeper 读取偏移量?
我使用以下版本。
- apache 风暴 1.2.1
- apache kafka kafka_2.12-1.1.0
我没有做任何特别的事情,但在下面的情况下,错误似乎没有出现在 storm ui .
- 在Kafka
中创建主题
- 确保 brokerZkPath 存在于 Zookeeper 中(brokers 目录的路径。在我的情况 /kafka-cluster-1/brokers)
- 确保 zkRootPath 存在于 Zookeeper 中(topics 目录的路径。在我的案例 /kafka-cluster-1/brokers/topics)
- 提交拓扑给storm
我想获得关于 apache storm 和 kafka 初始设置的帮助。
我能够将拓扑提交到风暴集群,但在 风暴中出现以下错误 ui。
Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)
我的代码片段如下。
// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot = "/kafka-cluster-1/brokers/topics";
String topicName = "myfirsttopic";
/* ****************************************************************** */
/* Topology configuration variable */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt */
Integer boltParalismHint = 1;
Integer spoutParalismHint = 1;
/* ****************************************************************** */
/* Build kafka consumer spout */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );
// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );
spoutConfig.ignoreZkOffsets = true;
// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
我引用了 document 并将 ignoreZkOffsets 设置为 true。
If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true
但是,从日志来看,kafka spout 似乎正在从 Zookeeper 读取偏移量。
由于它是初始设置,我如何才能停止风暴从 Zookeeper 读取偏移量?
我使用以下版本。
- apache 风暴 1.2.1
- apache kafka kafka_2.12-1.1.0
我没有做任何特别的事情,但在下面的情况下,错误似乎没有出现在 storm ui .
- 在Kafka 中创建主题
- 确保 brokerZkPath 存在于 Zookeeper 中(brokers 目录的路径。在我的情况 /kafka-cluster-1/brokers)
- 确保 zkRootPath 存在于 Zookeeper 中(topics 目录的路径。在我的案例 /kafka-cluster-1/brokers/topics)
- 提交拓扑给storm