在多线程环境中满足条件(小于常量)时增加变量
Increment a variable when condition (LessThan a constant) is met in multithreading environment
这是上下文:
我需要 运行 一个方法一个接一个地执行 n 次(不是同时)并且 n 可以由多个线程递增。我想将其限制为 255 次(条件)所以我有下一个代码:
public class MyClass
{
int number = 0;
public void Caller()
{
Thread thread = new Thread(new ThreadStart(Request));
thread.Start();
Thread thread2 = new Thread(new ThreadStart(Request));
thread2.Start();
Thread thread3 = new Thread(new ThreadStart(Request));
thread3.Start();
}
public void Request()
{
// Condition
if (number < 255)
{
// Queue a new method
System.Threading.Interlocked.Increment(ref number);
}
// If it is the first time that Request is called, then it starts to run Method "number" times
if (number == 1)
StartRunnigMethods();
}
public void StartRunningMethods()
{
while (number > 0)
{
Method();
System.Threading.Interlocked.Decrement(ref number);
}
}
public void Method()
{
...
}
}
由于多线程的特性,当我修改小于255的数字时,我担心Request方法。所以我实现了一个解决方案,但我不确定它是否是一个好的实现。
修改后的代码:
public void Request()
{
InterlockedIncrementIfLessThan(ref number, 255);
// It is the fisrt time Request is called
if( number == 1)
StartToRunTheMethod();
}
public bool InterlockedIncrementIfLessThan(ref int value, int condition)
{
int newValue, currentValue;
do
{
int initialValue = value;
currentValue = Thread.VolatileRead(ref value);
if (currentValue >= condition) return false;
newValue = initialValue + 1;
}
while (System.Threading.Interlocked.CompareExchange(ref value, newValue, initialValue) != initialValue);
return true;
}
执行比较(小于)的最佳方法是什么,如果比较为真则递增变量(数字)?
我是这些主题的新手,所以您可以向我推荐一些好的参考资料作为开始。
通常将 Volatile
和 Interlocked
操作结合起来不是一个好主意。最好使用其中之一,最好使用语义更清晰的 Interlocked
class。您可以通过使用 Interlocked.CompareExchange
来实现 Thread.VolatileRead
的功能,执行实际上不会更改基础值的空操作。
另外,直接访问线程间共享的变量也不是一个好主意。 if (number < 255)
或 while (number > 0)
等行是危险信号(表示可能存在错误),应避免使用。
这里有两种方法可以用于共享变量的条件更新,使用 Interlocked
class:InterlockedUpdate
和 InterlockedAdd
。每个方法都有两个重载,一个是 returns 当前值,一个不是。
public delegate bool InterlockedUpdateDelegate<T>(T existingValue, out T newValue);
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate, out int currentValue)
{
int value1 = Interlocked.CompareExchange(ref location1, 0, 0);
while (true)
{
bool updateApproved = predicate(value1, out int newValue);
if (!updateApproved) { currentValue = value1; return false; }
int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
if (value2 == value1) { currentValue = newValue; return true; }
value1 = value2;
}
}
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate)
=> InterlockedUpdate(ref location1, predicate, out _);
public static bool InterlockedAdd(ref int location1, int value,
Func<int, bool> predicate, out int currentValue)
=> InterlockedUpdate(ref location1, (int existingValue, out int newValue) =>
{
newValue = existingValue + value;
return predicate(existingValue);
}, out currentValue);
public static bool InterlockedAdd(ref int location1, int value,
Func<int, bool> predicate)
=> InterlockedAdd(ref location1, value, predicate, out _);
用法示例:
public void Request()
{
bool incremented = InterlockedAdd(ref number, 1, v => v < 255,
out var currentValue);
if (incremented && currentValue == 1) StartRunningMethods();
}
public void StartRunningMethods()
{
while (true)
{
bool decremented = InterlockedAdd(ref number, -1, v => v > 0);
if (!decremented) break;
Method();
}
}
更新: 上述InterlockedUpdate
的实现是悲观的,因为它假设观察到的变量大部分时间都是陈旧的,
所以它首先请求一个新的值。这是同一方法的乐观版本,它假设观察到的变量是新鲜的
大多数时候。没测过,不过我相信optimistic版本在低争用场景下应该会有更好的表现。
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate, out int currentValue)
{
int value1 = location1; // The value1 may be stale at this point
bool isFresh = false;
while (true)
{
bool updateApproved = predicate(value1, out int newValue);
if (!updateApproved)
{
if (isFresh) { currentValue = value1; return false; }
newValue = value1; // Try rewritting the possibly stale value
}
int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
if (value2 == value1) { currentValue = newValue; return updateApproved; }
value1 = value2;
isFresh = true;
}
}
如果您不坚持使用 Interlocked
,并且您的问题不是并行 运行 方法,也不是 运行 它超过 255 次,如您在评论,我建议使用 SemaphoreSlim (或者甚至只是简单的锁定)并将 maxCount 设置为 1:
private static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
private volatile int counter = 0; // to switch
public void ShouldNotRunInParallelAndMoreThan255()
{
if (counter >= 255)
{
throw new Exception("Limit reached"); // or just return
}
semaphore.Wait(); // or semaphore.WaitAsync() in case of async or long workloads
try
{
if(counter >= 255)
{
throw new Exception("Limit reached"); // or just return;
}
counter++;
// Do you stuff
}
finally
{
semaphore.Release();
}
}
这是上下文:
我需要 运行 一个方法一个接一个地执行 n 次(不是同时)并且 n 可以由多个线程递增。我想将其限制为 255 次(条件)所以我有下一个代码:
public class MyClass
{
int number = 0;
public void Caller()
{
Thread thread = new Thread(new ThreadStart(Request));
thread.Start();
Thread thread2 = new Thread(new ThreadStart(Request));
thread2.Start();
Thread thread3 = new Thread(new ThreadStart(Request));
thread3.Start();
}
public void Request()
{
// Condition
if (number < 255)
{
// Queue a new method
System.Threading.Interlocked.Increment(ref number);
}
// If it is the first time that Request is called, then it starts to run Method "number" times
if (number == 1)
StartRunnigMethods();
}
public void StartRunningMethods()
{
while (number > 0)
{
Method();
System.Threading.Interlocked.Decrement(ref number);
}
}
public void Method()
{
...
}
}
由于多线程的特性,当我修改小于255的数字时,我担心Request方法。所以我实现了一个解决方案,但我不确定它是否是一个好的实现。
修改后的代码:
public void Request()
{
InterlockedIncrementIfLessThan(ref number, 255);
// It is the fisrt time Request is called
if( number == 1)
StartToRunTheMethod();
}
public bool InterlockedIncrementIfLessThan(ref int value, int condition)
{
int newValue, currentValue;
do
{
int initialValue = value;
currentValue = Thread.VolatileRead(ref value);
if (currentValue >= condition) return false;
newValue = initialValue + 1;
}
while (System.Threading.Interlocked.CompareExchange(ref value, newValue, initialValue) != initialValue);
return true;
}
执行比较(小于)的最佳方法是什么,如果比较为真则递增变量(数字)?
我是这些主题的新手,所以您可以向我推荐一些好的参考资料作为开始。
通常将 Volatile
和 Interlocked
操作结合起来不是一个好主意。最好使用其中之一,最好使用语义更清晰的 Interlocked
class。您可以通过使用 Interlocked.CompareExchange
来实现 Thread.VolatileRead
的功能,执行实际上不会更改基础值的空操作。
另外,直接访问线程间共享的变量也不是一个好主意。 if (number < 255)
或 while (number > 0)
等行是危险信号(表示可能存在错误),应避免使用。
这里有两种方法可以用于共享变量的条件更新,使用 Interlocked
class:InterlockedUpdate
和 InterlockedAdd
。每个方法都有两个重载,一个是 returns 当前值,一个不是。
public delegate bool InterlockedUpdateDelegate<T>(T existingValue, out T newValue);
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate, out int currentValue)
{
int value1 = Interlocked.CompareExchange(ref location1, 0, 0);
while (true)
{
bool updateApproved = predicate(value1, out int newValue);
if (!updateApproved) { currentValue = value1; return false; }
int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
if (value2 == value1) { currentValue = newValue; return true; }
value1 = value2;
}
}
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate)
=> InterlockedUpdate(ref location1, predicate, out _);
public static bool InterlockedAdd(ref int location1, int value,
Func<int, bool> predicate, out int currentValue)
=> InterlockedUpdate(ref location1, (int existingValue, out int newValue) =>
{
newValue = existingValue + value;
return predicate(existingValue);
}, out currentValue);
public static bool InterlockedAdd(ref int location1, int value,
Func<int, bool> predicate)
=> InterlockedAdd(ref location1, value, predicate, out _);
用法示例:
public void Request()
{
bool incremented = InterlockedAdd(ref number, 1, v => v < 255,
out var currentValue);
if (incremented && currentValue == 1) StartRunningMethods();
}
public void StartRunningMethods()
{
while (true)
{
bool decremented = InterlockedAdd(ref number, -1, v => v > 0);
if (!decremented) break;
Method();
}
}
更新: 上述InterlockedUpdate
的实现是悲观的,因为它假设观察到的变量大部分时间都是陈旧的,
所以它首先请求一个新的值。这是同一方法的乐观版本,它假设观察到的变量是新鲜的
大多数时候。没测过,不过我相信optimistic版本在低争用场景下应该会有更好的表现。
public static bool InterlockedUpdate(ref int location1,
InterlockedUpdateDelegate<int> predicate, out int currentValue)
{
int value1 = location1; // The value1 may be stale at this point
bool isFresh = false;
while (true)
{
bool updateApproved = predicate(value1, out int newValue);
if (!updateApproved)
{
if (isFresh) { currentValue = value1; return false; }
newValue = value1; // Try rewritting the possibly stale value
}
int value2 = Interlocked.CompareExchange(ref location1, newValue, value1);
if (value2 == value1) { currentValue = newValue; return updateApproved; }
value1 = value2;
isFresh = true;
}
}
如果您不坚持使用 Interlocked
,并且您的问题不是并行 运行 方法,也不是 运行 它超过 255 次,如您在评论,我建议使用 SemaphoreSlim (或者甚至只是简单的锁定)并将 maxCount 设置为 1:
private static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
private volatile int counter = 0; // to switch
public void ShouldNotRunInParallelAndMoreThan255()
{
if (counter >= 255)
{
throw new Exception("Limit reached"); // or just return
}
semaphore.Wait(); // or semaphore.WaitAsync() in case of async or long workloads
try
{
if(counter >= 255)
{
throw new Exception("Limit reached"); // or just return;
}
counter++;
// Do you stuff
}
finally
{
semaphore.Release();
}
}