StackExchange.Redis 挡风设计
StackExchange.Redis Blocking pop design
所以我们有一个使用 ServiceStack.Redis 的现有帮助程序库,目前正在尝试将其与 StackExchange.Redis 交换。我们使用的是 BlockingPop (BLPOP),但由于 StackExchange.Redis 不支持它。我们实现如下
public static void Push(string Qname, string val)
{
IDatabase db = redis.GetDatabase();
db.ListLeftPush(Qname, val);
ISubscriber sub = redis.GetSubscriber();
sub.Publish(Qname + "_msg", "1");
}
和带有阻止选项的 Pop 如下:
public static string Pop(string Qname,
bool block_until_available = false,int timeout_secs=0)
{
IDatabase db = redis.GetDatabase();
var popped = db.ListRightPop(Qname);
if (popped.IsNull)
{
if (block_until_available == false)
return null;
}
else
return popped;
//wait for an item to be pushed in.
ISubscriber sub = redis.GetSubscriber();
AutoResetEvent autoEvent = new AutoResetEvent(false);
string obj = null;
Task.Run(() =>
{
sub.Subscribe(Qname + "_msg", (channel, message) =>
{
popped = db.ListRightPop(Qname);
if (!popped.IsNull)
{
obj = popped;
sub.Unsubscribe(Qname + "_msg");
autoEvent.Set();
}
});
});
if (timeout_secs > 0)
autoEvent.WaitOne(timeout_secs * 1000);
else
autoEvent.WaitOne();
return obj;
}
你们都看到这种方法有什么明显的问题吗?
此外,我很快 运行 进入以下错误。我增加了 syncTimeout。希望这会解决它?
System.TimeoutException: Timeout performing RPOP DL_PROD,
inst: 0, mgr: ProcessReadQueue, err: never, queue: 0, qu: 0, qs: 0, qc: 0,
wr: 0, wq: 0, in: 0, ar: 1, IOCP: (Busy=0,Free=1000,Min=8,Max=1000),
WORKER: (Busy=2,Free=32765,Min=8,Max=32767),
clientName: CD147RE1 at
StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T]
(Message message, ResultProcessor`1 processor, ServerEndPoint server)
RPOP 在这里超时没有具体原因,除非它与带宽相关(巨大的有效负载)。该错误看起来与 IMO 问题无关。这种方法很丑陋,但是……好吧,它可能会奏效。不过,这里没有必要使用 Task.Run。我认为,该方法的一个问题是它不能同时正确工作。它似乎取消订阅该频道/连接的所有代表,而不仅仅是一个。可以自行取消订阅,但坦率地说,我想知道只有一个自动重置事件和一个订阅是否更容易,如果一条消息在您等待时解锁大门:很好.
自行退订受托人的一般模式本质上是:
YourDelegateType handler = null;
handler = (args) => {
DoTheThing();
UnSubscribe(handler);
};
Subscribe(handler);
这利用词法捕获变量的乐趣来提供对委托实例inside的访问,这意味着委托可以传递自身 取消订阅方法。上面显示的模式应该适用于所有基于委托的回调场景——包括调节事件和诸如 SE.Redis pub/sub 处理程序之类的东西。
所以我们有一个使用 ServiceStack.Redis 的现有帮助程序库,目前正在尝试将其与 StackExchange.Redis 交换。我们使用的是 BlockingPop (BLPOP),但由于 StackExchange.Redis 不支持它。我们实现如下
public static void Push(string Qname, string val)
{
IDatabase db = redis.GetDatabase();
db.ListLeftPush(Qname, val);
ISubscriber sub = redis.GetSubscriber();
sub.Publish(Qname + "_msg", "1");
}
和带有阻止选项的 Pop 如下:
public static string Pop(string Qname,
bool block_until_available = false,int timeout_secs=0)
{
IDatabase db = redis.GetDatabase();
var popped = db.ListRightPop(Qname);
if (popped.IsNull)
{
if (block_until_available == false)
return null;
}
else
return popped;
//wait for an item to be pushed in.
ISubscriber sub = redis.GetSubscriber();
AutoResetEvent autoEvent = new AutoResetEvent(false);
string obj = null;
Task.Run(() =>
{
sub.Subscribe(Qname + "_msg", (channel, message) =>
{
popped = db.ListRightPop(Qname);
if (!popped.IsNull)
{
obj = popped;
sub.Unsubscribe(Qname + "_msg");
autoEvent.Set();
}
});
});
if (timeout_secs > 0)
autoEvent.WaitOne(timeout_secs * 1000);
else
autoEvent.WaitOne();
return obj;
}
你们都看到这种方法有什么明显的问题吗?
此外,我很快 运行 进入以下错误。我增加了 syncTimeout。希望这会解决它?
System.TimeoutException: Timeout performing RPOP DL_PROD,
inst: 0, mgr: ProcessReadQueue, err: never, queue: 0, qu: 0, qs: 0, qc: 0,
wr: 0, wq: 0, in: 0, ar: 1, IOCP: (Busy=0,Free=1000,Min=8,Max=1000),
WORKER: (Busy=2,Free=32765,Min=8,Max=32767),
clientName: CD147RE1 at
StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T]
(Message message, ResultProcessor`1 processor, ServerEndPoint server)
RPOP 在这里超时没有具体原因,除非它与带宽相关(巨大的有效负载)。该错误看起来与 IMO 问题无关。这种方法很丑陋,但是……好吧,它可能会奏效。不过,这里没有必要使用 Task.Run。我认为,该方法的一个问题是它不能同时正确工作。它似乎取消订阅该频道/连接的所有代表,而不仅仅是一个。可以自行取消订阅,但坦率地说,我想知道只有一个自动重置事件和一个订阅是否更容易,如果一条消息在您等待时解锁大门:很好.
自行退订受托人的一般模式本质上是:
YourDelegateType handler = null;
handler = (args) => {
DoTheThing();
UnSubscribe(handler);
};
Subscribe(handler);
这利用词法捕获变量的乐趣来提供对委托实例inside的访问,这意味着委托可以传递自身 取消订阅方法。上面显示的模式应该适用于所有基于委托的回调场景——包括调节事件和诸如 SE.Redis pub/sub 处理程序之类的东西。