如何启动N个任务(或线程)以运行同样的方法可能return无用的结果?

How to launch N tasks (or threads) to run the same method that may return useless results?

我有一些 returns 和 Maybe<T> 的随机函数。当它产生有用的结果时,Maybe 包含该结果。

Maybe<T>是这样实现的:

public readonly struct Maybe<T> {

    public readonly bool ContainsValue;
    public readonly T Value;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public Maybe(bool containsValue, T value) {
        ContainsValue = containsValue;
        Value = value;
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static Maybe<T> Just(T value) {
        return new Maybe<T>(
            containsValue: true,
            value: value);
    }

    public static Maybe<T> Empty { get; } = new Maybe<T>(
        containsValue: false,
        value: default
        );

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static implicit operator Maybe<T>(T value) => Maybe.Just(value);
}

我想为 运行 FuncThatMayFail() 创建 N 个任务,其中 N = Environment.ProcessorCount。当第一个任务/线程实际获得有用的结果时,它会停止并告诉其他任务/线程也停止。

我目前的做法是这样的:

public static Maybe<T> RunParallel<T>(int maximumRetries, Func<Maybe<T>> func) {
    if (maximumRetries < 0)
        throw new ArgumentOutOfRangeException(nameof(maximumRetries) + " must be >= 0");
    if (func == null)
        throw new ArgumentNullException(nameof(func));

    var retries = 0;
    var tasks = new Task<Maybe<T>>[Environment.ProcessorCount];
    var finished = 0;

    for (int i = 0; i < tasks.Length; i++) {
        tasks[i] = Task.Run(() => {
            while (true) {
                if (retries >= maximumRetries || finished > 0)
                    return Maybe<T>.Empty;

                var attempt = func();
                if (attempt.ContainsValue) {
                    Interlocked.Increment(ref finished);
                    return attempt;
                } else {
                    Interlocked.Increment(ref retries);
                }
            }
        });
    }

    Task.WaitAny(tasks);

    for (int i = 0; i < tasks.Length; i++) {
        var t = tasks[i];
        if (t.IsCompletedSuccessfully && t.Result.ContainsValue)
            return t.Result;
    }

    return Maybe<T>.Empty;
}

我将此发布在 codereview 上寻求改进建议并得到 none。 我觉得这段代码很丑陋,可能有更好的方法来做到这一点。 有没有更优雅的(不使用外部库)来实现这一点? 我正在使用针对 .Net Core 2.2

的 C# 7.2

我已更新代码并将其发布在下方。它没有经过测试,但答案就在那里。您应该可以 运行 原样,但如果不能,请从中取出您需要的东西。

  • 首先您需要添加一个 CancellationTokenSource 并将 Token 传递给 Task(s) 开始,以便您可以向他们发出何时停止的信号(来自框架 视角)。
  • 然后您需要在 while 循环中自己监控 CancellationTokenSource 以手动停止任务。
  • Task.WaitAny returns Task 的索引 已完成,因此您无需遍历它们即可找到它。
  • 您也已经 returning Maybe<T>.Empty 如果 Task 没有结果就结束所以不需要测试ContainsValue;只是 return Result.

代码在下面并记录了我所做的更改。

//Make a cancellation token source to signal other tasks to cancel.
CancellationTokenSource cts = new CancellationTokenSource();
for (int i = 0; i < tasks.Length; i++)
{
    tasks[i] = Task.Run(() => {
        while (!cts.IsCancellationRequested) //Monitor for the cancellation token source to signal canceled.
        {
            if (retries >= maximumRetries || finished > 0)
                return Maybe<T>.Empty;

            var attempt = func();
            if (attempt.ContainsValue)
            {
                Interlocked.Increment(ref finished);
                return attempt;
            }
            else
            {
                Interlocked.Increment(ref retries);
            }
        }
        return Maybe<T>.Empty;
    }, cts.Token); //Add the token to the task.
}

var completedTaskIndex = Task.WaitAny(tasks); //Task.WaitAny gives you the index of the Task that did complete.
cts.Cancel(); //Signal the remaining tasks to complete.
var completedTask = tasks[completedTaskIndex]; //Get the task that completed.
return completedTask.Result; //You're returning Maybe<T>.Emtpy from the Task if it fails so no need to check ContainsValue; just return the result.