如何使用代码中的新螺栓更新现有的 Storm 拓扑?

How to update an existing Storm topology with new bolts in code?

我正在编写一个使用 Apache Storm v1.1.2、Kafka v0.11.0.1、Zookeeper 3.4.6、Eureka 和 Cloud-Config 的 dockerized Java Spring 应用程序Docker 个由 Docker-Compose 编排的容器。

我通过 KafkaSpout 接收的元组有一个 "value" 字段,它是一个 protobuf 对象。我使用自定义反序列化器从中取出我的对象进行处理。

我有一个基本的应用程序,我有一个 bolts,它打印传入的消息并根据 protobuf 对象中的字段值将它们路由到其他特定的 bolts。我还有 LocalCluster、Config 和 TopologyBuilder 作为 Spring Beans。

目前我在 PostContruct 中设置了所有螺栓,但我需要能够动态添加螺栓,以根据 protobuf 对象的其他字段过滤传入消息并执行基本聚合功能(max/min/windowed 平均值)。

我想使用 REST 控制器执行此操作,但如何才能在不丢失数据的情况下停止和启动拓扑?我也不想从一开始就通过监听Kafka主题来重启拓扑,因为这个系统会收到非常高的负载。

这篇文章看起来很有前途,但我绝对希望整个过程自动化,所以我不会进入 Zookeeper https://community.hortonworks.com/articles/550/unofficial-storm-and-kafka-best-practices-guide.html

如何在代码中编辑现有拓扑以动态添加新螺栓?

你不能。 Storm 拓扑在提交后是静态的。如果您需要根据元组中的字段改变处理,最好的选择是预先提交您需要的所有螺栓。然后,您可以通过使用一个或多个检查元组的螺栓来改变元组在拓扑中采用的路径,并根据元组内容发送到特定流。

例如制作一个 SplitterBolt

public void execute(Tuple input) {
  if (tuple.getIntegerByField("theDecider") == 1) {
    collector.emit("onlyOnes", tuple.getValues());
  } else {
    collector.emit("others", tuple.getValues());
  }
}

您在拓扑结构构建代码中的位置会有类似

的东西
builder.setSpout("kafka-spout", ...);
builder.setBolt("splitter", new SplitterBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("countOnes", new CounterBolt()).shuffleGrouping("splitter", "onlyOnes");
builder.setBolt("countOthers", new CounterBolt()).shuffleGrouping("splitter", "others");