后台线程和任务

Background Threads and Tasks

我正在尝试找到从专用后台线程 运行 任务的最佳方法。

使用上下文正在从 Kafka 主题消费并引发异步事件处理程序来处理 ConsumeResult 实例。

Kafka 消费者(下面的 consumer 实例)阻塞线程,直到消息被消费或传递的 CancellationToken 被取消。

consumeThread = new Thread(Consume)
{
    Name = "Kafka Consumer Thread",
    IsBackground = true,
};

这是我想出的Consume方法的实现,由上面的专用线程启动:

private void Consume(object _)
{
    try
    {
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            var consumeResult = consumer.Consume(cancellationTokenSource.Token);

            var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
                consumer, consumeResult, cancellationTokenSource.Token);

            _ = Task.Run(async () =>
            {
                if (onConsumeResultReceived is null) continue;

                var handlerInstances = onConsumeResultReceived.GetInvocationList();
                foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
                {
                    if (cancellationTokenSource.IsCancellationRequested) return;                        
                    await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);                            
                }

            }, cancellationTokenSource.Token);
        }
    }
    catch (OperationCanceledException)
    {

    }
    catch (ThreadInterruptedException)
    {

    }
    catch (ThreadAbortException)
    {
        // Aborting a thread is not implemented in .NET Core.
    }
}

我不确定这是从专用线程 运行 任务的推荐方式,因此非常感谢任何建议。

我完全不清楚为什么您需要专用线程。目前的代码启动一个线程,然后该线程阻塞以供使用,然后在线程池线程上引发事件处理程序。

_ = Task.Run 惯用语是“即发即弃”,从某种意义上说这是危险的,因为它会默默地吞下事件引发代码或事件处理程序中的任何异常。

我建议将 Thread 替换为 Task.Run,并直接引发事件处理程序:

consumeTask = Task.Run(ConsumeAsync);

private async Task ConsumeAsync()
{
  while (true)
  {
    var consumeResult = consumer.Consume(cancellationTokenSource.Token);
    var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
        consumer, consumeResult, cancellationTokenSource.Token);

    if (onConsumeResultReceived is null) continue;

    var handlerInstances = onConsumeResultReceived.GetInvocationList();
    foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
    {
      if (cancellationTokenSource.IsCancellationRequested) return;
      await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);
    }
  }
}