运行 在特定线程上工作
Run work on specific thread
我想要一个特定的线程,在该单独的线程中排队等待任务和处理任务。该应用程序将根据用户使用情况创建任务并将它们排队到任务队列中。然后单独的线程处理任务。即使队列为空,保持线程活动并使用它来处理排队任务是至关重要的。
我已经尝试了 TaskScheduler
和 BlockingCollection
的几种实现,并将并发限制为只有一个线程,但似乎当队列变空并且任务由其他线程处理时线程被释放。
能否请您至少向我介绍一些如何实现此目标的资源?
tl;博士
试图限制一个特定的线程来处理动态添加到队列中的任务。
编辑 1:
这是使用 WCF 和 .NET Framework 4.6 的实验性 Web 应用程序。在 WCF 库中,我试图用一个线程处理任务来实现这一行为。这个线程必须使用外部 dll 库初始化 prolog,然后使用 prolog 进行工作。如果进程中使用了其他线程,库将抛出 AccessViolationException
。我做了一些研究,这很可能是因为该库中的线程管理不善。我有实施,我到处都有锁并且它有效。我现在正在尝试重新实现并使其异步,这样我就不会用锁定阻塞主线程。
我不在电脑旁,但我今天晚些时候回家后提供了一些代码。
你的方法看起来不错,所以你可能只是犯了一些愚蠢的小错误。
做一个简单的定制其实很容易TaskScheduler
。对于您的情况:
void Main()
{
var cts = new CancellationTokenSource();
var myTs = new SingleThreadTaskScheduler(cts.Token);
myTs.Schedule(() =>
{ Print("Init start"); Thread.Sleep(1000); Print("Init done"); });
myTs.Schedule(() => Print("Work 1"));
myTs.Schedule(() => Print("Work 2"));
myTs.Schedule(() => Print("Work 3"));
var lastOne = myTs.Schedule(() => Print("Work 4"));
Print("Starting TS");
myTs.Start();
// Wait for all of them to complete...
lastOne.GetAwaiter().GetResult();
Thread.Sleep(1000);
// And try to schedule another
myTs.Schedule(() => Print("After emptied")).GetAwaiter().GetResult();
// And shutdown; it's also pretty useful to have the
// TaskScheduler return a "complete task" to await
myTs.Complete();
Print("On main thread again");
}
void Print(string str)
{
Console.WriteLine("{0}: {1}", Thread.CurrentThread.ManagedThreadId, str);
Thread.Sleep(100);
}
public sealed class SingleThreadTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool isExecuting;
private readonly CancellationToken cancellationToken;
private readonly BlockingCollection<Task> taskQueue;
public SingleThreadTaskScheduler(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
this.taskQueue = new BlockingCollection<Task>();
}
public void Start()
{
new Thread(RunOnCurrentThread) { Name = "STTS Thread" }.Start();
}
// Just a helper for the sample code
public Task Schedule(Action action)
{
return
Task.Factory.StartNew
(
action,
CancellationToken.None,
TaskCreationOptions.None,
this
);
}
// You can have this public if you want - just make sure to hide it
private void RunOnCurrentThread()
{
isExecuting = true;
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cancellationToken))
{
TryExecuteTask(task);
}
}
catch (OperationCanceledException)
{ }
finally
{
isExecuting = false;
}
}
// Signaling this allows the task scheduler to finish after all tasks complete
public void Complete() { taskQueue.CompleteAdding(); }
protected override IEnumerable<Task> GetScheduledTasks() { return null; }
protected override void QueueTask(Task task)
{
try
{
taskQueue.Add(task, cancellationToken);
}
catch (OperationCanceledException)
{ }
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// We'd need to remove the task from queue if it was already queued.
// That would be too hard.
if (taskWasPreviouslyQueued) return false;
return isExecuting && TryExecuteTask(task);
}
}
修改它非常容易,让您可以完全控制任务调度程序实际执行任务的位置 - 事实上,我已经从我使用的之前的任务调度程序中改编了它,它只有 RunOnCurrentThread
方法public.
对于您的情况,您总是只想坚持一个线程,SingleThreadTaskScheduler
中的方法可能更好。虽然这也有它的可取之处:
// On a new thread
try
{
InitializeProlog();
try
{
myTs.RunOnCurrentThread();
}
finally
{
ReleaseProlog();
}
}
catch (Exception ex)
{
// The global handler
}
我想要一个特定的线程,在该单独的线程中排队等待任务和处理任务。该应用程序将根据用户使用情况创建任务并将它们排队到任务队列中。然后单独的线程处理任务。即使队列为空,保持线程活动并使用它来处理排队任务是至关重要的。
我已经尝试了 TaskScheduler
和 BlockingCollection
的几种实现,并将并发限制为只有一个线程,但似乎当队列变空并且任务由其他线程处理时线程被释放。
能否请您至少向我介绍一些如何实现此目标的资源?
tl;博士 试图限制一个特定的线程来处理动态添加到队列中的任务。
编辑 1:
这是使用 WCF 和 .NET Framework 4.6 的实验性 Web 应用程序。在 WCF 库中,我试图用一个线程处理任务来实现这一行为。这个线程必须使用外部 dll 库初始化 prolog,然后使用 prolog 进行工作。如果进程中使用了其他线程,库将抛出 AccessViolationException
。我做了一些研究,这很可能是因为该库中的线程管理不善。我有实施,我到处都有锁并且它有效。我现在正在尝试重新实现并使其异步,这样我就不会用锁定阻塞主线程。
我不在电脑旁,但我今天晚些时候回家后提供了一些代码。
你的方法看起来不错,所以你可能只是犯了一些愚蠢的小错误。
做一个简单的定制其实很容易TaskScheduler
。对于您的情况:
void Main()
{
var cts = new CancellationTokenSource();
var myTs = new SingleThreadTaskScheduler(cts.Token);
myTs.Schedule(() =>
{ Print("Init start"); Thread.Sleep(1000); Print("Init done"); });
myTs.Schedule(() => Print("Work 1"));
myTs.Schedule(() => Print("Work 2"));
myTs.Schedule(() => Print("Work 3"));
var lastOne = myTs.Schedule(() => Print("Work 4"));
Print("Starting TS");
myTs.Start();
// Wait for all of them to complete...
lastOne.GetAwaiter().GetResult();
Thread.Sleep(1000);
// And try to schedule another
myTs.Schedule(() => Print("After emptied")).GetAwaiter().GetResult();
// And shutdown; it's also pretty useful to have the
// TaskScheduler return a "complete task" to await
myTs.Complete();
Print("On main thread again");
}
void Print(string str)
{
Console.WriteLine("{0}: {1}", Thread.CurrentThread.ManagedThreadId, str);
Thread.Sleep(100);
}
public sealed class SingleThreadTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool isExecuting;
private readonly CancellationToken cancellationToken;
private readonly BlockingCollection<Task> taskQueue;
public SingleThreadTaskScheduler(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
this.taskQueue = new BlockingCollection<Task>();
}
public void Start()
{
new Thread(RunOnCurrentThread) { Name = "STTS Thread" }.Start();
}
// Just a helper for the sample code
public Task Schedule(Action action)
{
return
Task.Factory.StartNew
(
action,
CancellationToken.None,
TaskCreationOptions.None,
this
);
}
// You can have this public if you want - just make sure to hide it
private void RunOnCurrentThread()
{
isExecuting = true;
try
{
foreach (var task in taskQueue.GetConsumingEnumerable(cancellationToken))
{
TryExecuteTask(task);
}
}
catch (OperationCanceledException)
{ }
finally
{
isExecuting = false;
}
}
// Signaling this allows the task scheduler to finish after all tasks complete
public void Complete() { taskQueue.CompleteAdding(); }
protected override IEnumerable<Task> GetScheduledTasks() { return null; }
protected override void QueueTask(Task task)
{
try
{
taskQueue.Add(task, cancellationToken);
}
catch (OperationCanceledException)
{ }
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// We'd need to remove the task from queue if it was already queued.
// That would be too hard.
if (taskWasPreviouslyQueued) return false;
return isExecuting && TryExecuteTask(task);
}
}
修改它非常容易,让您可以完全控制任务调度程序实际执行任务的位置 - 事实上,我已经从我使用的之前的任务调度程序中改编了它,它只有 RunOnCurrentThread
方法public.
对于您的情况,您总是只想坚持一个线程,SingleThreadTaskScheduler
中的方法可能更好。虽然这也有它的可取之处:
// On a new thread
try
{
InitializeProlog();
try
{
myTs.RunOnCurrentThread();
}
finally
{
ReleaseProlog();
}
}
catch (Exception ex)
{
// The global handler
}