KafkaSpout(空闲)产生巨大的网络流量

KafkaSpout (idle) generates a huge network traffic

在使用 KafkaSpout 和几个 Bolt 开发并执行我的 Storm (1.0.1) 拓扑后,我注意到即使在拓扑空闲时也有巨大的网络流量(Kafka 上没有消息,bolt 中没有处理) ).因此,我开始逐条注释我的拓扑结构以找出原因,现在我的主节点中只有 KafkaSpout:

....
final SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts(zkHosts, "/brokers"), 
                "files-topic", // topic
                "/kafka", // ZK chroot 
                "consumer-group-name");
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
     spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
     topologyBuilder.setSpout(
                        "kafka-spout-id, 
                        new KafkaSpout(config),
                        1); 
....

当这个(无用的)拓扑执行时,即使在本地模式下,即使是第一次,网络流量总是增长很多:我看到(在我的 Activity 监视器中)

(重要提示:Kafka 不是 运行 集群,是在同一台机器上运行的单个实例,具有单个主题和单个分区。我刚刚在我的机器上下载了 Kafka,启动它并创建了一个简单的主题。当我在主题中放置消息时,拓扑中的所有内容都可以正常工作)

很明显,原因在KafkaSpout.nextTuple()方法(下),但我不明白为什么,在Kafka中没有任何消息,我应该有这样的流量。有什么我没有考虑到的吗?这是预期的行为吗?我看了 Kafka 日志,ZK 日志,什么都没有,我已经清理了 Kafka 和 ZK 数据,什么都没有,仍然是相同的行为。

@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {

        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }

    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;

    /*
         As far as the System.currentTimeMillis() is dependent on System clock,
         additional check on negative value of diffWithNow in case of external changes.
     */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}

你的bolt收到消息了吗?你的螺栓继承了 BaseRichBolt 吗?

在 Kafaspout 中注释掉那行 m.fail(id.offset) 并检查它。如果您的 bolt 没有确认,那么您的 spout 会认为消息失败并尝试重播相同的消息。

public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            //m.fail(id.offset);
        }

也尝试将 nextTuple() 暂停几毫秒并检查。

如果有帮助请告诉我

Put a sleep for one second (1000ms) in the nextTuple() method and observe the traffic now, For example,

@Override
public void nextTuple() {
   try {
       Thread.sleep(1000);
   } catch(Exception ex){
        log.error("Ëxception while sleeping...",e);
   }
   List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
   for (int i = 0; i < managers.size(); i++) {
     ...
     ...
     ...
     ...
}

原因是,kafka 消费者在 pull methodology which means, consumers will pull data from kafka brokers. So in consumer point of view (Kafka Spout) will do a fetch request to the kafka broker continuously which is a TCP network request. So you are facing a huge statistics on the data packet sent/received. Though the consumer doesn't consumes any message, pull request and empty response also will get account into network data packet sent/received statistics. Your network traffic will be less if your sleeping time is high. There are also some network related configurations 的基础上为经纪人和消费者工作。对配置进行研究可能会对您有所帮助。希望对你有帮助。