在多线程环境中满足条件(小于常量)时增加变量

Increment a variable when condition (LessThan a constant) is met in multithreading environment

这是上下文:

我需要 运行 一个方法一个接一个地执行 n 次(不是同时)并且 n 可以由多个线程递增。我想将其限制为 255 次(条件)所以我有下一个代码:

public class MyClass
{
      int number = 0;

      public void Caller()
      {
         Thread thread = new Thread(new ThreadStart(Request));
         thread.Start();
         Thread thread2 = new Thread(new ThreadStart(Request));
         thread2.Start();
         Thread thread3 = new Thread(new ThreadStart(Request));
         thread3.Start();
      }

      public void Request()
      {
         // Condition
         if (number < 255)
         {
            // Queue a new method
            System.Threading.Interlocked.Increment(ref number);
         }

         // If it is the first time that Request is called, then it starts to run Method "number" times
         if (number == 1)
            StartRunnigMethods();
      }

      public void StartRunningMethods()
      {
         while (number > 0)
         {
            Method();
            System.Threading.Interlocked.Decrement(ref number);
         }
      }

      public void Method()
      {
        ...
      }
}

由于多线程的特性,当我修改小于255的数字时,我担心Request方法。所以我实现了一个解决方案,但我不确定它是否是一个好的实现。

修改后的代码:

      public void Request()
      {
         InterlockedIncrementIfLessThan(ref number, 255);

         // It is the fisrt time Request is called
         if( number == 1)
            StartToRunTheMethod();
      }

      public bool InterlockedIncrementIfLessThan(ref int value, int condition)
      {
         int newValue, currentValue;

         do
         {
            int initialValue = value;
            currentValue = Thread.VolatileRead(ref value);
            if (currentValue >= condition) return false;
            newValue = initialValue + 1;
         }
         while (System.Threading.Interlocked.CompareExchange(ref value, newValue, initialValue) != initialValue);
         return true;
      }

执行比较(小于)的最佳方法是什么,如果比较为真则递增变量(数字)?

我是这些主题的新手,所以您可以向我推荐一些好的参考资料作为开始。

通常将 VolatileInterlocked 操作结合起来不是一个好主意。最好使用其中之一,最好使用语义更清晰的 Interlocked class。您可以通过使用 Interlocked.CompareExchange 来实现 Thread.VolatileRead 的功能,执行实际上不会更改基础值的空操作。

另外,直接访问线程间共享的变量也不是一个好主意。 if (number < 255)while (number > 0) 等行是危险信号(表示可能存在错误),应避免使用。

这里有两种方法可以用于共享变量的条件更新,使用 Interlocked class:InterlockedUpdateInterlockedAdd。每个方法都有两个重载,一个是 returns 当前值,一个不是。

public delegate bool InterlockedUpdateDelegate<T>(T existingValue, out T newValue);

public static bool InterlockedUpdate(ref int location1,
    InterlockedUpdateDelegate<int> predicate, out int currentValue)
{
    int value1 = Interlocked.CompareExchange(ref location1, 0, 0);
    while (true)
    {
        bool updateApproved = predicate(value1, out int newValue);
        if (!updateApproved) { currentValue = value1; return false; }
        int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
        if (value2 == value1) { currentValue = newValue; return true; }
        value1 = value2;
    }
}

public static bool InterlockedUpdate(ref int location1,
    InterlockedUpdateDelegate<int> predicate)
    => InterlockedUpdate(ref location1, predicate, out _);

public static bool InterlockedAdd(ref int location1, int value,
    Func<int, bool> predicate, out int currentValue)
    => InterlockedUpdate(ref location1, (int existingValue, out int newValue) =>
    {
        newValue = existingValue + value;
        return predicate(existingValue);
    }, out currentValue);

public static bool InterlockedAdd(ref int location1, int value,
    Func<int, bool> predicate)
    => InterlockedAdd(ref location1, value, predicate, out _);

用法示例:

public void Request()
{
    bool incremented = InterlockedAdd(ref number, 1, v => v < 255,
        out var currentValue);

    if (incremented && currentValue == 1) StartRunningMethods();
}

public void StartRunningMethods()
{
    while (true)
    {
        bool decremented = InterlockedAdd(ref number, -1, v => v > 0);
        if (!decremented) break;
        Method();
    }
}

更新: 上述InterlockedUpdate的实现是悲观的,因为它假设观察到的变量大部分时间都是陈旧的, 所以它首先请求一个新的值。这是同一方法的乐观版本,它假设观察到的变量是新鲜的 大多数时候。没测过​​,不过我相信optimistic版本在低争用场景下应该会有更好的表现。

public static bool InterlockedUpdate(ref int location1,
    InterlockedUpdateDelegate<int> predicate, out int currentValue)
{
    int value1 = location1; // The value1 may be stale at this point
    bool isFresh = false;
    while (true)
    {
        bool updateApproved = predicate(value1, out int newValue);
        if (!updateApproved)
        {
            if (isFresh) { currentValue = value1; return false; }
            newValue = value1; // Try rewritting the possibly stale value
        }
        int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
        if (value2 == value1) { currentValue = newValue; return updateApproved; }
        value1 = value2;
        isFresh = true;
    }
}

如果您不坚持使用 Interlocked,并且您的问题不是并行 运行 方法,也不是 运行 它超过 255 次,如您在评论,我建议使用 SemaphoreSlim (或者甚至只是简单的锁定)并将 maxCount 设置为 1:

private static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
private volatile int counter = 0; // to switch 

public void ShouldNotRunInParallelAndMoreThan255()
{
    if (counter >= 255)
    {
        throw new Exception("Limit reached"); // or just return
    }
    semaphore.Wait(); // or semaphore.WaitAsync() in case of async or long workloads
    try
    {
        if(counter >= 255)  
        {
            throw new Exception("Limit reached"); // or just return;            
        }
        counter++;
        // Do you stuff

    }
    finally
    {
        semaphore.Release();
    }
}