创建一个可等待的冷任务

Create an Awaitable Cold Task

我有一个异步方法,完成后我希望 运行 另一个方法。如果我简单地调用该方法并添加 .ContinueWith()

就可以正常工作

但是,我有一个新要求,即只有在我能够将任务添加到并发字典时才启动该任务。

我想构建任务,尝试添加它然后启动任务

但是,Task.Start() 似乎立即完成了任务,导致继续操作 运行 并且任何等待.. 不等待。

任何人都可以解释为什么会发生这种情况以及实现我的目标的正确方法吗?

namespace UnitTestProject2
{
    [TestClass]
    public class taskProblem
    {
        [TestMethod]
        public void Test()
        {
            CancellationTokenSource cancel = new CancellationTokenSource();
            ConcurrentDictionary<Guid, Task> tasks = new ConcurrentDictionary<Guid,Task>();
            Guid id = Guid.NewGuid();
            Task t = new Task(async () => await Get(), cancel.Token);
            t.ContinueWith(Complete);
            if (tasks.TryAdd(id, t))
            {
                t.Start();
            }
            else
            {
                //another thread is stopping stuff dont start new tasks
            }

            t.Wait(); //expected to wait for the get function to complete
            Console.WriteLine("end test");
        }

        public async Task Get()
        {
            Console.WriteLine("start task");
            await Task.Delay(10000);
            Console.WriteLine("end task");
        }

        public void Complete(Task t)
        {
            Console.WriteLine("Complete");
        }
    }
}

输出:

start task
end test
Complete

预期输出:

start task
end task
Complete
end test

更新: 似乎无法创建不会在 Task.Start?

立即开始或完成的新任务

您的委托是异步无效的。 async void-methods 是一劳永逸的。

参见模式和反模式总结的第一点:http://rarcher.azurewebsites.net/Post/PostContent/31

也许你可以这样做:

[TestFixture]
public class FIXTURENAMETests {
  [Test]
  public async Task NAME() {
    var tcs = new TaskCompletionSource<bool>();
    Task t = LongRunningStuff(tcs);

    if (CanInsertInDictionary(t)) {
      tcs.SetResult(true);
    } else {
      tcs.SetException(new Exception());
    }

    Trace.WriteLine("waiting for end");

    try {
      await t;
    }
    catch (Exception exception) {
      Trace.WriteLine(exception);
    }

    Trace.WriteLine("end all");
  }

  private bool CanInsertInDictionary(Task task) {
    return true;
  }

  private async Task LongRunningStuff(TaskCompletionSource<bool> tcs) {
    Trace.WriteLine("start");
    try {
      await tcs.Task;
    }
    catch (Exception) {
      return;
    }
    Trace.WriteLine("do long running stuff");
    await Task.Delay(10000);
    Trace.WriteLine("end");
  }
}

首先,ContinueWith 将 return 一个新的 Task,您想等到 Complete 方法完成,但您正在等待第一个任务 t.

所以,要在end test之前输出Complete,你必须等待第二个任务:

Task t = new Task(async () => await Get(), cancel.Token);

// NOTE: t2 is a new Task returned from ContinueWith
Task t2 = t.ContinueWith(Complete);

if (tasks.TryAdd(id, t))
{
    t.Start();
}
else
{
}

// NOTE: Waiting on t2, NOT t
t2.Wait();

Console.WriteLine("end test");

现在输出将是:

start task 
Complete
end test
end task

好的,这仍然不是预期的输出。 end task 应该在 Complete 之前打印。这是因为您的异步操作不可等待:How to await on async delegate

不知道我是否理解正确你的要求。如果是我,我可能会这样做:

新增支持class:

public class TaskEntry
{
    public Task Task { get; set; }
}

然后将您的代码更改为:

Guid id = Guid.NewGuid();

Task task = null;
var entry = new TaskEntry();
if (tasks.TryAdd(id, entry))
{
    entry.Task = Get();

    // Notice this line of code:
    task = entry.Task.ContinueWith(Complete);
}

if (task != null)
{
    task.Wait();
}

Console.WriteLine("end test");

这里我假设TaskEntry不会被其他线程修改。

However, I have a new requirement which is to only start the task if I am able to add it to a concurrent dictionary.

以下是上述问题的一种可能解决方案:

  • Task<T> 换成 Lazy<>
  • 通过GetOrAdd
  • 将惰性任务添加到ConcurrentDictionary
  • 使用 GetOrAdd 的输出等待任务

这样,当 ConcurrentDictionary 正在处理冲突等内部事务时,任务不会启动,所有后续 await 将使用与字典中的给定键。

综上所述,在多个线程向字典添加任务的场景下,下面的代码允许任务主体针对字典中的给定键只执行一次。

代码如下:

[TestMethod]
public void ConcurrentMapLazyTask() {

  Func<Task> Get = async () => {
    Trace.WriteLine("Start task.");
    await Task.Delay(200);
    Trace.WriteLine("End task.");
  };

  Action<Task> Complete = (t) => {
    Trace.WriteLine("Complete.");
  };

  var mp = new ConcurrentDictionary<string, Lazy<Task>>();

  Func<string, Lazy<Task>> valueFactory = (sKey) => {
    var s = Guid.NewGuid().ToString();
    Trace.WriteLine(string.Format("valueFactory called => {0}", s));
    return new Lazy<Task>(() => {
      Trace.WriteLine(string.Format("LazyTask factory called for {0}", s));
      var t = Task.Run(async () => {
        Trace.WriteLine(string.Format("Task.Run executed for {0}", s));
        await Get();
      });
      return Task.WhenAll(t, t.ContinueWith(Complete));
    });
  };

  Func<Task> TestAsync = async () => {
    var lazyTask = mp.GetOrAdd("test", valueFactory);
    await lazyTask.Value;
  };

  Action TestSync = () => {
    TestAsync().Wait();
    Trace.WriteLine("End test.");
  };
  Action TestSyncSlow = () => {
    Thread.Sleep(50);
    TestAsync().Wait();
    Trace.WriteLine("End slow test.");
  };
  //Parallel.Invoke(TestSync);
  Parallel.Invoke(TestSync, TestSyncSlow, TestSync, TestSync);
}

这是输出:

valueFactory called => e35b0e9e-3326-41bb-b3b9-eb4e6dc37391
valueFactory called => e4929d93-4c77-4fbf-8e90-e8d6e4d7b009
LazyTask factory called for e35b0e9e-3326-41bb-b3b9-eb4e6dc37391
valueFactory called => 8863fb7f-1309-4c27-b805-71467022ac74
Task.Run executed for e35b0e9e-3326-41bb-b3b9-eb4e6dc37391
Start task.
End task.
Complete.
End test.
End test.
End slow test.
End test.
  • Task.Run只执行一次
  • Lazy 工厂只被调用过一次
  • GetOrAdd 中使用 valueFactory 参数允许减少创建的惰性对象数量(3 个惰性包装器由 4 次并发调用 GetOrAdd 产生)