如何在 C# 中以特定吞吐量调用我的方法?

How to call my method at a particular throughput in C#?

我正在做一些性能测试,所以想以我可以控制的特定吞吐量调用我的方法。通过这种方式,我可以生成一些负载并弄清楚它在各种吞吐量下的行为。

例如:我需要从多个线程以大约 x requests per second 的速率调用我的 doIOStuff 方法,其中 x 大多小于 2000,但在这种情况下它真的无关紧要.它不一定是准确的,所以有一些错误的余地,但总体思路是我需要确保我的方法 doIOStuff 在滑动 window 中执行不超过 x 次y 秒。

下面是我在创建 3 个不同线程时获得的代码,它并行调用 doIOStuff 方法但无法弄清楚如何限制此方法每秒 x 个请求的速率。有没有简单的方法可以控制它?

class Program
{
    static void Main(string[] args)
    {
        var tasks = new List<Task>();

        for(int i = 0; i< 100; i ++)
            tasks.Add(Task.Factory.StartNew(() => doIOStuff(), i));

        Task.WaitAll(tasks.ToArray());
        Console.WriteLine("All threads complete");
    }

    // how to call this method at a particular rate?
    static void doIOStuff()
    {
        // do some IO work
    }
}

我想 运行 将此测试保留一段时间,但在此期间它应该始终仅以特定吞吐量调用该方法。

注:

这只是我产生随机吞吐量的想法,我可以控制它,但如果这不能正确地做事,那么我们应该尽可能以更好、更有效的方式来做,但总体想法是控制我自己的随机吞吐量并为我的方法产生那么大的负载。

假设您启动了 n 个线程并且希望每秒最多调用 m 次。这可以通过让每个线程生成一个介于 0 和 1 之间的随机数来实现,每秒 k 次,并且仅当生成的数字小于 m / n / k

时才调用您的 IO 方法
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;


namespace ConsoleApp7
{
    class Program
    {
        const int m_threads = 100;
        const int n_throughput = 2000;
        const int k_toss_per_second = 2000; // Note that k_toss_per_second x  m_threads >= n_throughput

        static void Main(string[] args)
        {
            var tasks = new List<Task>();



            for (int i = 0; i < m_threads; i++)
                tasks.Add(Task.Factory.StartNew(() => callDoIOStuff()));

            Task.WaitAll(tasks.ToArray());
            Console.WriteLine("All threads complete");
        }


        static void callDoIOStuff()
        {
           int sleep_time = (int) (1000 * 1.0d / k_toss_per_second);
           double threshold = (double) n_throughput / m_threads / k_toss_per_second; 
           Random random = new Random();
           while (true) {
                Thread.Sleep(sleep_time);
                if (random.NextDouble() < threshold)
                    doIOStuff();
            }
        }

        // how to call this method at a particular rate?
        static void doIOStuff()
        {
            // do some IO work
        }
    }
}

您需要注意传递取消标记和任何参数。此代码示例使用全局变量快速而肮脏,可以大大改进。

一种方法是在每个时间段给定的时间量内执行它。

创建 2000 个 Action 类型的实例,它们将代表 DoIOStuff 方法。然后尝试通过此操作启动 2000 个任务,但只有您可以在 1 秒内完成 (TakeWhile)。

var period = TimeSpan.FromSeconds(1);
var maxAmountOfCalls = 2000;

var methods = Enumerable.Range(0, maxAmountOfCalls)
    .Select(_ => doIOStuff)
    .ToArray();

var watch = new Stopwatch();
watch.Start();

var tasks = methods
    .Select(method => Task.Run(method))
    .TakeWhile(task => watch.Elapsed < period)
    .ToArray();

watch.Stop();

await Task.WhenAll(tasks);

如果时间较短,此方法将启动少于所需的量。

您可以使用以下方法对异步 doIOStuffAsync 方法进行压力测试:

public static async Task<long> StressTestAsync(
    Func<CancellationToken, Task> taskFactory,
    TimeSpan duration,
    int concurrencyLimit,
    int tasksStartedPerSecondLimit,
    IProgress<long> progress = default,
    CancellationToken cancellationToken = default)
{
    long successfullyCompletedCount = 0;
    using (var linkedCTS = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken))
    using (var progressTimer = new System.Threading.Timer(_ =>
    {
        progress.Report(Interlocked.Read(ref successfullyCompletedCount));
    }))
    {
        var concurrencySemaphore = new SemaphoreSlim(concurrencyLimit);
        var perSecondSemaphore = new SemaphoreSlim(tasksStartedPerSecondLimit);
        var completionSemaphore = new SemaphoreSlim(0, 1);
        int pendingCount = 1; // The initial 1 represents the while loop
        var exceptions = new ConcurrentQueue<Exception>();
        linkedCTS.CancelAfter(duration);
        if (progress != null)
            progressTimer.Change(1000, 1000); // Report progress every second
        while (true)
        {
            try
            {
                await concurrencySemaphore.WaitAsync(linkedCTS.Token)
                    .ConfigureAwait(false);
                await perSecondSemaphore.WaitAsync(linkedCTS.Token)
                    .ConfigureAwait(false);
            }
            catch (OperationCanceledException) { break; }
            ReleaseSemaphoreAfterOneSecond();
            StartOneTask();
        }
        if (Interlocked.Decrement(ref pendingCount) == 0)
            completionSemaphore.Release();
        await completionSemaphore.WaitAsync().ConfigureAwait(false); // No token
        cancellationToken.ThrowIfCancellationRequested();
        if (!exceptions.IsEmpty) throw new AggregateException(exceptions);

        async void ReleaseSemaphoreAfterOneSecond()
        {
            try
            {
                await Task.Delay(1000, linkedCTS.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) { } // Ignore
            finally
            {
                perSecondSemaphore.Release();
            }
        }

        async void StartOneTask()
        {
            Interlocked.Increment(ref pendingCount);
            try
            {
                var task = taskFactory(linkedCTS.Token);
                await task.ConfigureAwait(false);
                Interlocked.Increment(ref successfullyCompletedCount);
            }
            catch (OperationCanceledException) { } // Ignore
            catch (Exception ex)
            {
                exceptions.Enqueue(ex);
                linkedCTS.Cancel();
            }
            finally
            {
                if (Interlocked.Decrement(ref pendingCount) == 0)
                    completionSemaphore.Release();
                concurrencySemaphore.Release();
            }
        }

    }
    return Interlocked.Read(ref successfullyCompletedCount);
}

用法示例:

await StressTestAsync(
    taskFactory: async ct => await doIOStuffAsync(ct),
    duration: TimeSpan.FromSeconds(30),
    concurrencyLimit: 1000,
    tasksStartedPerSecondLimit: 1000);

concurrencyLimit参数是在任何给定时刻可以同时运行的最大任务数。 tasksStartedPerSecondLimit 参数是在任何 1 秒时间跨度内可以启动的最大任务数。这两个限制相互竞争,因此通常只有其中一个会成为吞吐量的限制因素。如果任务很快,concurrencyLimit 将成为限制因素。如果任务很慢,tasksStartedPerSecondLimit 将成为限制因素。

StressTestAsync 不容忍异常。 taskFactory 方法抛出的任何异常,或者如果任何创建的任务在错误状态下完成,都将导致测试终止。

Progress<long> 可以作为可选参数传递,以传播有关当前成功完成的任务数的进度报告。