将消息重定向到相应的工作 thread/Task
Redirect message to the corresponding working thread/Task
我正在使用 Kafka 并尝试实现一个订阅所有想要的主题的消费者。假设我有 3 个主题(A、B、C)。如何同步处理来自每个主题的消息,但并行处理主题之间的消息。因此,对于A主题,我需要一条一条地处理消息,但同时我需要一条一条地处理来自其他主题的消息。
看来我需要为每个主题单独讨论。您能否建议如何实施?是否有一些现成的解决方案?我的消费者看起来像
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
... processing here
_consumer.Commit(consumeResult);
}
我不知道如何实现它,因为还没有在 C# 中使用 asnyc 的经验。发现像 Reactive Extensions https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2 这样的东西。
当我收到消息时,如何将消息重定向到相应的工作thread/Task?
使用 Confluent.Kafka 1.3.0 包与 Kafka 一起工作
您应该为每个发布消息的主题创建 1 个可观察对象。然后订阅观察者上的每个可用主题。
看看TPL DataFlow, namely the ActionBlock or Channels。
var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
{
... processing here
_consumer.Commit(consumeResult);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}
此代码将为您订阅的所有主题创建 ActionBlock。
然后当您收到消息时,将其重定向到相应的频道
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume();
await channels[consumeResult.Topic].SendAsync(consumeResult);
}
阅读来自@PauloMorgado 回答的文章
我正在使用 Kafka 并尝试实现一个订阅所有想要的主题的消费者。假设我有 3 个主题(A、B、C)。如何同步处理来自每个主题的消息,但并行处理主题之间的消息。因此,对于A主题,我需要一条一条地处理消息,但同时我需要一条一条地处理来自其他主题的消息。
看来我需要为每个主题单独讨论。您能否建议如何实施?是否有一些现成的解决方案?我的消费者看起来像
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
... processing here
_consumer.Commit(consumeResult);
}
我不知道如何实现它,因为还没有在 C# 中使用 asnyc 的经验。发现像 Reactive Extensions https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2 这样的东西。
当我收到消息时,如何将消息重定向到相应的工作thread/Task?
使用 Confluent.Kafka 1.3.0 包与 Kafka 一起工作
您应该为每个发布消息的主题创建 1 个可观察对象。然后订阅观察者上的每个可用主题。
看看TPL DataFlow, namely the ActionBlock or Channels。
var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
{
... processing here
_consumer.Commit(consumeResult);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}
此代码将为您订阅的所有主题创建 ActionBlock。
然后当您收到消息时,将其重定向到相应的频道
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume();
await channels[consumeResult.Topic].SendAsync(consumeResult);
}
阅读来自@PauloMorgado 回答的文章