如何使用 spring 云流和 kafka 活页夹对某些消息进行优先级排序

How to prioritize some messages using spring cloud stream with the kafka binder

TLDR:我希望能够在同一主题中先处理一些紧急消息。

更多解释:

我正在开发一个相当大的应用程序,它使用许多 spring 微服务,这些微服务依赖于 spring-cloud-stream 和 kafka-binder 在某些任务中相互通信。

计划任务可能会产生大量事件。当这些事件被消费时,可能会产生并发送中间事件,并且可能需要相当长的时间才能消费完所有初始和中间消息并完成所有工作。

在此期间,如果有人要手动发送相同类型的事件,只有在其他一切都完成后才会排队处理。

有人问我是否可以让这些 "manual events" 在所有其他事件之前优先处理。

通常在这种情况下,您会看到人们提倡复制所有主题(例如 topic-normaltopic-urgent),复制 @StreamListener。这似乎是一个相当复杂的选择,因为我们这里涉及到很多不同的主题。我们还有一些 kafka-streams 聚合需要重写以考虑到这一点。

与一位开发人员同事,我们正在考虑为每个主题复制消费者组(consumerGroupNormalconsumerGroupUrgentusing),以便需要紧急消息。然后,我会为每条消息添加优先级 header,复制 StreamListener 并仅在其中一个侦听器中使用 priority: urgent header 的消息,并且仅使用 [= 的消息18=] 在另一个监听器中。

我可能还会创建一个 ChannelInterceptorAdapter 来自动添加和读取此 "priority" header (a bit like Sleuth does it)。

但我仍然需要为绑定器添加大量配置,并且需要复制监听器,添加检查 header 并执行或不执行监听器代码的小代码。

总而言之,对于一些看起来很常见且简单的用例可能对其他人有用的东西来说,这看起来需要做很多工作,所以我想知道是否有更简单的方法来做到这一点?

在每个主题上使用两个消费者组几乎是唯一的方法,并且有两个听众。

但是,您不需要自定义频道拦截器;你可以使用 condition 属性 之类的 "headers['foo'].equals('urgent')".