后台线程和任务
Background Threads and Tasks
我正在尝试找到从专用后台线程 运行 任务的最佳方法。
使用上下文正在从 Kafka 主题消费并引发异步事件处理程序来处理 ConsumeResult 实例。
Kafka 消费者(下面的 consumer
实例)阻塞线程,直到消息被消费或传递的 CancellationToken 被取消。
consumeThread = new Thread(Consume)
{
Name = "Kafka Consumer Thread",
IsBackground = true,
};
这是我想出的Consume方法的实现,由上面的专用线程启动:
private void Consume(object _)
{
try
{
while (!cancellationTokenSource.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationTokenSource.Token);
var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
consumer, consumeResult, cancellationTokenSource.Token);
_ = Task.Run(async () =>
{
if (onConsumeResultReceived is null) continue;
var handlerInstances = onConsumeResultReceived.GetInvocationList();
foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
{
if (cancellationTokenSource.IsCancellationRequested) return;
await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);
}
}, cancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
}
catch (ThreadInterruptedException)
{
}
catch (ThreadAbortException)
{
// Aborting a thread is not implemented in .NET Core.
}
}
我不确定这是从专用线程 运行 任务的推荐方式,因此非常感谢任何建议。
我完全不清楚为什么您需要专用线程。目前的代码启动一个线程,然后该线程阻塞以供使用,然后在线程池线程上引发事件处理程序。
_ = Task.Run
惯用语是“即发即弃”,从某种意义上说这是危险的,因为它会默默地吞下事件引发代码或事件处理程序中的任何异常。
我建议将 Thread
替换为 Task.Run
,并直接引发事件处理程序:
consumeTask = Task.Run(ConsumeAsync);
private async Task ConsumeAsync()
{
while (true)
{
var consumeResult = consumer.Consume(cancellationTokenSource.Token);
var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
consumer, consumeResult, cancellationTokenSource.Token);
if (onConsumeResultReceived is null) continue;
var handlerInstances = onConsumeResultReceived.GetInvocationList();
foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
{
if (cancellationTokenSource.IsCancellationRequested) return;
await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);
}
}
}
我正在尝试找到从专用后台线程 运行 任务的最佳方法。
使用上下文正在从 Kafka 主题消费并引发异步事件处理程序来处理 ConsumeResult
Kafka 消费者(下面的 consumer
实例)阻塞线程,直到消息被消费或传递的 CancellationToken 被取消。
consumeThread = new Thread(Consume)
{
Name = "Kafka Consumer Thread",
IsBackground = true,
};
这是我想出的Consume方法的实现,由上面的专用线程启动:
private void Consume(object _)
{
try
{
while (!cancellationTokenSource.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationTokenSource.Token);
var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
consumer, consumeResult, cancellationTokenSource.Token);
_ = Task.Run(async () =>
{
if (onConsumeResultReceived is null) continue;
var handlerInstances = onConsumeResultReceived.GetInvocationList();
foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
{
if (cancellationTokenSource.IsCancellationRequested) return;
await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);
}
}, cancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
}
catch (ThreadInterruptedException)
{
}
catch (ThreadAbortException)
{
// Aborting a thread is not implemented in .NET Core.
}
}
我不确定这是从专用线程 运行 任务的推荐方式,因此非常感谢任何建议。
我完全不清楚为什么您需要专用线程。目前的代码启动一个线程,然后该线程阻塞以供使用,然后在线程池线程上引发事件处理程序。
_ = Task.Run
惯用语是“即发即弃”,从某种意义上说这是危险的,因为它会默默地吞下事件引发代码或事件处理程序中的任何异常。
我建议将 Thread
替换为 Task.Run
,并直接引发事件处理程序:
consumeTask = Task.Run(ConsumeAsync);
private async Task ConsumeAsync()
{
while (true)
{
var consumeResult = consumer.Consume(cancellationTokenSource.Token);
var consumeResultEventArgs = new ConsumeResultReceivedEventArgs<TKey, TValue>(
consumer, consumeResult, cancellationTokenSource.Token);
if (onConsumeResultReceived is null) continue;
var handlerInstances = onConsumeResultReceived.GetInvocationList();
foreach (ConsumeResultReceivedEventHandler<TKey, TValue> handlerInstance in handlerInstances)
{
if (cancellationTokenSource.IsCancellationRequested) return;
await handlerInstance(this, consumeResultEventArgs).ConfigureAwait(false);
}
}
}