Apache Storm JoinBolt

I am trying to join two Kafka data streams (from Kafka spouts) into one using a JoinBolt, following the snippet at http://storm.apache.org/releases/1.1.2/Joins.html

The documentation says that each stream coming into a JoinBolt must be fields-grouped on a single field, and that a stream can only be joined with other streams on the field it was fields-grouped on.

Code snippet:

    KafkaSpout kafka_spout_1 = SpoutBuilder.buildSpout("127.0.0.1:2181", "test-topic-1", "/spout-1", "spout-1"); // String zkHosts, String topic, String zkRoot, String spoutId
    KafkaSpout kafka_spout_2 = SpoutBuilder.buildSpout("127.0.0.1:2181", "test-topic-2", "/spout-2", "spout-2");

    topologyBuilder.setSpout("kafka-spout-1", kafka_spout_1, 1);
    topologyBuilder.setSpout("kafka-spout-2", kafka_spout_2, 1);

    JoinBolt joinBolt = new JoinBolt("kafka-spout-1", "id")
            .join("kafka-spout-2", "deptId", "kafka-spout-1")
            .select("id,deptId,firstName,deptName")
            .withTumblingWindow(new Duration(10, TimeUnit.SECONDS));

    topologyBuilder.setBolt("joiner", joinBolt, 1)
            .fieldsGrouping("kafka-spout-1", new Fields("id"))
            .fieldsGrouping("kafka-spout-2", new Fields("deptId"));

Sample record from kafka-spout-1 --> {"id" : 1 ,"firstName" : "Alyssa" , "lastName" : "Parker"}

Sample record from kafka-spout-2 --> {"deptId" : 1 ,"deptName" : "Engineering"}

I get the following exception when I deploy the topology with the snippet above:
[main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"}
java.lang.RuntimeException: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
    at BuildTopology.runTopology(BuildTopology.java:71)
    at Main.main(Main.java:6)
Caused by: InvalidTopologyException(msg:Component: [joiner] subscribes from stream: [default] of component [kafka-spout-2] with non-existent fields: #{"deptId"})
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
    at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
    at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
    at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
    at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
    ... 4 more

How can I solve this problem?

Thanks, any help would be greatly appreciated.

If you are doing new development, consider using storm-kafka-client instead of storm-kafka. storm-kafka is deprecated.
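For reference, spout construction with storm-kafka-client looks roughly like the sketch below. Everything beyond the storm-kafka-client class names is an assumption about your setup: the broker address is made up, and NewSpoutBuilder is a hypothetical stand-in for your SpoutBuilder:

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Hypothetical replacement for SpoutBuilder.buildSpout, using storm-kafka-client.
// Note it takes the Kafka broker list (e.g. "127.0.0.1:9092"), not the
// ZooKeeper address that the deprecated storm-kafka spout uses.
public class NewSpoutBuilder {
    public static KafkaSpout<String, String> buildSpout(String bootstrapServers, String topic) {
        KafkaSpoutConfig<String, String> config =
                KafkaSpoutConfig.builder(bootstrapServers, topic).build();
        return new KafkaSpout<>(config);
    }
}
```

The builder has many more knobs (offset strategy, record translators, etc.); the defaults are enough to get a topology submitting.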

Does the spout actually emit a field called "deptId"?

Your configuration snippet doesn't show that you set SpoutConfig.scheme, and your sample records suggest that you are emitting JSON documents containing a "deptId" field.

Storm doesn't know anything about JSON, or about the contents of the strings coming out of the spout. You need to define a scheme that makes the spout emit "deptId" as a field separate from the rest of the record.

Here is the relevant snippet from one of the built-in schemes, which emits the message, partition and offset in separate fields:

    @Override
    public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
        String stringMessage = StringScheme.deserializeString(message);
        return new Values(stringMessage, partition.partition, offset);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
    }

See https://github.com/apache/storm/blob/v1.2.2/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java for reference.
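Along the same lines, a custom scheme for your records might look roughly like the sketch below. This is untested: it assumes the storm-kafka Scheme interface from Storm 1.x, uses Jackson for the JSON parsing (which would need to be on your classpath), and DeptIdScheme and the "record" field name are made up:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical scheme that deserializes the Kafka message and emits
// "deptId" as its own field next to the raw JSON record.
public class DeptIdScheme implements Scheme {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public List<Object> deserialize(ByteBuffer buffer) {
        String json = StringScheme.deserializeString(buffer);
        String deptId = null;
        try {
            JsonNode node = MAPPER.readTree(json);
            if (node.has("deptId")) {
                deptId = node.get("deptId").asText();
            }
        } catch (IOException e) {
            // Malformed record; emit a null deptId and let downstream decide.
        }
        return new Values(deptId, json);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("deptId", "record");
    }
}
```

You would then wire it in with something like spoutConfig.scheme = new SchemeAsMultiScheme(new DeptIdScheme()); so the fieldsGrouping on "deptId" has a real field to hash on.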

An alternative to doing this with a scheme is to add a bolt between the spout and the JoinBolt that extracts "deptId" from the record and emits it as a field alongside the record.
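A rough sketch of such an intermediate bolt follows. The names are hypothetical; it assumes the spout uses StringScheme (whose output field is "str"), and it uses a deliberately naive string search instead of a real JSON parser so the sketch stays dependency-free:

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt that sits between kafka-spout-2 and the JoinBolt,
// pulling "deptId" out of the JSON record and emitting it as its own field.
public class DeptIdExtractorBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String json = input.getStringByField("str"); // StringScheme's default output field
        collector.emit(input, new Values(extractField(json, "deptId"), json));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("deptId", "record"));
    }

    // Naive lookup of a top-level JSON value; a real topology should use a
    // proper JSON library such as Jackson instead of string searching.
    static String extractField(String json, String key) {
        int k = json.indexOf("\"" + key + "\"");
        if (k < 0) return null;
        int colon = json.indexOf(':', k);
        if (colon < 0) return null;
        String rest = json.substring(colon + 1).trim();
        if (rest.startsWith("\"")) { // quoted string value
            return rest.substring(1, rest.indexOf('"', 1));
        }
        int end = 0; // unquoted value: read until the next delimiter
        while (end < rest.length() && rest.charAt(end) != ',' && rest.charAt(end) != '}') end++;
        return rest.substring(0, end).trim();
    }
}
```

You would wire it in with something like topologyBuilder.setBolt("dept-extractor", new DeptIdExtractorBolt()).shuffleGrouping("kafka-spout-2"); and then point the JoinBolt and its fieldsGrouping at "dept-extractor" instead of the spout.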