如何使用 apache 风暴元组
How to use apache storm tuple
我刚开始使用 Apache Storm。我阅读了教程并查看了 examples 我的问题是所有示例都使用非常简单的元组(通常是一个带有字符串的文件)。元组是内联创建的(使用 new Values(...))。在我的例子中,我有很多字段 (5..100) 的元组。所以我的问题是如何为每个字段实现具有名称和类型(所有原语)的元组?
有例子吗? (我认为直接实施 "Tuple" 不是一个好主意)
谢谢
创建将所有字段作为值的元组的替代方法是创建一个 bean 并将其传递到元组中。
给出以下 class:
public class DataBean implements Serializable {
private static final long serialVersionUID = 1L;
// add more properties as necessary
int id;
String word;
public DataBean(int id, String word) {
setId(id);
setWord(word);
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
一次创建并发出 DataBean:
collector.emit(new Values(bean));
在目标 bolt 中获取 DataBean:
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
DataBean bean = (DataBean)tuple.getValue(0);
// do your bolt processing with the bean
} catch (Exception e) {
LOG.error("WordCountBolt error", e);
collector.reportError(e);
}
}
不要忘记在设置拓扑时使 bean 可序列化并注册:
Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());
免责声明:Bean 可以很好地进行随机分组。如果你需要做一个fieldsGrouping
,你仍然应该使用原语。例如,在字数统计场景中,您需要按字分组,这样您可能会发出:
collector.emit(new Values(word, bean));
我将按如下方式实现自定义 tuple/value 类型:不是使用成员变量来存储数据,而是将每个属性映射到继承 Values
类型的对象列表中的固定索引.这种方法避免了常规 Bean 的 "field grouping" 问题。
- 它不需要为字段分组添加额外的属性(这很不自然)
- 避免数据重复(减少传送的字节数)
- 它保留了 beans 模式的优势
字数统计示例如下所示:
public class WordCountTuple extends Values {
private final static long serialVersionUID = -4386109322233754497L;
// attribute indexes
/** The index of the word attribute. */
public final static int WRD_IDX = 0;
/** The index of the count attribute. */
public final static int CNT_IDX = 1;
// attribute names
/** The name of the word attribute. */
public final static String WRD_ATT = "word";
/** The name of the count attribute. */
public final static String CNT_ATT = "count";
// required for serialization
public WordCountTuple() {}
public WordCountTuple(String word, int count) {
super.add(WRD_IDX, word);
super.add(CNT_IDX, count);
}
public String getWord() {
return (String)super.get(WRD_IDX);
}
public void setWort(String word) {
super.set(WRD_IDX, word);
}
public int getCount() {
return (Integer)super.get(CNT_IDX);
}
public void setCount(int count) {
super.set(CNT_IDX, count);
}
public static Fields getSchema() {
return new Fields(WRD_ATT, CNT_ATT);
}
}
为避免不一致,使用 "word" 和 "count" 属性的 final static
变量。此外,方法 getSchema()
returns 实现的模式用于在 Spout/Bolt 方法 .declareOutputFields(...)
中声明输出流
对于输出元组,可以直接使用这种类型:
public MyOutBolt implements IRichBolt {
@Override
public void execute(Tuple tuple) {
// some more processing
String word = ...
int cnt = ...
collector.emit(new WordCountTuple(word, cnt));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(WordCountTuple.getSchema());
}
// other methods omitted
}
对于输入元组,我建议采用以下模式:
public MyInBolt implements IRichBolt {
// use a single instance for avoid GC trashing
private final WordCountTuple input = new WordCountTuple();
@Override
public void execute(Tuple tuple) {
this.input.clear();
this.input.addAll(tuple.getValues());
String word = input.getWord();
int count = input.getCount();
// do further processing
}
// other methods omitted
}
MyOutBolt
和MyInBolt
可以这样连接:
TopologyBuilder b = ...
b.setBolt("out", new MyOutBolt());
b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT);
使用字段分组非常简单,因为 WordCountTuple
允许单独访问每个属性。
我刚开始使用 Apache Storm。我阅读了教程并查看了 examples 我的问题是所有示例都使用非常简单的元组(通常是一个带有字符串的文件)。元组是内联创建的(使用 new Values(...))。在我的例子中,我有很多字段 (5..100) 的元组。所以我的问题是如何为每个字段实现具有名称和类型(所有原语)的元组?
有例子吗? (我认为直接实施 "Tuple" 不是一个好主意)
谢谢
创建将所有字段作为值的元组的替代方法是创建一个 bean 并将其传递到元组中。
给出以下 class:
public class DataBean implements Serializable {
private static final long serialVersionUID = 1L;
// add more properties as necessary
int id;
String word;
public DataBean(int id, String word) {
setId(id);
setWord(word);
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
一次创建并发出 DataBean:
collector.emit(new Values(bean));
在目标 bolt 中获取 DataBean:
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
DataBean bean = (DataBean)tuple.getValue(0);
// do your bolt processing with the bean
} catch (Exception e) {
LOG.error("WordCountBolt error", e);
collector.reportError(e);
}
}
不要忘记在设置拓扑时使 bean 可序列化并注册:
Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());
免责声明:Bean 可以很好地进行随机分组。如果你需要做一个fieldsGrouping
,你仍然应该使用原语。例如,在字数统计场景中,您需要按字分组,这样您可能会发出:
collector.emit(new Values(word, bean));
我将按如下方式实现自定义 tuple/value 类型:不是使用成员变量来存储数据,而是将每个属性映射到继承 Values
类型的对象列表中的固定索引.这种方法避免了常规 Bean 的 "field grouping" 问题。
- 它不需要为字段分组添加额外的属性(这很不自然)
- 避免数据重复(减少传送的字节数)
- 它保留了 beans 模式的优势
字数统计示例如下所示:
public class WordCountTuple extends Values {
private final static long serialVersionUID = -4386109322233754497L;
// attribute indexes
/** The index of the word attribute. */
public final static int WRD_IDX = 0;
/** The index of the count attribute. */
public final static int CNT_IDX = 1;
// attribute names
/** The name of the word attribute. */
public final static String WRD_ATT = "word";
/** The name of the count attribute. */
public final static String CNT_ATT = "count";
// required for serialization
public WordCountTuple() {}
public WordCountTuple(String word, int count) {
super.add(WRD_IDX, word);
super.add(CNT_IDX, count);
}
public String getWord() {
return (String)super.get(WRD_IDX);
}
public void setWort(String word) {
super.set(WRD_IDX, word);
}
public int getCount() {
return (Integer)super.get(CNT_IDX);
}
public void setCount(int count) {
super.set(CNT_IDX, count);
}
public static Fields getSchema() {
return new Fields(WRD_ATT, CNT_ATT);
}
}
为避免不一致,使用 "word" 和 "count" 属性的 final static
变量。此外,方法 getSchema()
returns 实现的模式用于在 Spout/Bolt 方法 .declareOutputFields(...)
对于输出元组,可以直接使用这种类型:
public MyOutBolt implements IRichBolt {
@Override
public void execute(Tuple tuple) {
// some more processing
String word = ...
int cnt = ...
collector.emit(new WordCountTuple(word, cnt));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(WordCountTuple.getSchema());
}
// other methods omitted
}
对于输入元组,我建议采用以下模式:
public MyInBolt implements IRichBolt {
// use a single instance for avoid GC trashing
private final WordCountTuple input = new WordCountTuple();
@Override
public void execute(Tuple tuple) {
this.input.clear();
this.input.addAll(tuple.getValues());
String word = input.getWord();
int count = input.getCount();
// do further processing
}
// other methods omitted
}
MyOutBolt
和MyInBolt
可以这样连接:
TopologyBuilder b = ...
b.setBolt("out", new MyOutBolt());
b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT);
使用字段分组非常简单,因为 WordCountTuple
允许单独访问每个属性。