风暴螺栓的线程安全
Thread safe of storm bolt
我有一个螺栓可以从其他螺栓收集数据。一旦bolt中有足够的数据或者时钟到了,这个bolt会根据收集到的数据做一些耗时的工作。
我的问题是,在耗时的作业中,其他bolt应该一直向这个bolt发送数据,这样会不会造成线程安全的问题? 运行个耗时作业需要加锁吗?
有个类似的post,但是没看懂
考虑到下面的字数统计螺栓,假设它只有一个实例,当一条消息到达该螺栓时,它会立即确认。由于 Map 是空的,因此它将启动一个耗时的作业。与此同时,据我所知,其他 spouts/bolts 现在仍然 运行 并继续向这个螺栓发送消息。但是,由于第一条消息尚未映射,这些新消息将一次又一次地启动耗时的工作。这不会导致线程安全问题吗?
public static class WordCount extends BaseRichBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple) {
collect.ack(tuple); // ack upon receiving the message
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null){
// doing time-consuming job here
count = 0;
}
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
}
Storm Bolts 是线程安全的 (link)。
在螺栓执行耗时操作时将数据发送到螺栓是可以的。您可以在 Storm UI 中调整螺栓的容量,如果遇到问题,只需增加并行度即可。
我有一个螺栓可以从其他螺栓收集数据。一旦bolt中有足够的数据或者时钟到了,这个bolt会根据收集到的数据做一些耗时的工作。
我的问题是,在耗时的作业中,其他bolt应该一直向这个bolt发送数据,这样会不会造成线程安全的问题? 运行个耗时作业需要加锁吗?
有个类似的post
考虑到下面的字数统计螺栓,假设它只有一个实例,当一条消息到达该螺栓时,它会立即确认。由于 Map 是空的,因此它将启动一个耗时的作业。与此同时,据我所知,其他 spouts/bolts 现在仍然 运行 并继续向这个螺栓发送消息。但是,由于第一条消息尚未映射,这些新消息将一次又一次地启动耗时的工作。这不会导致线程安全问题吗?
public static class WordCount extends BaseRichBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple) {
collect.ack(tuple); // ack upon receiving the message
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null){
// doing time-consuming job here
count = 0;
}
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
}
Storm Bolts 是线程安全的 (link)。 在螺栓执行耗时操作时将数据发送到螺栓是可以的。您可以在 Storm UI 中调整螺栓的容量,如果遇到问题,只需增加并行度即可。