Redis semaphore locks can't be released
I'm using the redis-semaphore gem, version 0.3.1.
For some reason, I sometimes can't release stale Redis locks. From my analysis, this seems to happen when my Docker process crashes after it has created a lock.
I've described my debugging process below and would like to know whether anyone can suggest how to debug this further.
Say we want to create a Redis lock with this name:
name = "test"
We paste this variable into two different terminal windows. In the first one we run:
def lock_for_15_secs(name)
  job = Redis::Semaphore.new(name.to_sym, redis: NonBlockingRedis.new, custom_blpop: true, stale_client_timeout: 15)
  if job.lock(-1) == "0"
    puts "Locked and starting"
    sleep(15)
    puts "Now it's stale, try to release in another process"
    sleep(15)
    puts "Now trying to unlock"
    unlock = job.unlock
    puts unlock == false ? "Wuhuu, already unlocked" : "Hm, should have been unlocked by another process, but wasn't"
  end
end

lock_for_15_secs(name)
In the second one we run:
def release_and_lock(name)
  job = Redis::Semaphore.new(name.to_sym, redis: NonBlockingRedis.new, custom_blpop: true, stale_client_timeout: 15)
  release = job.release_stale_locks!
  count = job.available_count
  puts "Release reponse is #{release.inspect} and available count is #{count}"
  if job.lock(-1) == "0"
    puts "Wuhuu, we can lock it"
    job.unlock
  else
    puts "Hmm, we can't lock it"
  end
end

release_and_lock(name)
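This usually works as expected. During the first 15 seconds the second terminal can't release the lock, but when run after 15 seconds it does release it. Below is the output of release_and_lock(name).
Before 15 seconds have passed: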
irb(main):1:0> release_and_lock(name)
Release reponse is {"0"=>"1580292557.321834"} and available count is 0
Hmm, we can't lock it
=> nil
After 15 seconds:
irb(main):2:0> release_and_lock(name)
Release reponse is {"0"=>"1580292557.321834"} and available count is 1
Wuhuu, we can lock it
=> 1
irb(main):3:0> release_and_lock(name)
Release reponse is {} and available count is 1
Wuhuu, we can lock it
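The difference between the two runs comes down to the staleness check. A minimal sketch of that rule as I read `release_stale_locks!` in redis-semaphore 0.3.1 (an assumption; `stale?` is a hypothetical helper, not the gem's API), using the timestamp from the output above:

```ruby
# Hypothetical helper mirroring (as I understand it) the gem's rule:
# a GRABBED token is stale once locked_at + stale_client_timeout < now.
def stale?(locked_at, stale_client_timeout, now)
  locked_at.to_f + stale_client_timeout < now.to_f
end

locked_at = "1580292557.321834" # value stored in GRABBED, from the output above

# 10 seconds after locking: not stale, so nothing is released
puts stale?(locked_at, 15, 1580292567.321834) # false
# 20 seconds after locking: stale, so the token goes back to AVAILABLE
puts stale?(locked_at, 15, 1580292577.321834) # true
```

Note that the check only ever looks at timestamps stored in GRABBED; a token that is not in that hash is never considered.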
But whenever I see a stale lock that isn't released and I run release_and_lock(name) to diagnose it, it returns:
irb(main):4:0> release_and_lock(name)
Release reponse is {} and available count is 0
Hmm, we can't lock it
At this point my only option is to flush Redis:
require 'non_blocking_redis'
non_blocking_redis = NonBlockingRedis.new()
non_blocking_redis.flushall
P.S. My NonBlockingRedis inherits from Redis:
class Redis
  class Semaphore
    def initialize(name, opts = {})
      @custom_opts = opts
      @name = name
      @resource_count = opts.delete(:resources) || 1
      @stale_client_timeout = opts.delete(:stale_client_timeout)
      @redis = opts.delete(:redis) || Redis.new(opts)
      @use_local_time = opts.delete(:use_local_time)
      @custom_blpop = opts.delete(:custom_blpop) # false=queue, true=cancel
      @tokens = []
    end

    def lock(timeout = 0)
      exists_or_create!
      release_stale_locks! if check_staleness?
      token_pair = @redis.blpop(available_key, timeout, @custom_blpop)
      return false if token_pair.nil?
      current_token = token_pair[1]
      @tokens.push(current_token)
      @redis.hset(grabbed_key, current_token, current_time.to_f)
      if block_given?
        begin
          yield current_token
        ensure
          signal(current_token)
        end
      end
      current_token
    end
    alias_method :wait, :lock
  end
end

class NonBlockingRedis < Redis
  def initialize(options = {})
    if options.empty?
      options = {
        url: Rails.application.secrets.redis_url,
        db: Rails.application.secrets.redis_sidekiq_db,
        driver: :hiredis,
        network_timeout: 5
      }
    end
    super(options)
  end

  def blpop(key, timeout, custom_blpop = false)
    if custom_blpop
      if timeout == -1
        # Non-blocking: plain LPOP, wrapped to match BLPOP's [key, value] reply
        result = lpop(key)
        return result if result.nil?
        return [key, result]
      else
        super(key, timeout)
      end
    else
      # Pass only key and timeout; a third positional argument
      # would reach Redis#blpop as an extra key
      super(key, timeout)
    end
  end

  def lock(timeout = 0)
    exists_or_create!
    release_stale_locks! if check_staleness?
    token_pair = @redis.blpop(available_key, timeout, @custom_blpop)
    return false if token_pair.nil?
    current_token = token_pair[1]
    @tokens.push(current_token)
    @redis.hset(grabbed_key, current_token, current_time.to_f)
    if block_given?
      begin
        yield current_token
      ensure
        signal(current_token)
      end
    end
    current_token
  end
  alias_method :wait, :lock
end

require 'non_blocking_redis'
A nice bug!
The bug
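I think this happens when the process is killed right after the lpop on SEMAPHORE:test:AVAILABLE, before the popped token is written to SEMAPHORE:test:GRABBED. Most likely here: https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L67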
To reproduce it:
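NonBlockingRedis.new.flushall # start from an empty Redis
release_and_lock('test') # creates the semaphore, then locks and unlocks it
NonBlockingRedis.new.lpop('SEMAPHORE:test:AVAILABLE') # a process that popped the token and died before HSET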
Before the lpop you have:
SEMAPHORE:test:AVAILABLE 0
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
After running the code above you get:
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
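The code checks SEMAPHORE:test:EXISTS and then expects SEMAPHORE:test:AVAILABLE / SEMAPHORE:test:GRABBED to be there. Because EXISTS is still set, the semaphore is never recreated, and because the token never made it into GRABBED, release_stale_locks! has nothing to release. That is exactly the "Release reponse is {} and available count is 0" case from the question.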
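Here is a small in-memory model of those keys to make the two failure modes concrete. It is a sketch under stated assumptions: `FakeSemaphore` is a hypothetical class, plain Ruby arrays and hashes stand in for Redis, timestamps are plain numbers, and only the steps discussed here (`exists_or_create!`, `release_stale_locks!`, LPOP, HSET) are modelled, not the gem's actual code.

```ruby
# Hypothetical in-memory model of a semaphore's keys.
# Plain Ruby structures stand in for Redis; this is not the gem's code.
class FakeSemaphore
  attr_reader :available, :grabbed

  def initialize(resources: 1, stale_client_timeout: 15)
    @resources = resources
    @stale_client_timeout = stale_client_timeout
    @exists = nil   # SEMAPHORE:test:EXISTS
    @available = [] # SEMAPHORE:test:AVAILABLE (list of tokens)
    @grabbed = {}   # SEMAPHORE:test:GRABBED (token => locked_at)
  end

  # Like the unpatched gem: tokens are only built when EXISTS was unset.
  def exists_or_create!
    previous = @exists
    @exists = "1" # GETSET EXISTS "1"
    create! if previous.nil?
  end

  def create!
    @grabbed = {}
    @available = (0...@resources).map(&:to_s)
  end

  # Only tokens recorded in GRABBED can ever go back to AVAILABLE.
  def release_stale_locks!(now)
    stale = @grabbed.select { |_token, locked_at| locked_at + @stale_client_timeout < now }
    stale.each_key do |token|
      @grabbed.delete(token)    # HDEL GRABBED token
      @available.unshift(token) # LPUSH AVAILABLE token
    end
  end

  # crash_after_pop simulates the process dying between LPOP and HSET.
  def lock(now, crash_after_pop: false)
    exists_or_create!
    release_stale_locks!(now)
    token = @available.shift # LPOP AVAILABLE (the timeout == -1 path)
    return false if token.nil?
    return :crashed if crash_after_pop
    @grabbed[token] = now # HSET GRABBED token now
    token
  end
end

# Crash after HSET: the lock goes stale and is recovered.
ok = FakeSemaphore.new
ok.lock(0)
ok.release_stale_locks!(20) # 0 + 15 < 20
p ok.available              # ["0"]

# Crash between LPOP and HSET: the token is gone for good.
lost = FakeSemaphore.new
lost.lock(0, crash_after_pop: true)
lost.release_stale_locks!(1_000)
p lost.available            # []
p lost.grabbed              # {}
p lost.lock(1_000)          # false
```

The first case is ordinary stale-lock recovery; the second ends in the same empty state as the key dump above, and no later call gets out of it.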
Solution
From a quick look, I don't think this can be solved without modifying the gem. I tried adding an expiration:, but somehow the expiration on SEMAPHORE:test:EXISTS gets disabled:
NonBlockingRedis.new.ttl('SEMAPHORE:test:EXISTS') # => -1 and it should have been e.g. 20 seconds and going down
So... maybe a fix would be:
class Redis
  class Semaphore
    def exists_or_create!
      token = @redis.getset(exists_key, EXISTS_TOKEN)
      # all_tokens.empty? also covers a token lost between LPOP and HSET
      if token.nil? || all_tokens.empty?
        create!
      else
        # Previous versions of redis-semaphore did not set `version_key`.
        # Make sure it's set now, so we can use it in future versions.
        if token == API_VERSION && @redis.get(version_key).nil?
          @redis.set(version_key, API_VERSION)
        end
        true
      end
    end
  end
end
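A minimal model of what the patched check does, assuming `all_tokens` combines the AVAILABLE list and the GRABBED hash keys (as I read the linked gem source). `State` and `patched_exists_or_create!` are hypothetical names; plain Ruby values stand in for Redis.

```ruby
# Hypothetical model of the patched exists_or_create!: rebuild the tokens when
# EXISTS was unset OR when no token is left in AVAILABLE or GRABBED.
State = Struct.new(:exists, :available, :grabbed)

def patched_exists_or_create!(state, resources: 1)
  previous = state.exists
  state.exists = "1"                                # GETSET EXISTS "1"
  all_tokens = state.available + state.grabbed.keys # roughly what all_tokens combines
  if previous.nil? || all_tokens.empty?
    state.grabbed = {}                              # create!
    state.available = (0...resources).map(&:to_s)
  end
  state
end

# Keys left behind by a crash between LPOP and HSET: the token is restored.
p patched_exists_or_create!(State.new("1", [], {})).available # ["0"]

# A lock that is simply held is left alone.
held = State.new("1", [], { "0" => 1580292557.321834 })
p patched_exists_or_create!(held).available                  # []
```

Two limits of this approach, as far as I can tell from the logic: with `resources:` greater than 1, one leaked token does not make `all_tokens` empty, so it stays lost; and the check cannot tell a lost token from one another process has just popped but not yet written to GRABBED, so in that window it could create an extra token.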
all_tokens is defined here: https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L120
I'll open a PR against the gem soon -> https://github.com/dv/redis-semaphore/pull/66, maybe.
Note 1
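Not sure how you're using NonBlockingRedis, but it isn't used in Redis::Semaphore. You call lock(-1), which your code turns into an lpop. Also, the code never calls your lock (the one defined in NonBlockingRedis).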
Random
Here's a helper that dumps the keys:
class Test
  def self.all
    r = NonBlockingRedis.new
    puts r.keys('*').map { |k|
      [
        k,
        ((r.hgetall(k) rescue r.get(k)) rescue r.lrange(k, 0, -1).join(' | '))
      ].join("\t\t")
    }
  end
end
> Test.all
SEMAPHORE:test:AVAILABLE 0
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
For completeness, here's what it looks like after you grab the lock:
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
SEMAPHORE:test:GRABBED {"0"=>"1583672948.7168388"}
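Reading these dumps, a healthy semaphore has every token in either AVAILABLE or GRABBED, and a token in neither is the leaked one. A sketch of that check, assuming tokens are named "0".."N-1" as in the dumps; `missing_tokens` is a hypothetical helper, and in practice its inputs could come from `lrange('SEMAPHORE:test:AVAILABLE', 0, -1)` and `hgetall('SEMAPHORE:test:GRABBED')` on a NonBlockingRedis instance.

```ruby
# Hypothetical diagnostic, not part of redis-semaphore: with `resources`
# tokens named "0".."N-1", each should be in AVAILABLE or in GRABBED.
# A token in neither was popped by a process that died before HSET.
def missing_tokens(available, grabbed, resources = 1)
  expected = (0...resources).map(&:to_s)
  expected - available - grabbed.keys
end

p missing_tokens(["0"], {})                           # [] (idle)
p missing_tokens([], { "0" => "1583672948.7168388" }) # [] (lock held)
p missing_tokens([], {})                              # ["0"] (token leaked)
```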