将消息重定向到相应的工作 thread/Task

Redirect message to the corresponding working thread/Task

我正在使用 Kafka 并尝试实现一个订阅所有想要的主题的消费者。假设我有 3 个主题(A、B、C)。如何同步处理来自每个主题的消息,但并行处理主题之间的消息。因此,对于A主题,我需要一条一条地处理消息,但同时我需要一条一条地处理来自其他主题的消息。

看来我需要为每个主题单独讨论。您能否建议如何实施?是否有一些现成的解决方案?我的消费者看起来像

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);

    ... processing here

    _consumer.Commit(consumeResult);
}

我不知道如何实现它,因为还没有在 C# 中使用 asnyc 的经验。发现像 Reactive Extensions https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2 这样的东西。

当我收到消息时,如何将消息重定向到相应的工作thread/Task?

使用 Confluent.Kafka 1.3.0 包与 Kafka 一起工作

您应该为每个发布消息的主题创建 1 个可观察对象。然后订阅观察者上的每个可用主题。

看看TPL DataFlow, namely the ActionBlock or Channels

var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
    channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
    {
        ... processing here
        _consumer.Commit(consumeResult);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}

此代码将为您订阅的所有主题创建 ActionBlock

然后当您收到消息时,将其重定向到相应的频道

while (!cancellationToken.IsCancellationRequested)
{
   ConsumeResult<string, string> consumeResult = _consumer.Consume();
   await channels[consumeResult.Topic].SendAsync(consumeResult);
}

阅读来自@PauloMorgado 回答的文章