在同一个 class 上启用 EAP 和 async/await
Enabling EAP and async/await on same class
我正在尝试创建一个 class 有事件并且可以等待,但总是遇到绊脚石。
首先,我尝试了一个 TransferJob
class,returns 一个 TransferTask
对象,返回时它已经是 运行。这将通过这样的事情来完成:
public abstract class TransferJob
{
public TransferTask Start()
{
return Start(CancellationToken.None);
}
public TransferTask Start(CancellationToken token)
{
TransferTask task = CreateTransferTask();
task.Start(token);
return task;
}
protected abstract TransferTask CreateTransferTask();
}
public abstract class TransferTask
{
public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;
private Task transferTask;
private TransferStatus status;
public TransferStatus Status
{
get { return this.status; }
protected set
{
TransferStatus oldStatus = this.status;
this.status = value;
OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
}
}
internal void Start(CancellationToken token)
{
this.transferTask = TransferAsync(cancellationToken);
}
protected abstract Task TransferAsync(CancellationToken cancellationToken);
protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
{
if (this.StatusChanged != null)
{
this.StatusChanged(this, txStatusArgs);
}
}
public TaskAwaiter GetAwaiter()
{
return this.transferTask.GetAwaiter();
}
}
上面的问题是,如果 TransferTask 完成得非常快,那么 TransferJob.Start() 的用户可能没有时间在它完成之前在返回的 TransferTask 的 StatusChanged 事件上注册他们的事件处理程序。所以我尝试了一种不同的方法,用户必须自己调用 TransferTask 的 Start() 方法。这将使用户有时间在 transferJob.CreateTask()
调用和 transferTask.Start()
调用之间的 TransferTask
上注册他们的事件处理程序:
public abstract class TransferJob
{
public abstract TransferTask CreateTask();
}
public abstract class TransferTask
{
public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;
private Task transferTask;
private TransferStatus status;
public TransferStatus Status
{
get { return this.status; }
protected set
{
TransferStatus oldStatus = this.status;
this.status = value;
OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
}
}
public void Start(CancellationToken token)
{
this.transferTask = TransferAsync(cancellationToken);
}
protected abstract Task TransferAsync(CancellationToken cancellationToken);
protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
{
if (this.StatusChanged != null)
{
this.StatusChanged(this, txStatusArgs);
}
}
public TaskAwaiter GetAwaiter()
{
return this.transferTask.GetAwaiter();
}
}
现在,我有一个不同的问题。如果用户在调用 transferTask.Start();
之前尝试 await transferTask;
,那么他们可能会抛出 NullReferenceException
,因为任务尚未开始(因此分配给 transferTask
场地)。我真的在努力寻找解决这个问题的方法。有办法吗?或者比上面更好的模式?
我在 C# 中尝试混合事件和可等待代码时发现的最佳方法是使用 Reactive Extension (Rx) 库。 From Microsoft:
Reactive Extension (Rx) is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.
您可以执行类似以下操作来解决您的问题。 (我不确定这是否正是您想要完成的,但目标只是演示如何使用 Rx 将事件与异步代码结合起来):
public async Task TransferAndWaitStartedAsync()
{
var transferTask = new TransferTask();
// Prepare the observable before executing the transfer to make sure that the observable sequence will receive the event
// You can use Linq operators to filter only specific events. In this case, I only care about events with Status == StatusCode.Started
var whenStatusChanged = Observable.FromEventPattern<TransferStatusChangedEventArgs>(h, transferTask.StatusChanged += h, h => transferTask.StatusChanged -= h)
.Where(e => e.EventArgs.Status == StatusCode.Started)
.FirstAsync();
// Start the transfer asynchronously
await transferTask.TransferAsync();
// Continuation will complete when receiving the first event that matches the predicate in the observable sequence even if the event was triggered too quickly.
await whenStatusChanged;
}
我发现 Rx 库的所有微妙之处都具有陡峭的学习曲线,但是当您知道如何使用它时,它就是一个非常强大的工具。
我不太相信这是个好主意。只需公开 TAP 模式即可。删除事件以及 transferTask
。 Start
的调用者必须保留该任务并将其传递给任何想要侦听完成的代码。这导致非常干净 API。无可变状态,非常易于理解,支持所有用例。
如果你坚持,你可以创建一个看起来像真的代理任务:
public abstract class TransferTask
{
public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;
private TaskCompletionSource<object> transferTask = new ...; //changed
private TransferStatus status;
public TransferStatus Status
{
get { return this.status; }
protected set
{
TransferStatus oldStatus = this.status;
this.status = value;
OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
}
}
public Task Start(CancellationToken token)
{
await TransferAsync(cancellationToken);
transferTask.SetResult(null); //complete proxy task
}
protected abstract Task TransferAsync(CancellationToken cancellationToken);
protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
{
if (this.StatusChanged != null)
{
this.StatusChanged(this, txStatusArgs);
}
}
public TaskAwaiter GetAwaiter()
{
return this.transferTask.Task.GetAwaiter(); //changed
}
}
现在,transferTask.Task
始终不为空。该任务最终将完成。我很快就把它搞定了,希望思路清晰。
也许,您应该将事件基于 transferTask.Task.ContinueWith(...)
。
我正在尝试创建一个 class 有事件并且可以等待,但总是遇到绊脚石。
首先,我尝试了一个 TransferJob
class,returns 一个 TransferTask
对象,返回时它已经是 运行。这将通过这样的事情来完成:
public abstract class TransferJob
{
public TransferTask Start()
{
return Start(CancellationToken.None);
}
public TransferTask Start(CancellationToken token)
{
TransferTask task = CreateTransferTask();
task.Start(token);
return task;
}
protected abstract TransferTask CreateTransferTask();
}
public abstract class TransferTask
{
public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;
private Task transferTask;
private TransferStatus status;
public TransferStatus Status
{
get { return this.status; }
protected set
{
TransferStatus oldStatus = this.status;
this.status = value;
OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
}
}
internal void Start(CancellationToken token)
{
this.transferTask = TransferAsync(cancellationToken);
}
protected abstract Task TransferAsync(CancellationToken cancellationToken);
protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
{
if (this.StatusChanged != null)
{
this.StatusChanged(this, txStatusArgs);
}
}
public TaskAwaiter GetAwaiter()
{
return this.transferTask.GetAwaiter();
}
}
上面的问题是,如果 TransferTask 完成得非常快,那么 TransferJob.Start() 的用户可能没有时间在它完成之前在返回的 TransferTask 的 StatusChanged 事件上注册他们的事件处理程序。所以我尝试了一种不同的方法,用户必须自己调用 TransferTask 的 Start() 方法。这将使用户有时间在 transferJob.CreateTask()
调用和 transferTask.Start()
调用之间的 TransferTask
上注册他们的事件处理程序:
public abstract class TransferJob
{
public abstract TransferTask CreateTask();
}
public abstract class TransferTask
{
public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;
private Task transferTask;
private TransferStatus status;
public TransferStatus Status
{
get { return this.status; }
protected set
{
TransferStatus oldStatus = this.status;
this.status = value;
OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
}
}
public void Start(CancellationToken token)
{
this.transferTask = TransferAsync(cancellationToken);
}
protected abstract Task TransferAsync(CancellationToken cancellationToken);
protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
{
if (this.StatusChanged != null)
{
this.StatusChanged(this, txStatusArgs);
}
}
public TaskAwaiter GetAwaiter()
{
return this.transferTask.GetAwaiter();
}
}
现在,我有一个不同的问题。如果用户在调用 transferTask.Start();
之前尝试 await transferTask;
,那么他们可能会抛出 NullReferenceException
,因为任务尚未开始(因此分配给 transferTask
场地)。我真的在努力寻找解决这个问题的方法。有办法吗?或者比上面更好的模式?
我在 C# 中尝试混合事件和可等待代码时发现的最佳方法是使用 Reactive Extension (Rx) 库。 From Microsoft:
Reactive Extension (Rx) is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.
您可以执行类似以下操作来解决您的问题。 (我不确定这是否正是您想要完成的,但目标只是演示如何使用 Rx 将事件与异步代码结合起来):
public async Task TransferAndWaitStartedAsync()
{
var transferTask = new TransferTask();
// Prepare the observable before executing the transfer to make sure that the observable sequence will receive the event
// You can use Linq operators to filter only specific events. In this case, I only care about events with Status == StatusCode.Started
var whenStatusChanged = Observable.FromEventPattern<TransferStatusChangedEventArgs>(h, transferTask.StatusChanged += h, h => transferTask.StatusChanged -= h)
.Where(e => e.EventArgs.Status == StatusCode.Started)
.FirstAsync();
// Start the transfer asynchronously
await transferTask.TransferAsync();
// Continuation will complete when receiving the first event that matches the predicate in the observable sequence even if the event was triggered too quickly.
await whenStatusChanged;
}
我发现 Rx 库的所有微妙之处都具有陡峭的学习曲线,但是当您知道如何使用它时,它就是一个非常强大的工具。
我不太相信这是个好主意。只需公开 TAP 模式即可。删除事件以及 transferTask
。 Start
的调用者必须保留该任务并将其传递给任何想要侦听完成的代码。这导致非常干净 API。无可变状态,非常易于理解,支持所有用例。
如果你坚持,你可以创建一个看起来像真的代理任务:
public abstract class TransferTask
{
public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;
private TaskCompletionSource<object> transferTask = new ...; //changed
private TransferStatus status;
public TransferStatus Status
{
get { return this.status; }
protected set
{
TransferStatus oldStatus = this.status;
this.status = value;
OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
}
}
public Task Start(CancellationToken token)
{
await TransferAsync(cancellationToken);
transferTask.SetResult(null); //complete proxy task
}
protected abstract Task TransferAsync(CancellationToken cancellationToken);
protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
{
if (this.StatusChanged != null)
{
this.StatusChanged(this, txStatusArgs);
}
}
public TaskAwaiter GetAwaiter()
{
return this.transferTask.Task.GetAwaiter(); //changed
}
}
现在,transferTask.Task
始终不为空。该任务最终将完成。我很快就把它搞定了,希望思路清晰。
也许,您应该将事件基于 transferTask.Task.ContinueWith(...)
。