卡夫卡螺栓之后的风暴螺栓
Storm bolt following a kafka bolt
我有一个 Storm 拓扑,我必须将输出发送到 kafka 并更新 redis 中的值。为此,我有一个 Kafkabolt 和一个 RedisBolt。
下面是我的拓扑结构 -
tp.setSpout("kafkaSpout", kafkaSpout, 3);
tp.setBolt("EvaluatorBolt", evaluatorBolt, 6).shuffleGrouping("kafkaStream");
tp.setBolt("ResultToRedisBolt",ResultsToRedisBolt,3).shuffleGrouping("EvaluatorBolt","ResultStream");
tp.setBolt("ResultToKafkaBolt", ResultsToKafkaBolt, 3).shuffleGrouping("EvaluatorBolt","ResultStream");
问题是两个末端螺栓(Redis 和 Kafka)都在监听来自前面螺栓(ResultStream)的同一个流,因此它们都可能独立失败。我真正需要的是,如果结果在 Kafka 中发布成功,那么只有我更新 Redis 中的值。有没有办法从 kafkaBolt 获得输出流,我可以在其中将消息成功发布到 Kafka?然后我可能可以在我的 RedisBolt 中收听该流并采取相应的行动。
目前是不可能的,除非你修改螺栓代码。您可能最好稍微更改您的设计,因为在将元组写入 Kafka 之后进行额外处理有一些缺点。如果您将元组写入 Kafka 而未能写入 Redis,您将在 Kafka 中得到重复项,因为处理将从 spout 重新开始。
根据您的用例,将结果写入 Kafka,然后让另一个拓扑从 Kafka 读取结果并写入 Redis 可能会更好。
如果您仍然需要能够从螺栓发出新的元组,它应该很容易实现。 Bolt 最近获得了添加自定义 Producer 回调的能力,因此我们可以扩展该机制。
有关上下文,请参阅 https://github.com/apache/storm/pull/2790#issuecomment-411709331 上的讨论。
我有一个 Storm 拓扑,我必须将输出发送到 kafka 并更新 redis 中的值。为此,我有一个 Kafkabolt 和一个 RedisBolt。 下面是我的拓扑结构 -
tp.setSpout("kafkaSpout", kafkaSpout, 3);
tp.setBolt("EvaluatorBolt", evaluatorBolt, 6).shuffleGrouping("kafkaStream");
tp.setBolt("ResultToRedisBolt",ResultsToRedisBolt,3).shuffleGrouping("EvaluatorBolt","ResultStream");
tp.setBolt("ResultToKafkaBolt", ResultsToKafkaBolt, 3).shuffleGrouping("EvaluatorBolt","ResultStream");
问题是两个末端螺栓(Redis 和 Kafka)都在监听来自前面螺栓(ResultStream)的同一个流,因此它们都可能独立失败。我真正需要的是,如果结果在 Kafka 中发布成功,那么只有我更新 Redis 中的值。有没有办法从 kafkaBolt 获得输出流,我可以在其中将消息成功发布到 Kafka?然后我可能可以在我的 RedisBolt 中收听该流并采取相应的行动。
目前是不可能的,除非你修改螺栓代码。您可能最好稍微更改您的设计,因为在将元组写入 Kafka 之后进行额外处理有一些缺点。如果您将元组写入 Kafka 而未能写入 Redis,您将在 Kafka 中得到重复项,因为处理将从 spout 重新开始。
根据您的用例,将结果写入 Kafka,然后让另一个拓扑从 Kafka 读取结果并写入 Redis 可能会更好。
如果您仍然需要能够从螺栓发出新的元组,它应该很容易实现。 Bolt 最近获得了添加自定义 Producer 回调的能力,因此我们可以扩展该机制。
有关上下文,请参阅 https://github.com/apache/storm/pull/2790#issuecomment-411709331 上的讨论。