带锁的 Redis 分布式增量
Redis distributed increment with locking
我需要生成一个计数器,该计数器将发送给一些 api 调用。我的应用程序在多个节点上 运行ning 所以我想如何生成唯一的计数器。
我试过下面的代码
public static long GetTransactionCountForUser(int telcoId)
{
long valreturn = 0;
string key = "TelcoId:" + telcoId + ":Sequence";
if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null)
{
IDatabase db = Muxer.GetDatabase();
var val = db.StringGet(key);
int maxVal = 999;
if (Convert.ToInt32(val) < maxVal)
{
valreturn = db.StringIncrement(key);
}
else
{
bool isdone = db.StringSet(key, valreturn);
//db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val))
}
}
return valreturn;
}
并且运行通过任务并行库对其进行了测试。当我有边界值时,我看到的是设置了多次 0 条目
请让我知道我需要做什么更正
更新:
我的最终逻辑如下
public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId)
{
long valreturn = 0;
long maxIncrement = 9999;//todo via configuration
if (true)//todo via configuration
{
IDatabase db;
string key = "TelcoId:" + telcoId + ":SequenceNumber";
if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null)
{
valreturn = (long)db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > tonumber(ARGV[1]) then
result = 1
redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement });
}
}
return valreturn;
}
确实,您的代码在翻转边界附近并不安全,因为您正在做 "get",(延迟和思考),"set" - 没有检查 [=59] 中的条件=] 仍然适用。如果服务器在项目 1000 附近很忙,则可能会得到各种疯狂的输出,包括:
1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1
选项:
- 使用事务和约束 API 使您的逻辑 concurrency-safe
- 通过
ScriptEvaluate
将您的逻辑重写为 Lua 脚本
现在,redis 事务(根据选项 1)are hard。就个人而言,我会使用“2”——除了更易于编码和调试之外,这意味着您只有 1 个 round-trip 和操作,而不是 "get, watch, get, multi, incr/set, exec/discard",以及一个 "retry from start"循环以说明中止情况。如果你愿意,我可以试着把它写成 Lua - 它应该是大约 4 行。
这是 Lua 实现:
string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
result = 0
redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
Console.WriteLine(result);
}
注意:如果你需要参数化最大值,你会使用:
if result > tonumber(ARGV[1]) then
和:
int result = (int)db.ScriptEvaluate(...,
new RedisKey[] { key }, new RedisValue[] { max });
(因此 ARGV[1]
从 max
中获取值)
有必要了解 eval
/evalsha
(ScriptEvaluate
调用)不与其他服务器请求竞争 ,因此 incr
和可能的 set
之间没有任何变化。这意味着我们不需要复杂的 watch
等逻辑。
这是相同的(我认为!)通过事务/约束 API:
static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
int result;
bool success;
do
{
RedisValue current = db.StringGet(key);
var tran = db.CreateTransaction();
// assert hasn't changed - note this handles "not exists" correctly
tran.AddCondition(Condition.StringEqual(key, current));
if(((int)current) > max)
{
result = 0;
tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
}
else
{
result = ((int)current) + 1;
tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
}
success = tran.Execute(); // if assertion fails, returns false and aborts
} while (!success); // and if it aborts, we need to redo
return result;
}
复杂,嗯?那么简单的成功案例就是:
GET {key} # get the current value
WATCH {key} # assertion stating that {key} should be guarded
GET {key} # used by the assertion to check the value
MULTI # begin a block
INCR {key} # increment {key}
EXEC # execute the block *if WATCH is happy*
这是...相当多的工作,并且涉及多路复用器上的流水线停顿。更复杂的情况(断言失败、监视失败、wrap-arounds)的输出会略有不同,但应该可以。
您可以使用 WATCH command - 这样,如果值发生变化,您会收到通知
我需要生成一个计数器,该计数器将发送给一些 api 调用。我的应用程序在多个节点上 运行ning 所以我想如何生成唯一的计数器。 我试过下面的代码
public static long GetTransactionCountForUser(int telcoId)
{
long valreturn = 0;
string key = "TelcoId:" + telcoId + ":Sequence";
if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null)
{
IDatabase db = Muxer.GetDatabase();
var val = db.StringGet(key);
int maxVal = 999;
if (Convert.ToInt32(val) < maxVal)
{
valreturn = db.StringIncrement(key);
}
else
{
bool isdone = db.StringSet(key, valreturn);
//db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val))
}
}
return valreturn;
}
并且运行通过任务并行库对其进行了测试。当我有边界值时,我看到的是设置了多次 0 条目
请让我知道我需要做什么更正
更新: 我的最终逻辑如下
public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId)
{
long valreturn = 0;
long maxIncrement = 9999;//todo via configuration
if (true)//todo via configuration
{
IDatabase db;
string key = "TelcoId:" + telcoId + ":SequenceNumber";
if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null)
{
valreturn = (long)db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > tonumber(ARGV[1]) then
result = 1
redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement });
}
}
return valreturn;
}
确实,您的代码在翻转边界附近并不安全,因为您正在做 "get",(延迟和思考),"set" - 没有检查 [=59] 中的条件=] 仍然适用。如果服务器在项目 1000 附近很忙,则可能会得到各种疯狂的输出,包括:
1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1
选项:
- 使用事务和约束 API 使您的逻辑 concurrency-safe
- 通过
ScriptEvaluate
将您的逻辑重写为 Lua 脚本
现在,redis 事务(根据选项 1)are hard。就个人而言,我会使用“2”——除了更易于编码和调试之外,这意味着您只有 1 个 round-trip 和操作,而不是 "get, watch, get, multi, incr/set, exec/discard",以及一个 "retry from start"循环以说明中止情况。如果你愿意,我可以试着把它写成 Lua - 它应该是大约 4 行。
这是 Lua 实现:
string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
result = 0
redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
Console.WriteLine(result);
}
注意:如果你需要参数化最大值,你会使用:
if result > tonumber(ARGV[1]) then
和:
int result = (int)db.ScriptEvaluate(...,
new RedisKey[] { key }, new RedisValue[] { max });
(因此 ARGV[1]
从 max
中获取值)
有必要了解 eval
/evalsha
(ScriptEvaluate
调用)不与其他服务器请求竞争 ,因此 incr
和可能的 set
之间没有任何变化。这意味着我们不需要复杂的 watch
等逻辑。
这是相同的(我认为!)通过事务/约束 API:
static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
int result;
bool success;
do
{
RedisValue current = db.StringGet(key);
var tran = db.CreateTransaction();
// assert hasn't changed - note this handles "not exists" correctly
tran.AddCondition(Condition.StringEqual(key, current));
if(((int)current) > max)
{
result = 0;
tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
}
else
{
result = ((int)current) + 1;
tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
}
success = tran.Execute(); // if assertion fails, returns false and aborts
} while (!success); // and if it aborts, we need to redo
return result;
}
复杂,嗯?那么简单的成功案例就是:
GET {key} # get the current value
WATCH {key} # assertion stating that {key} should be guarded
GET {key} # used by the assertion to check the value
MULTI # begin a block
INCR {key} # increment {key}
EXEC # execute the block *if WATCH is happy*
这是...相当多的工作,并且涉及多路复用器上的流水线停顿。更复杂的情况(断言失败、监视失败、wrap-arounds)的输出会略有不同,但应该可以。
您可以使用 WATCH command - 这样,如果值发生变化,您会收到通知