如何确保在关闭 MessageReciever 期间完成 MessageHandler 委托?

How to ensure that MessageHandler delegate is completed during closing of MessageReciever?

我有一项服务使用来自 Microsoft.Azure.ServiceBusMessageReceiver 连续监听 ServiceBus 中的订阅。当服务停止时,我想在进程被终止之前给所有操作一个完成的机会。

这是我根据库提供的example使用的代码:

private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
    var doneReceiving = new TaskCompletionSource<bool>();

    cancellationToken.Register(() =>
    {
        receiver.CloseAsync();
        doneReceiving.SetResult(true);
    });

    receiver.RegisterMessageHandler(
        async (message, ct) => await HandleMessage(receiver, message),
        new MessageHandlerOptions(HandleException));

    await doneReceiving.Task;
}

在服务停止时,我取消了任务,即使 HandleMessage 仍然是 运行,服务也会立即终止。

有什么方法可以通过库本身检查操作是否仍然是 运行 延迟任务取消?我可以想出一种方法来锁定所有 运行 的任务来进行我自己的计数,但我希望有更好的方法可以让我知道 运行 处理程序的数量。

理想情况下,我想取消注册处理程序,以便消息泵停止,而接收器本身不会关闭以允许例如CompleteAsync 调用。

MessageReceiver.CloseAsync()所述如下:

Closes the Client. Closes the connections opened by it.

根据我的测试,在调用 MessageReceiver.CloseAsync() 之后,后续调用 CompleteAsyncDeadLetterAsync 将失败,因为 IMessageReceiver 的实例已被释放。如果您仍想完成队列消息,则需要创建一个新的 MessageReceiver.

Is there any way that I can check via the library itself that the operation is still running to delay task cancellation?

据我所知,SDK 目前不提供上述功能。此外,这里有一个类似的feedback关于正常关闭Azure服务总线的消息泵。

When the service is stopped I would like to give a chance for all operations to complete before the process is killed.

根据您的要求,我假设您需要自己实现它,以确保即使在MessageReceiver关闭后也能成功处理接收到的队列消息。或者您可以将 CancellationToken 参数传递到 HandleMessage 方法中以显式取消而不是完成检索到的消息。

另一个答案表明,不幸的是,现在不可能实现,并且该功能非常受欢迎,即使它在库本身中不可用。一种替代方法是创建您自己的接收消息泵,但是您需要自己处理断开连接、管理等,尽管正常关闭本身并不难。使用当前的方法,我设法编写了一个似乎工作正常的解决方法,即使它很笨拙。

private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
    int activeMessageHandlersCount = 0;
    var doneReceiving = new TaskCompletionSource<bool>();

    cancellationToken.Register(() =>
    {
        lock (receiver)
        {
            int attemptCount = 0;
            while (attemptCount++ < 10 && activeMessageHandlersCount > 0)
            {
                Thread.Sleep(1000);
            }

            receiver.CloseAsync();
        }

        doneReceiving.SetResult(true);
    });

    receiver.RegisterMessageHandler(
        async (message, ct) =>
        {
            bool canBeProcessed;
            lock (receiver)
            {
                canBeProcessed = !cancellationToken.IsCancellationRequested;
                if (canBeProcessed)
                {
                    Interlocked.Increment(ref activeMessageHandlersCount);
                }
            }

            if (canBeProcessed)
            {
                try
                {
                    await HandleMessage(receiver, message);
                }
                finally
                {
                    Interlocked.Decrement(ref activeMessageHandlersCount);
                }
            }
            else
            {
                await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown
            }
        }, new MessageHandlerOptions(HandleException));

    await doneReceiving.Task;
}

一个额外的缺点是 Receiver 会在不处理的情况下接收大量消息,并只保留它们直到锁过期。