如何确保在关闭 MessageReciever 期间完成 MessageHandler 委托?
How to ensure that MessageHandler delegate is completed during closing of MessageReciever?
我有一项服务使用来自 Microsoft.Azure.ServiceBus
的 MessageReceiver
连续监听 ServiceBus 中的订阅。当服务停止时,我想在进程被终止之前给所有操作一个完成的机会。
这是我根据库提供的example使用的代码:
private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
var doneReceiving = new TaskCompletionSource<bool>();
cancellationToken.Register(() =>
{
receiver.CloseAsync();
doneReceiving.SetResult(true);
});
receiver.RegisterMessageHandler(
async (message, ct) => await HandleMessage(receiver, message),
new MessageHandlerOptions(HandleException));
await doneReceiving.Task;
}
在服务停止时,我取消了任务,即使 HandleMessage
仍然是 运行,服务也会立即终止。
有什么方法可以通过库本身检查操作是否仍然是 运行 延迟任务取消?我可以想出一种方法来锁定所有 运行 的任务来进行我自己的计数,但我希望有更好的方法可以让我知道 运行 处理程序的数量。
理想情况下,我想取消注册处理程序,以便消息泵停止,而接收器本身不会关闭以允许例如CompleteAsync 调用。
如MessageReceiver.CloseAsync()所述如下:
Closes the Client. Closes the connections opened by it.
根据我的测试,在调用 MessageReceiver.CloseAsync()
之后,后续调用 CompleteAsync
、DeadLetterAsync
将失败,因为 IMessageReceiver
的实例已被释放。如果您仍想完成队列消息,则需要创建一个新的 MessageReceiver
.
Is there any way that I can check via the library itself that the operation is still running to delay task cancellation?
据我所知,SDK 目前不提供上述功能。此外,这里有一个类似的feedback关于正常关闭Azure服务总线的消息泵。
When the service is stopped I would like to give a chance for all operations to complete before the process is killed.
根据您的要求,我假设您需要自己实现它,以确保即使在MessageReceiver关闭后也能成功处理接收到的队列消息。或者您可以将 CancellationToken
参数传递到 HandleMessage
方法中以显式取消而不是完成检索到的消息。
另一个答案表明,不幸的是,现在不可能实现,并且该功能非常受欢迎,即使它在库本身中不可用。一种替代方法是创建您自己的接收消息泵,但是您需要自己处理断开连接、管理等,尽管正常关闭本身并不难。使用当前的方法,我设法编写了一个似乎工作正常的解决方法,即使它很笨拙。
private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
int activeMessageHandlersCount = 0;
var doneReceiving = new TaskCompletionSource<bool>();
cancellationToken.Register(() =>
{
lock (receiver)
{
int attemptCount = 0;
while (attemptCount++ < 10 && activeMessageHandlersCount > 0)
{
Thread.Sleep(1000);
}
receiver.CloseAsync();
}
doneReceiving.SetResult(true);
});
receiver.RegisterMessageHandler(
async (message, ct) =>
{
bool canBeProcessed;
lock (receiver)
{
canBeProcessed = !cancellationToken.IsCancellationRequested;
if (canBeProcessed)
{
Interlocked.Increment(ref activeMessageHandlersCount);
}
}
if (canBeProcessed)
{
try
{
await HandleMessage(receiver, message);
}
finally
{
Interlocked.Decrement(ref activeMessageHandlersCount);
}
}
else
{
await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown
}
}, new MessageHandlerOptions(HandleException));
await doneReceiving.Task;
}
一个额外的缺点是 Receiver 会在不处理的情况下接收大量消息,并只保留它们直到锁过期。
我有一项服务使用来自 Microsoft.Azure.ServiceBus
的 MessageReceiver
连续监听 ServiceBus 中的订阅。当服务停止时,我想在进程被终止之前给所有操作一个完成的机会。
这是我根据库提供的example使用的代码:
private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
var doneReceiving = new TaskCompletionSource<bool>();
cancellationToken.Register(() =>
{
receiver.CloseAsync();
doneReceiving.SetResult(true);
});
receiver.RegisterMessageHandler(
async (message, ct) => await HandleMessage(receiver, message),
new MessageHandlerOptions(HandleException));
await doneReceiving.Task;
}
在服务停止时,我取消了任务,即使 HandleMessage
仍然是 运行,服务也会立即终止。
有什么方法可以通过库本身检查操作是否仍然是 运行 延迟任务取消?我可以想出一种方法来锁定所有 运行 的任务来进行我自己的计数,但我希望有更好的方法可以让我知道 运行 处理程序的数量。
理想情况下,我想取消注册处理程序,以便消息泵停止,而接收器本身不会关闭以允许例如CompleteAsync 调用。
如MessageReceiver.CloseAsync()所述如下:
Closes the Client. Closes the connections opened by it.
根据我的测试,在调用 MessageReceiver.CloseAsync()
之后,后续调用 CompleteAsync
、DeadLetterAsync
将失败,因为 IMessageReceiver
的实例已被释放。如果您仍想完成队列消息,则需要创建一个新的 MessageReceiver
.
Is there any way that I can check via the library itself that the operation is still running to delay task cancellation?
据我所知,SDK 目前不提供上述功能。此外,这里有一个类似的feedback关于正常关闭Azure服务总线的消息泵。
When the service is stopped I would like to give a chance for all operations to complete before the process is killed.
根据您的要求,我假设您需要自己实现它,以确保即使在MessageReceiver关闭后也能成功处理接收到的队列消息。或者您可以将 CancellationToken
参数传递到 HandleMessage
方法中以显式取消而不是完成检索到的消息。
另一个答案表明,不幸的是,现在不可能实现,并且该功能非常受欢迎,即使它在库本身中不可用。一种替代方法是创建您自己的接收消息泵,但是您需要自己处理断开连接、管理等,尽管正常关闭本身并不难。使用当前的方法,我设法编写了一个似乎工作正常的解决方法,即使它很笨拙。
private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
int activeMessageHandlersCount = 0;
var doneReceiving = new TaskCompletionSource<bool>();
cancellationToken.Register(() =>
{
lock (receiver)
{
int attemptCount = 0;
while (attemptCount++ < 10 && activeMessageHandlersCount > 0)
{
Thread.Sleep(1000);
}
receiver.CloseAsync();
}
doneReceiving.SetResult(true);
});
receiver.RegisterMessageHandler(
async (message, ct) =>
{
bool canBeProcessed;
lock (receiver)
{
canBeProcessed = !cancellationToken.IsCancellationRequested;
if (canBeProcessed)
{
Interlocked.Increment(ref activeMessageHandlersCount);
}
}
if (canBeProcessed)
{
try
{
await HandleMessage(receiver, message);
}
finally
{
Interlocked.Decrement(ref activeMessageHandlersCount);
}
}
else
{
await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown
}
}, new MessageHandlerOptions(HandleException));
await doneReceiving.Task;
}
一个额外的缺点是 Receiver 会在不处理的情况下接收大量消息,并只保留它们直到锁过期。