return in "wait Task<string>.Run (..)" 有时挂起

return in "wait Task<string>.Run (..)" sometimes hangs

这是我的代码的重现,其中 wait Task<string>.Run (..) 中的 return 有时 挂起。如果失败,则主要是在第一次通话时。

我该如何改进它?

using System.Threading.Tasks;
using System.Diagnostics;

private void Button_Click(object sender, RoutedEventArgs e)
{
    // This can be a very huge list
    string[] servers = new string[] { "10.17.100.1", "10.17.100.10", "10.17.100.20" };

    // the max parallel tasks must be limited
    Parallel.ForEach(servers,
        new ParallelOptions { MaxDegreeOfParallelism = 10 },  
        (forServer) =>
    {
        this.Method1Async(forServer).Wait();
    });

    Debug.WriteLine("Finished");
}

private async Task Method1Async(string server)
{
    await this.Method2Async(server);
}

private async Task Method2Async(string server)
{
    Debug.WriteLine("> Method2Async");

    string result = await Task<string>.Run(() =>
    {

        Debug.WriteLine("  Method2Async before return");

        return GetDataFromServer(server);
    });

    Debug.WriteLine("< Method2Async");
}

private string GetDataFromServer(string server)
{
    // any long time running stuff

    Thread.Sleep(10000);

    return "the server data";
}

想要的输出:

> Method2Async
  Method2Async before return
< Method2Async
Finished

return挂起时的输出:

> Method2Async
  Method2Async before return

注意:感谢 Theodor Zoulias 提到这一点:

根据 this question Parallel.ForEach 不会等待任务完成,因此 awaiting 内部操作不会执行任何操作,并且 IsCompleted 将被设置为所有任务启动后立即为真。


ForEach 的操作签名更改为 async 以启用 awaiting。

using System.Threading.Tasks;
using System.Diagnostics;

private void Button_Click(object sender, RoutedEventArgs e)
{
    string[] dummyArray = new string[] { "anyvalue" };

    Parallel.ForEach(dummyArray, async (forDummy) =>
    {
        await this.Method1Async();
    });

    Debug.WriteLine("Finished");
}

private async Task Method1Async()
{
    await this.Method2Async();
}

private async Task Method2Async()
{
    Debug.WriteLine("> Method2Async");

    string result = await Task<string>.Run(() =>
    {
        Debug.WriteLine("  Method2Async before return");
        return "anydata"; // this return sometimes does not "come back" ...
    });

    // ... so this code is never reached
    Debug.WriteLine("< Method2Async" + result);
}

通常,在编写异步代码时,您必须避免同步调用(Wait、Result 等),否则编写异步代码毫无意义。只需删除所有任务、异步和等待,您的代码就会 运行 更快。

此规则的一个例外是当您故意要阻塞线程时,例如在遗留代码中。

编辑:

如果你想等待所有任务完成后再进行下一条语句,你可以使用WhenAll:

private async void Button_Click(object sender, RoutedEventArgs e)
{
    string[] dummyArray = new string[] { "anyvalue" };

    Task[] tasks = dummyArray.Select(async x => await Method1Async()).ToArray();
    await Task.WhenAll(tasks);
}

private async void Button_Click(object sender, RoutedEventArgs e)
{
    string[] dummyArray = new string[] { "anyvalue" };

    Task[] tasks = dummyArray.Select(x => Method1Async()).ToArray();
    await Task.WhenAll(tasks);
}

编辑:

如果你想限制并行任务的数量,那么你可以this:

    public async Task Button_Click()
    {
        string[] servers = new string[] { "1", "2", "3", "4", "5" };

        var maxParallel = 3;
        var throttler = new SemaphoreSlim(initialCount: maxParallel);
        var tasks = servers.Select(async server =>
        {
            try
            {
                await throttler.WaitAsync();
                await Method1Async(server);
            }
            finally
            {
                throttler.Release();
            }
        });
        await Task.WhenAll(tasks);

        Console.WriteLine("Finished");
    }

另一种选择是使用显然非常流行的 AsyncEnumerator NuGet PackageParallelForEachAsync

Awaitable foreach 构造可以通过任何 IEnumerableIAsyncEnumerable.

的迭代来实现
using Dasync.Collections;

string[] servers = new string[] { "10.17.100.1", "10.17.100.10", "10.17.100.20" };

await servers.ParallelForEachAsync<string>(async forServer =>
{
    await this.Method1Async(forServer);

}, maxDegreeOfParallelism: 10);

要收集 return 个值,可以使用 thread safe "bag"

using Dasync.Collections;

string[] servers = new string[] { "10.17.100.1", "10.17.100.10", "10.17.100.20" };

ConcurrentBag<string> bag = new ConcurrentBag<string>();

await severs.ParallelForEachAsync<string>(async forServer =>
{
    string response = await this.Method1Async(forServer);

    bag.Add(response);

}, maxDegreeOfParallelism: 10); 

foreach(string forBagItem in bag)
{
    // evaluate the results
}