在kafka中复用消费者和生产者
multiplexing consumer and producer in kafka
在我的 kafka 消费者线程(高级)中,在我消费了一条消息之后,我正在对该消息应用一些业务逻辑并将其转发到 WS。但是这个 web 服务有时可能会关闭,因为我从 kafka 消费了这个对象并且偏移量向前移动,所以我会错过这个对象。
摆脱这个问题的一种方法是在 zookeeper 中禁用自动提交并通过调用编程来提交偏移量,但我预计这是一个非常昂贵的操作。我将以大约 2000 tps 的速度生产到 kafka,以后可能会增加。
另一种方法——我不确定这是否是个好主意——如果我遇到任何问题,再次将这个消耗的对象生成给 kafka,但我没有看到任何与此相关的 post在我所有的谷歌搜索中。这是一件连小事都不算的事吗?
你能给我一些处理这种情况的见解吗?
谢谢
将此类消息移至错误队列并稍后重试是一种众所周知的方法。
您可以post将失败的消息返回到同一主题或您选择的另一个主题。
如果您使用相同的主题,您将推送主题末尾的消息,并且它们将在其他主题之后被提取(因此如果顺序对您来说很重要,请不要这样做)。此外,如果您在发送消息之前执行的操作不是幂等的,您将需要一些东西来识别这些记录,这样它们就不会执行两次操作。
如果你使用 failed_topic,你可以将你不能发送的消息推送到这个主题,当 WS 再次健康时你需要创建一个消费者来消费那里的所有消息并发送他们到 WS。
希望对您有所帮助!
在我的 kafka 消费者线程(高级)中,在我消费了一条消息之后,我正在对该消息应用一些业务逻辑并将其转发到 WS。但是这个 web 服务有时可能会关闭,因为我从 kafka 消费了这个对象并且偏移量向前移动,所以我会错过这个对象。
摆脱这个问题的一种方法是在 zookeeper 中禁用自动提交并通过调用编程来提交偏移量,但我预计这是一个非常昂贵的操作。我将以大约 2000 tps 的速度生产到 kafka,以后可能会增加。
另一种方法——我不确定这是否是个好主意——如果我遇到任何问题,再次将这个消耗的对象生成给 kafka,但我没有看到任何与此相关的 post在我所有的谷歌搜索中。这是一件连小事都不算的事吗?
你能给我一些处理这种情况的见解吗?
谢谢
将此类消息移至错误队列并稍后重试是一种众所周知的方法。
您可以post将失败的消息返回到同一主题或您选择的另一个主题。
如果您使用相同的主题,您将推送主题末尾的消息,并且它们将在其他主题之后被提取(因此如果顺序对您来说很重要,请不要这样做)。此外,如果您在发送消息之前执行的操作不是幂等的,您将需要一些东西来识别这些记录,这样它们就不会执行两次操作。
如果你使用 failed_topic,你可以将你不能发送的消息推送到这个主题,当 WS 再次健康时你需要创建一个消费者来消费那里的所有消息并发送他们到 WS。
希望对您有所帮助!