Apache Storm JoinBolt
I'm trying to join two Kafka streams (consumed via Kafka spouts) into one using JoinBolt, following this snippet: http://storm.apache.org/releases/1.1.2/Joins.html
The docs say each of the JoinBolt's incoming streams must be fields-grouped on a single field, and a stream should only be joined with other streams on the field it was FieldsGrouped on.
Code snippet:
KafkaSpout kafka_spout_1 = SpoutBuilder.buildSpout("127.0.0.1:2181", "test-topic-1", "/spout-1", "spout-1"); // String zkHosts, String topic, String zkRoot, String spoutId
KafkaSpout kafka_spout_2 = SpoutBuilder.buildSpout("127.0.0.1:2181", "test-topic-2", "/spout-2", "spout-2"); // String zkHosts, String topic, String zkRoot, String spoutId
topologyBuilder.setSpout("kafka-spout-1", kafka_spout_1, 1);
topologyBuilder.setSpout("kafka-spout-2", kafka_spout_2, 1);
JoinBolt joinBolt = new JoinBolt("kafka-spout-1", "id")
        .join("kafka-spout-2", "deptId", "kafka-spout-1")
        .select("id,deptId,firstName,deptName")
        .withTumblingWindow(new Duration(10, TimeUnit.SECONDS));
topologyBuilder.setBolt("joiner", joinBolt, 1)
        .fieldsGrouping("kafka-spout-1", new Fields("id"))
        .fieldsGrouping("kafka-spout-2", new Fields("deptId"));
kafka-spout-1 sample record --> {"id" : 1 ,"firstName" : "Alyssa" , "lastName" : "Parker"}
kafka-spout-2 sample record --> {"deptId" : 1 ,"deptName" : "Engineering"}
When I deploy the topology with the snippet above, I get the following exception:
[main] WARN o.a.s.StormSubmitter - Topology submission exception: Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"}
java.lang.RuntimeException: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at BuildTopology.runTopology(BuildTopology.java:71)
at Main.main(Main.java:6)
Caused by: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 4 more
How can I solve this problem?
Thanks, any help is appreciated.
Consider using storm-kafka-client instead of storm-kafka if you're doing new development; storm-kafka is deprecated.
Does the spout actually emit a field called "deptId"? Your configuration snippet doesn't mention that you've set SpoutConfig.scheme, and your sample records suggest that you're emitting JSON documents containing a "deptId" field.
Storm knows nothing about JSON, or about the contents of the strings coming out of the spout. You need to define a scheme that makes the spout emit the "deptId" field separately from the rest of the record.
Here's the relevant snippet from one of the built-in schemes, which emits the message, the partition and the offset as separate fields:
@Override
public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
    String stringMessage = StringScheme.deserializeString(message);
    return new Values(stringMessage, partition.partition, offset);
}

@Override
public Fields getOutputFields() {
    return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
}
Another way to do this, instead of using a scheme, is to put a bolt between the spout and the JoinBolt that extracts the "deptId" from the record and emits it as a field alongside the record.
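The core of such an intermediate bolt could look like the sketch below. The Storm wiring in the comments is an assumption based on the component names from the question; only the plain-Java key extraction is shown as code, and a real implementation would parse the JSON with a library such as Jackson rather than a regex.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Core of a hypothetical "extractor" bolt placed between kafka-spout-2 and the
// JoinBolt. In Storm, the surrounding class would extend BaseBasicBolt, declare
// its output as new Fields("deptId", "record"), and in execute() call
// collector.emit(new Values(extractDeptId(record), record)); the JoinBolt would
// then subscribe with .fieldsGrouping("extractor", new Fields("deptId")).
public class DeptIdExtractor {
    // Matches e.g. "deptId" : 1 and captures the numeric value.
    private static final Pattern DEPT_ID =
            Pattern.compile("\"deptId\"\\s*:\\s*(\\d+)");

    /** Returns the deptId value, or null when the record has no such field. */
    public static Integer extractDeptId(String jsonRecord) {
        Matcher m = DEPT_ID.matcher(jsonRecord);
        return m.find() ? Integer.valueOf(m.group(1)) : null;
    }

    public static void main(String[] args) {
        System.out.println(extractDeptId("{\"deptId\" : 1 ,\"deptName\" : \"Engineering\"}")); // prints 1
        System.out.println(extractDeptId("{\"deptName\" : \"Engineering\"}")); // prints null
    }
}
```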