KafkaSpout (idle) generates huge network traffic
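After developing and running my Storm (1.0.1) topology with a KafkaSpout and a couple of Bolts, I noticed huge network traffic even when the topology is idle (no messages on Kafka, no processing done in the bolts). So I started commenting out my topology piece by piece to find the cause, and now I have only the KafkaSpout in my main: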
....
final SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts(zkHosts, "/brokers"),
        "files-topic",          // topic
        "/kafka",               // ZK chroot
        "consumer-group-name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
topologyBuilder.setSpout(
        "kafka-spout-id",
        new KafkaSpout(spoutConfig),
        1);
....
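When this (useless) topology runs, even in local mode, even the very first time, the network traffic always grows a lot: I see (in my Activity Monitor)
- an average of 432 KB of data received/sec
- after a couple of hours with the topology running (idle), 1.26 GB of data received and 1 GB of data sent
(Important: Kafka is not running as a cluster. It is a single instance running on the same machine, with a single topic and a single partition. I just downloaded Kafka on my machine, started it and created a simple topic. When I put a message on the topic, everything in the topology works without any problem.)
Obviously, the cause is in the KafkaSpout.nextTuple() method (below), but I don't understand why I should have such traffic when there are no messages in Kafka. Is there something I didn't consider? Is this the expected behavior? I looked at the Kafka logs and the ZK logs and found nothing. I cleaned up the Kafka and ZK data and still see the same behavior.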
@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }
    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
    /*
     As far as the System.currentTimeMillis() is dependent on System clock,
     additional check on negative value of diffWithNow in case of external changes.
     */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}
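To make the idle case concrete, here is a minimal, self-contained sketch of the round-robin loop above. It is not the storm-kafka code: `FakePartition` is a hypothetical stand-in for `PartitionManager` that never has anything to emit, and the `EmitState` enum only mirrors the names used in the method. It assumes that each idle poll of a partition corresponds to one fetch request to the broker.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
    // Mirrors the state names used in nextTuple(); an assumption, not the real enum.
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    /** Hypothetical stand-in for PartitionManager: counts polls, never emits. */
    static class FakePartition {
        int polls = 0;

        EmitState next() {
            polls++;                      // assumed: one fetch request per idle poll
            return EmitState.NO_EMITTED;  // the topic is empty
        }
    }

    private final List<FakePartition> managers;
    private int currIndex = 0;

    RoundRobinSketch(List<FakePartition> managers) {
        this.managers = managers;
    }

    /** Same control flow as the loop in KafkaSpout.nextTuple(), minus commit(). */
    void nextTuple() {
        for (int i = 0; i < managers.size(); i++) {
            currIndex = currIndex % managers.size();
            EmitState state = managers.get(currIndex).next();
            if (state != EmitState.EMITTED_MORE_LEFT) {
                currIndex = (currIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        }
    }

    /** Total partition polls after calling nextTuple() the given number of times. */
    static int totalPolls(int partitions, int calls) {
        List<FakePartition> list = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            list.add(new FakePartition());
        }
        RoundRobinSketch spout = new RoundRobinSketch(list);
        for (int c = 0; c < calls; c++) {
            spout.nextTuple();
        }
        int total = 0;
        for (FakePartition p : list) {
            total += p.polls;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("polls: " + totalPolls(1, 1000));
    }
}
```

Under these assumptions, every call to `nextTuple()` on an empty topic polls every managed partition once, so the number of polls grows with how often Storm calls the method, not with the number of messages.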
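Is your bolt receiving messages? Does your bolt extend BaseRichBolt?
Comment out the line m.fail(id.offset) in KafkaSpout and check. If your bolt doesn't ack, your spout assumes the message failed and tries to replay the same message.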
public void fail(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        //m.fail(id.offset);
    }
}
Also try pausing nextTuple() for a few milliseconds and check.
Let me know if it helps.
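The replay effect described in this answer can be sketched with a small simulation. This is not Storm's acking machinery: `ReplaySketch` and its methods are hypothetical names, and the model simply assumes that a failed offset is put back in the queue to be emitted again.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ReplaySketch {
    private final Deque<Long> toEmit = new ArrayDeque<>();
    int emits = 0;

    ReplaySketch(long... offsets) {
        for (long offset : offsets) {
            toEmit.add(offset);
        }
    }

    /** Emits the next waiting offset, or returns -1 when nothing is waiting. */
    long nextTuple() {
        Long offset = toEmit.poll();
        if (offset == null) {
            return -1;
        }
        emits++;
        return offset;
    }

    /** An acked offset is finished and is never emitted again. */
    void ack(long offset) {
        // nothing to do in this simplified model
    }

    /** A failed offset is queued again, so it will be replayed. */
    void fail(long offset) {
        toEmit.add(offset);
    }

    /** Runs a single message through the given number of rounds and counts emits. */
    static int emitsAfterRounds(boolean boltAcks, int rounds) {
        ReplaySketch spout = new ReplaySketch(0L);
        for (int i = 0; i < rounds; i++) {
            long offset = spout.nextTuple();
            if (offset < 0) {
                break; // queue drained
            }
            if (boltAcks) {
                spout.ack(offset);
            } else {
                spout.fail(offset);
            }
        }
        return spout.emits;
    }

    public static void main(String[] args) {
        System.out.println("acked:  " + emitsAfterRounds(true, 5));
        System.out.println("failed: " + emitsAfterRounds(false, 5));
    }
}
```

In this model a message that is acked is emitted once, while a message that keeps failing is emitted on every round. Note that this only matters when there are messages to replay, which may not apply to a topic that is completely empty.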
Put a sleep of one second (1000 ms) in the nextTuple() method and observe the traffic again. For example:
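@Override
public void nextTuple() {
    try {
        // throttle polling: wait one second before each round of fetches
        Thread.sleep(1000);
    } catch (InterruptedException ex) {
        LOG.error("Exception while sleeping...", ex);
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
    ...
    ...
    ...
    ...
}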
The reason is that the Kafka consumer works on a pull model: consumers pull data from the Kafka brokers. So the consumer (the KafkaSpout) continuously sends fetch requests to the Kafka broker, and each of them is a TCP network request. That is why you see such large sent/received figures. Even though the consumer doesn't consume any messages, the fetch requests and their empty responses still count toward the sent/received network statistics.
Your network traffic will drop as you increase the sleep time. There are also some network-related configurations for the brokers and the consumer. Researching those configurations may help you. Hope this helps.
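As a rough illustration of why the sleep helps, the sketch below counts how many empty fetch round trips an idle consumer would make in a fixed time window. It is a simulation with simulated time, not a measurement: `fetchesInWindow` is a hypothetical helper, and the 1 ms round-trip cost is an assumed figure chosen only for the example.

```java
class IdlePollingSketch {
    /**
     * Counts fetch requests issued in windowMs, assuming each empty fetch
     * takes roundTripMs and the consumer then sleeps sleepMs before the next one.
     */
    static long fetchesInWindow(long windowMs, long roundTripMs, long sleepMs) {
        if (roundTripMs <= 0 || sleepMs < 0) {
            throw new IllegalArgumentException("roundTripMs must be > 0 and sleepMs >= 0");
        }
        long elapsed = 0;
        long fetches = 0;
        while (elapsed < windowMs) {
            fetches++;                        // one request plus one empty response
            elapsed += roundTripMs + sleepMs; // simulated time, no real sleeping
        }
        return fetches;
    }

    public static void main(String[] args) {
        long window = 60_000; // one minute
        System.out.println("no sleep:      " + fetchesInWindow(window, 1, 0));
        System.out.println("100 ms sleep:  " + fetchesInWindow(window, 1, 100));
        System.out.println("1000 ms sleep: " + fetchesInWindow(window, 1, 1000));
    }
}
```

The request count, and with it the bytes sent and received, falls roughly in proportion to the pause between polls. Rather than patching `nextTuple()`, it may also be worth testing Storm's `topology.sleep.spout.wait.strategy.time.ms` setting, which controls how long the spout thread sleeps after a `nextTuple()` call that emits nothing. Whether it is enough for this case is something to verify on your own setup.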