在 KafkaBolt 中暴露 Kafka 发布异常
Exposing Kafka publishing exceptions in KafkaBolt
我用的是KafkaBolt
in Storm to publish messages to various Kafka topics. I want to put logging and metrics around the publishing logic so I can create alerts around any exceptions that might be thrown when there's a publishing failure. Exposing those exceptions is done through a Callback
function that's passed into KafkaProducer.send()
,发布成功或失败后执行
问题是 KafkaBolt
完全封装了它的 KafkaProducer
,所以没有办法注入自定义 Callback
,所以如果我想看到任何错误,我必须查看风暴UI。我通过为 KafkaBolt
创建包装器解决了这个问题。反过来,这个包装器将把传递给 KafkaBolt.prepare()
的 OutputCollector
包装在覆盖 OutputCollector.reportError()
行为的自定义 OutputCollector
中。然后我可以在那里添加我自己的日志记录和指标报告代码,然后让它调用原始方法。
这个解决方案似乎完全可以满足我的需要,但奇怪的是 KafkaBolt
使得以编程方式访问这些异常变得如此困难。我想知道我是否遗漏了一些明显的东西,是否有更好的方法来做到这一点。
我认为您没有遗漏任何东西,您可能只是第一个有此需求的人。有人必须解决这个问题并决定修复它:)
如果您想更改螺栓以支持自定义错误处理(例如,通过允许用户按照您的建议提供回调),您可以在 https://issues.apache.org/jira/projects/STORM/issues and make a PR against https://github.com/apache/storm/pulls 提出问题。当然也欢迎您只提出问题,其他人可能会看到它并决定修复它,但您自己提供修复可能会更快。
找到螺栓代码
我用的是KafkaBolt
in Storm to publish messages to various Kafka topics. I want to put logging and metrics around the publishing logic so I can create alerts around any exceptions that might be thrown when there's a publishing failure. Exposing those exceptions is done through a Callback
function that's passed into KafkaProducer.send()
,发布成功或失败后执行
问题是 KafkaBolt
完全封装了它的 KafkaProducer
,所以没有办法注入自定义 Callback
,所以如果我想看到任何错误,我必须查看风暴UI。我通过为 KafkaBolt
创建包装器解决了这个问题。反过来,这个包装器将把传递给 KafkaBolt.prepare()
的 OutputCollector
包装在覆盖 OutputCollector.reportError()
行为的自定义 OutputCollector
中。然后我可以在那里添加我自己的日志记录和指标报告代码,然后让它调用原始方法。
这个解决方案似乎完全可以满足我的需要,但奇怪的是 KafkaBolt
使得以编程方式访问这些异常变得如此困难。我想知道我是否遗漏了一些明显的东西,是否有更好的方法来做到这一点。
我认为您没有遗漏任何东西,您可能只是第一个有此需求的人。有人必须解决这个问题并决定修复它:)
如果您想更改螺栓以支持自定义错误处理(例如,通过允许用户按照您的建议提供回调),您可以在 https://issues.apache.org/jira/projects/STORM/issues and make a PR against https://github.com/apache/storm/pulls 提出问题。当然也欢迎您只提出问题,其他人可能会看到它并决定修复它,但您自己提供修复可能会更快。
找到螺栓代码