在同一个 class 上启用 EAP 和 async/await

Enabling EAP and async/await on same class

我正在尝试创建一个 class 有事件并且可以等待,但总是遇到绊脚石。

首先,我尝试了一个 TransferJob class,returns 一个 TransferTask 对象,返回时它已经是 运行。这将通过这样的事情来完成:

public abstract class TransferJob
{
    public TransferTask Start()
    {
        return Start(CancellationToken.None);
    }

    public TransferTask Start(CancellationToken token)
    {
        TransferTask task = CreateTransferTask();
        task.Start(token);

        return task;
    }

    protected abstract TransferTask CreateTransferTask();
}

public abstract class TransferTask
{
    public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;

    private Task transferTask;
    private TransferStatus status;

    public TransferStatus Status
        {
            get { return this.status; }
            protected set
            {
                TransferStatus oldStatus = this.status;
                this.status = value;

                OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
            }
        }

    internal void Start(CancellationToken token)
    {
        this.transferTask = TransferAsync(cancellationToken);
    }

    protected abstract Task TransferAsync(CancellationToken cancellationToken);

    protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
        {
            if (this.StatusChanged != null)
            {
                this.StatusChanged(this, txStatusArgs);
            }
        }

    public TaskAwaiter GetAwaiter()
    {
        return this.transferTask.GetAwaiter();
    }
}

上面的问题是,如果 TransferTask 完成得非常快,那么 TransferJob.Start() 的用户可能没有时间在它完成之前在返回的 TransferTask 的 StatusChanged 事件上注册他们的事件处理程序。所以我尝试了一种不同的方法,用户必须自己调用 TransferTask 的 Start() 方法。这将使用户有时间在 transferJob.CreateTask() 调用和 transferTask.Start() 调用之间的 TransferTask 上注册他们的事件处理程序:

public abstract class TransferJob
{
    public abstract TransferTask CreateTask();
}

public abstract class TransferTask
{
    public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;

    private Task transferTask;
    private TransferStatus status;

    public TransferStatus Status
        {
            get { return this.status; }
            protected set
            {
                TransferStatus oldStatus = this.status;
                this.status = value;

                OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
            }
        }

    public void Start(CancellationToken token)
    {
        this.transferTask = TransferAsync(cancellationToken);
    }

    protected abstract Task TransferAsync(CancellationToken cancellationToken);

    protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
        {
            if (this.StatusChanged != null)
            {
                this.StatusChanged(this, txStatusArgs);
            }
        }

    public TaskAwaiter GetAwaiter()
    {
        return this.transferTask.GetAwaiter();
    }
}

现在,我有一个不同的问题。如果用户在调用 transferTask.Start(); 之前尝试 await transferTask;,那么他们可能会抛出 NullReferenceException,因为任务尚未开始(因此分配给 transferTask 场地)。我真的在努力寻找解决这个问题的方法。有办法吗?或者比上面更好的模式?

我在 C# 中尝试混合事件和可等待代码时发现的最佳方法是使用 Reactive Extension (Rx) 库。 From Microsoft:

Reactive Extension (Rx) is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.

您可以执行类似以下操作来解决您的问题。 (我不确定这是否正是您想要完成的,但目标只是演示如何使用 Rx 将事件与异步代码结合起来):

public async Task TransferAndWaitStartedAsync()
{
    var transferTask = new TransferTask();

    // Prepare the observable before executing the transfer to make sure that the observable sequence will receive the event
    // You can use Linq operators to filter only specific events. In this case, I only care about events with Status == StatusCode.Started 
    var whenStatusChanged = Observable.FromEventPattern<TransferStatusChangedEventArgs>(h, transferTask.StatusChanged += h, h => transferTask.StatusChanged -= h)
                                      .Where(e => e.EventArgs.Status == StatusCode.Started)
                                      .FirstAsync();

     // Start the transfer asynchronously
     await transferTask.TransferAsync(); 

     // Continuation will complete when receiving the first event that matches the predicate in the observable sequence even if the event was triggered too quickly.
     await whenStatusChanged; 
}

我发现 Rx 库的所有微妙之处都具有陡峭的学习曲线,但是当您知道如何使用它时,它就是一个非常强大的工具。

Intro to Rx with lot of examples

Design guidelines

我不太相信这是个好主意。只需公开 TAP 模式即可。删除事件以及 transferTaskStart 的调用者必须保留该任务并将其传递给任何想要侦听完成的代码。这导致非常干净 API。无可变状态,非常易于理解,支持所有用例。

如果你坚持,你可以创建一个看起来像真的代理任务:

public abstract class TransferTask
{
    public event EventHandler<TransferStatusChangedEventArgs> StatusChanged;

    private TaskCompletionSource<object> transferTask = new ...; //changed
    private TransferStatus status;

    public TransferStatus Status
        {
            get { return this.status; }
            protected set
            {
                TransferStatus oldStatus = this.status;
                this.status = value;

                OnStatusChanged(new TransferStatusChangedEventArgs(oldStatus, value));
            }
        }

    public Task Start(CancellationToken token)
    {
        await TransferAsync(cancellationToken);
        transferTask.SetResult(null); //complete proxy task
    }

    protected abstract Task TransferAsync(CancellationToken cancellationToken);

    protected virtual void OnStatusChanged(TransferStatusChangedEventArgs txStatusArgs)
        {
            if (this.StatusChanged != null)
            {
                this.StatusChanged(this, txStatusArgs);
            }
        }

    public TaskAwaiter GetAwaiter()
    {
        return this.transferTask.Task.GetAwaiter(); //changed
    }
}

现在,transferTask.Task 始终不为空。该任务最终将完成。我很快就把它搞定了,希望思路清晰。

也许,您应该将事件基于 transferTask.Task.ContinueWith(...)