运行 任务与 FIFO 同步的好方法?
Good approach for running Tasks synchronously with FIFO?
目前,我的第一步是使用 async/await 和 .NET 中的任务,我对 运行 异步操作如此简单感到非常兴奋!但是,目前我必须通过 SerialPort 与设备通信。由于同时只能有一个连接,我只是为 运行 所有这些方法写了一些扩展方法,来自不同的 tasks/threads,并以先进先出的顺序同步:
public static class Extensions
{
private readonly static object LockObject = new object();
public static Task<TResult> RunAfter<TResult>(this Task<TResult> task, ConcurrentQueue<Task> others)
=> (Task<TResult>)task.RunAllSynchronously(others);
public static Task RunAfter(this Task task, ConcurrentQueue<Task> others)
=> task.RunAllSynchronously(others);
private static Task RunAllSynchronously(this Task task, ConcurrentQueue<Task> others)
{
if (others == null) throw new ArgumentNullException("The value of " + nameof(others) + " is null!");
lock (LockObject)
{
others.Enqueue(task);
Task currentTask;
while (others.TryDequeue(out currentTask))
{
currentTask.RunSynchronously();
if (currentTask == task) break;
}
}
return task;
}
}
这种方法似乎是个好方法,还是应该区别对待这种情况?
为什么你运行他们同步?
您应该 运行 异步任务并使用 async
和 await
一个一个地执行它们:
Task currentTask;
while (others.TryDequeue(out currentTask))
{
await currentTask;
if (currentTask == task) break;
}
另一方面,查看您的代码,我根本找不到使用lock
(线程同步)的理由。 您针对某些共享资源同步线程(即某些对象可能会或可能不会 read/modified 被多个线程 ).您的方法可以修改为:
private static async Task RunAllAsync(this Task task, ConcurrentQueue<Task> others)
{
// Design by contract rocks ;)
// See: https://msdn.microsoft.com/en-us/library/dd264808(v=vs.110).aspx
Contracts.Requires(task != null);
Contracts.Requires(others != null);
others.Enqueue(task);
// See how I've improved your loop. Since ConcurrentQueue.TryDequeue
// will return false if other thread has called it already, your loop
// should try to dequeue again until it returns true, and it should
// break if dequeued task is the task against which the extension method
// was called or the concurrent queue has no more items, to prevent a
// possible infinite loop
do
{
Task currentTask;
if(others.TryDequeue(out currentTask))
await currentTask;
}
while (currentTask == task || others.Count > 0);
return task;
}
更新
OP 说:
I have possibly forgotten to say, that the ConcurrentQueue is the
resource that should be shared among the threads. I.e.
Task.RunAllSynchronously() is called on every new Task (access to
SerialPort) and this call could be come from a different thread. Also,
I cannot ensure that RunAllSynchronously() is just called, when all
currently running (or queued) tasks are finished (I could, but
therefore i had to use something like lock outside the extension
method, which is not really that nice of having an extension method.
这就是您使用 ConcurrentQueue<T>
的原因。线程安全是在内部管理的。如果您调用 ConcurrentQueue<T>.TryDequeue
并且有多个线程同时调用它,则只有一个线程 获胜 而其他线程将收到 false
作为 return 值和 out
参数不会被赋值。看什么 MSDN says for this:
ConcurrentQueue handles all synchronization internally. If two
threads call TryDequeue at precisely the same moment, neither
operation is blocked. When a conflict is detected between two threads,
one thread has to try again to retrieve the next element, and the
synchronization is handled internally.
TryDequeue tries to remove an element from the queue. If the method is
successful, the item is removed and the method returns true;
otherwise, it returns false. That happens atomically with respect to
other operations on the queue. If the queue was populated with code
such as q.Enqueue("a"); q.Enqueue("b"); q.Enqueue("c"); and two
threads concurrently try to dequeue an element, one thread will
dequeue a and the other thread will dequeue b. Both calls to
TryDequeue will return true, because they were both able to dequeue an
element. If each thread goes back to dequeue an additional element,
one of the threads will dequeue c and return true, whereas the other
thread will find the queue empty and will return false.
首先:
You only benefit from async-await if your program has something else
to do while your tasks are running.
如果您的主线程将启动一个任务,除了等待该任务完成之外什么也不做,您的主线程可以自己完成工作。那甚至会更快。
在你的例子中,我可以想象通过串行线发送比你的处理慢得多。所以我可以想象,当一个线程忙于通过串行线路发送数据时,您的线程可能忙于创建下一个要发送的数据。或者可能有 10 个线程正在创建要一个接一个发送的数据。当然,在后一种情况下,不能保证数据将以何种顺序发送。
Buf 让我们看得更简单:一个线程以自己的速度创建数据,而另一个线程通过串行线路独立发送数据。
这是生产者-消费者模式的尖叫:一个线程是生产者,它生产消费者读取和处理的项目。一段时间后,生产者告诉消费者不再需要数据。
这里面的关键对象是System.Threading.Tasks.Dataflow.BufferBlock。请参阅 MSDN。备注部分说是通过nuget分发的
bufferBlock 实现了两个接口:
- ITargetBlock
<T
> 供生产者将其输出发送到
- ISourceBlock
<T
> 供消费者从中读取输入。
假设您使用 System.IO.Ports.SerialPort 发送数据。唉,这个 class 没有异步支持,所以我们必须自己创建它。假设您要将类型 T 的对象转换为可以通过串行线路发送的格式。代码如下所示:
private void Write(T t)
{
var dataToSend = ConvertToData(t);
serialPort.Write(dataToSend);
}
不是很异步。那么让我们为它创建一个异步函数:
private async Task WriteAsync(T t)
{
return await Task.Run ( () =>
{
var dataToSend = ConvertToData(t);
serialPort.Write(dataToSend);
}
}
或者您可以调用另一个写入函数:
return await Task.Run ( () => Write(t));
Note: if you make sure there is only one thread that will use this function, you don't have to lock it.
现在我们确实有一个异步函数来通过串行线路发送类型 T 的对象,让我们创建一个生产者,它将创建类型 T 的对象并将它们发送到缓冲区块。
我将其设为异步,以便调用线程可以在生成数据时做其他事情:
private BufferBlock<T> bufferBlock = new BufferBlock<T>();
private async Task ProduceAsync()
{
while (objectsToProcessAvailable())
{
T nextObject = GetNextObjectToProcess()
await bufferBlock.SendAsync(nextObject);
}
// nothing to process anymore: mark complete:
bufferBlock.Complete();
}
接收方将由不同的线程完成:
private Task ConsumeAsync()
{
// as long as there is something to process: fetch it and process it
while (await bufferBlock.OutputAvailableAsync())
{
T nextToProcess = await bufferBlock.ReceiveAsync();
// use WriteAsync to send to the serial port:
await WriteAsync(nextToProcess);
}
// if here: no more data to process. Return
}
现在我们只需要一个过程来创建两个线程并等待两个任务完成:
private async Task ProduceConsumeAsync()
{
var taskProducer = ProduceAsync();
// while the producer is busy producing, you can start the consumer:
var taskConsumer = ConsumeAsync();
// while both tasks are busy, you can do other things,
// like keep the UI responsive
// after a while you need to be sure the tasks are finished:
await Task.WhenAll(new Task[] {taskProducer, taskConsumer});
}
Note: because of the bufferBlock it is no problem that the producer is
already producing while the consumer is not started yet.
我们只需要一个启动异步的函数,如果您有一个事件处理程序,只需将其声明为异步即可:
private async void OnButton1_clicked(object sender, ...)
{
await ProduceConsumeAsync()
}
如果没有async函数,就得自己创建一个任务:
private void MyFunction()
{
// start produce consume:
var myTask = Task.Run( () => ProduceConsumeAsync());
// while the task is running, do other things.
// when you need the task to finish:
await myTask;
}
有关消费者-生产者模式的更多信息。请参见 MSDN
在尝试了各种东西后,我找到了一个简单的解决方案,这对我来说应该足够了,并且有点类似于 Matías Fidemraizer:
的解决方案
private static ConcurrentQueue<Task> Tasks { get; } = new ConcurrentQueue<Task>();
public async static Task RunAlone(this Task task)
{
Tasks.Enqueue(task);
do
{
var nextTask = Tasks.First();
if (nextTask == task)
{
nextTask.Start();
await nextTask;
Task deletingTask;
Tasks.TryDequeue(out deletingTask);
break;
}
else
{
nextTask.Wait();
}
} while (Tasks.Any());
}
public async static Task<TResult> RunAlone<TResult>(this Task<TResult> task)
{
TResult result = default(TResult);
Tasks.Enqueue(task);
do
{
var nextTask = Tasks.First();
if (nextTask == task)
{
nextTask.Start();
result = await (Task<TResult>)nextTask;
Task deletingTask;
Tasks.TryDequeue(out deletingTask);
break;
}
else
{
nextTask.Wait();
}
} while (Tasks.Any());
return result;
}
目前,我的第一步是使用 async/await 和 .NET 中的任务,我对 运行 异步操作如此简单感到非常兴奋!但是,目前我必须通过 SerialPort 与设备通信。由于同时只能有一个连接,我只是为 运行 所有这些方法写了一些扩展方法,来自不同的 tasks/threads,并以先进先出的顺序同步:
public static class Extensions
{
private readonly static object LockObject = new object();
public static Task<TResult> RunAfter<TResult>(this Task<TResult> task, ConcurrentQueue<Task> others)
=> (Task<TResult>)task.RunAllSynchronously(others);
public static Task RunAfter(this Task task, ConcurrentQueue<Task> others)
=> task.RunAllSynchronously(others);
private static Task RunAllSynchronously(this Task task, ConcurrentQueue<Task> others)
{
if (others == null) throw new ArgumentNullException("The value of " + nameof(others) + " is null!");
lock (LockObject)
{
others.Enqueue(task);
Task currentTask;
while (others.TryDequeue(out currentTask))
{
currentTask.RunSynchronously();
if (currentTask == task) break;
}
}
return task;
}
}
这种方法似乎是个好方法,还是应该区别对待这种情况?
为什么你运行他们同步?
您应该 运行 异步任务并使用 async
和 await
一个一个地执行它们:
Task currentTask;
while (others.TryDequeue(out currentTask))
{
await currentTask;
if (currentTask == task) break;
}
另一方面,查看您的代码,我根本找不到使用lock
(线程同步)的理由。 您针对某些共享资源同步线程(即某些对象可能会或可能不会 read/modified 被多个线程 ).您的方法可以修改为:
private static async Task RunAllAsync(this Task task, ConcurrentQueue<Task> others)
{
// Design by contract rocks ;)
// See: https://msdn.microsoft.com/en-us/library/dd264808(v=vs.110).aspx
Contracts.Requires(task != null);
Contracts.Requires(others != null);
others.Enqueue(task);
// See how I've improved your loop. Since ConcurrentQueue.TryDequeue
// will return false if other thread has called it already, your loop
// should try to dequeue again until it returns true, and it should
// break if dequeued task is the task against which the extension method
// was called or the concurrent queue has no more items, to prevent a
// possible infinite loop
do
{
Task currentTask;
if(others.TryDequeue(out currentTask))
await currentTask;
}
while (currentTask == task || others.Count > 0);
return task;
}
更新
OP 说:
I have possibly forgotten to say, that the ConcurrentQueue is the resource that should be shared among the threads. I.e. Task.RunAllSynchronously() is called on every new Task (access to SerialPort) and this call could be come from a different thread. Also, I cannot ensure that RunAllSynchronously() is just called, when all currently running (or queued) tasks are finished (I could, but therefore i had to use something like lock outside the extension method, which is not really that nice of having an extension method.
这就是您使用 ConcurrentQueue<T>
的原因。线程安全是在内部管理的。如果您调用 ConcurrentQueue<T>.TryDequeue
并且有多个线程同时调用它,则只有一个线程 获胜 而其他线程将收到 false
作为 return 值和 out
参数不会被赋值。看什么 MSDN says for this:
ConcurrentQueue handles all synchronization internally. If two threads call TryDequeue at precisely the same moment, neither operation is blocked. When a conflict is detected between two threads, one thread has to try again to retrieve the next element, and the synchronization is handled internally.
TryDequeue tries to remove an element from the queue. If the method is successful, the item is removed and the method returns true; otherwise, it returns false. That happens atomically with respect to other operations on the queue. If the queue was populated with code such as q.Enqueue("a"); q.Enqueue("b"); q.Enqueue("c"); and two threads concurrently try to dequeue an element, one thread will dequeue a and the other thread will dequeue b. Both calls to TryDequeue will return true, because they were both able to dequeue an element. If each thread goes back to dequeue an additional element, one of the threads will dequeue c and return true, whereas the other thread will find the queue empty and will return false.
首先:
You only benefit from async-await if your program has something else to do while your tasks are running.
如果您的主线程将启动一个任务,除了等待该任务完成之外什么也不做,您的主线程可以自己完成工作。那甚至会更快。
在你的例子中,我可以想象通过串行线发送比你的处理慢得多。所以我可以想象,当一个线程忙于通过串行线路发送数据时,您的线程可能忙于创建下一个要发送的数据。或者可能有 10 个线程正在创建要一个接一个发送的数据。当然,在后一种情况下,不能保证数据将以何种顺序发送。
Buf 让我们看得更简单:一个线程以自己的速度创建数据,而另一个线程通过串行线路独立发送数据。
这是生产者-消费者模式的尖叫:一个线程是生产者,它生产消费者读取和处理的项目。一段时间后,生产者告诉消费者不再需要数据。
这里面的关键对象是System.Threading.Tasks.Dataflow.BufferBlock。请参阅 MSDN。备注部分说是通过nuget分发的
bufferBlock 实现了两个接口:
- ITargetBlock
<T
> 供生产者将其输出发送到 - ISourceBlock
<T
> 供消费者从中读取输入。
假设您使用 System.IO.Ports.SerialPort 发送数据。唉,这个 class 没有异步支持,所以我们必须自己创建它。假设您要将类型 T 的对象转换为可以通过串行线路发送的格式。代码如下所示:
private void Write(T t)
{
var dataToSend = ConvertToData(t);
serialPort.Write(dataToSend);
}
不是很异步。那么让我们为它创建一个异步函数:
private async Task WriteAsync(T t)
{
return await Task.Run ( () =>
{
var dataToSend = ConvertToData(t);
serialPort.Write(dataToSend);
}
}
或者您可以调用另一个写入函数:
return await Task.Run ( () => Write(t));
Note: if you make sure there is only one thread that will use this function, you don't have to lock it.
现在我们确实有一个异步函数来通过串行线路发送类型 T 的对象,让我们创建一个生产者,它将创建类型 T 的对象并将它们发送到缓冲区块。
我将其设为异步,以便调用线程可以在生成数据时做其他事情:
private BufferBlock<T> bufferBlock = new BufferBlock<T>();
private async Task ProduceAsync()
{
while (objectsToProcessAvailable())
{
T nextObject = GetNextObjectToProcess()
await bufferBlock.SendAsync(nextObject);
}
// nothing to process anymore: mark complete:
bufferBlock.Complete();
}
接收方将由不同的线程完成:
private Task ConsumeAsync()
{
// as long as there is something to process: fetch it and process it
while (await bufferBlock.OutputAvailableAsync())
{
T nextToProcess = await bufferBlock.ReceiveAsync();
// use WriteAsync to send to the serial port:
await WriteAsync(nextToProcess);
}
// if here: no more data to process. Return
}
现在我们只需要一个过程来创建两个线程并等待两个任务完成:
private async Task ProduceConsumeAsync()
{
var taskProducer = ProduceAsync();
// while the producer is busy producing, you can start the consumer:
var taskConsumer = ConsumeAsync();
// while both tasks are busy, you can do other things,
// like keep the UI responsive
// after a while you need to be sure the tasks are finished:
await Task.WhenAll(new Task[] {taskProducer, taskConsumer});
}
Note: because of the bufferBlock it is no problem that the producer is already producing while the consumer is not started yet.
我们只需要一个启动异步的函数,如果您有一个事件处理程序,只需将其声明为异步即可:
private async void OnButton1_clicked(object sender, ...)
{
await ProduceConsumeAsync()
}
如果没有async函数,就得自己创建一个任务:
private void MyFunction()
{
// start produce consume:
var myTask = Task.Run( () => ProduceConsumeAsync());
// while the task is running, do other things.
// when you need the task to finish:
await myTask;
}
有关消费者-生产者模式的更多信息。请参见 MSDN
在尝试了各种东西后,我找到了一个简单的解决方案,这对我来说应该足够了,并且有点类似于 Matías Fidemraizer:
的解决方案private static ConcurrentQueue<Task> Tasks { get; } = new ConcurrentQueue<Task>();
public async static Task RunAlone(this Task task)
{
Tasks.Enqueue(task);
do
{
var nextTask = Tasks.First();
if (nextTask == task)
{
nextTask.Start();
await nextTask;
Task deletingTask;
Tasks.TryDequeue(out deletingTask);
break;
}
else
{
nextTask.Wait();
}
} while (Tasks.Any());
}
public async static Task<TResult> RunAlone<TResult>(this Task<TResult> task)
{
TResult result = default(TResult);
Tasks.Enqueue(task);
do
{
var nextTask = Tasks.First();
if (nextTask == task)
{
nextTask.Start();
result = await (Task<TResult>)nextTask;
Task deletingTask;
Tasks.TryDequeue(out deletingTask);
break;
}
else
{
nextTask.Wait();
}
} while (Tasks.Any());
return result;
}