MassTransit Azure - 来自主题错误的接收者

MassTransit Azure - Receiver from Topic error

我在 Azure 服务总线中有一个主题的订阅者。我经常看到返回间歇性错误,如下所示:

MassTransit.Azure.ServiceBus.Core.Transport.ReceiveTransport 错误:0:ReceiveTransport 出错:sb://###,System.AggregateException:发生一个或多个错误。 (发生一个或多个错误。(数字必须是非负数且小于或等于 Int32.MaxValue 或 -1。 参数名称:dueTime)) ---> System.AggregateException: 发生一个或多个错误。 (数字必须是非负数且小于或等于 Int32.MaxValue 或 -1。 参数名称:dueTime) ---> System.ArgumentOutOfRangeException: Number 必须为非负且小于或等于 Int32.MaxValue 或 -1。 参数名称:dueTime

我看不到 how/where 设置到期时间以防止这种情况发生。订阅者大部分时间都在处理消息。

有没有办法使用 MassTransit 设置到期时间?

出版商:

await _busControl.Publish(new ConfigurationReloaded(),
                context => context.TimeToLive = new TimeSpan(0, 0, 10, 0), cancellationToken);

发布错误:

System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
   at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageSender.SendAsync(IList`1 messageList)
   at MassTransit.Azure.ServiceBus.Core.Transport.ServiceBusSendTransport.SendClientPipe`1.Send(SendEndpointContext clientContext)
   at MassTransit.Azure.ServiceBus.Core.Transport.ServiceBusSendTransport.SendClientPipe`1.Send(SendEndpointContext clientContext)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at MassTransit.Transports.PublishEndpoint.Publish[T](CancellationToken cancellationToken, T message, PublishEndpointPipeAdapter`1 adapter)
   at MassTransit.Transports.PublishEndpoint.Publish[T](CancellationToken cancellationToken, T message, PublishEndpointPipeAdapter`1 adapter)
   at Services.ConfigurationUpdatedHostedService.StartAsync(CancellationToken cancellationToken) in /src/ConfigurationService/Services/ConfigurationUpdatedHostedService.cs:line 32
   at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.ExecuteAsync(Func`2 callback)
   --- End of inner exception stack trace ---
   at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.ExecuteAsync(Func`2 callback)
   at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.StartAsync(CancellationToken token)
---> (Inner Exception #0) System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime

接收者:

 serviceBusHost.ConnectSubscriptionEndpoint<ConfigurationReloaded>($"{_serviceBusOptions.SubscriberName}_{_myUniqueSubscriberName}", x =>
            {
                x.AutoDeleteOnIdle = _serviceBusOptions.TimeToRemoveOnIdle;
                x.Handler<ConfigurationReloaded>(context =>
                {
                    this.Load();
                    return Task.CompletedTask;
                });
            });

完整堆栈跟踪:

MassTransit.Azure.ServiceBus.Core.Transport.ReceiveTransport Error: 0 : ReceiveTransport Faulted: sb://###, System.AggregateException: One or more errors occurred. (One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime)) ---> System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.MessageReceivePump.MessagePumpTaskAsync()
   --- End of inner exception stack trace ---
   --- End of inner exception stack trace ---
   at MassTransit.Azure.ServiceBus.Core.Pipeline.MessageReceiverFilter.GreenPipes.IFilter<MassTransit.Azure.ServiceBus.Core.ClientContext>.Send(ClientContext context, IPipe`1 next)
   at MassTransit.Azure.ServiceBus.Core.Pipeline.MessageReceiverFilter.GreenPipes.IFilter<MassTransit.Azure.ServiceBus.Core.ClientContext>.Send(ClientContext context, IPipe`1 next)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
   at MassTransit.Azure.ServiceBus.Core.Transport.ReceiveTransport.<Receiver>b__13_0()
---> (Inner Exception #0) System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.MessageReceivePump.MessagePumpTaskAsync()
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)

添加公共交通的服务集合扩展:

 public static IServiceCollection AddMassTransit(
            this IServiceCollection collection, string serviceBusHost, string serviceBusKeyName, string serviceBusSharedAccessKey)
        {
            collection.AddSingleton(Bus.Factory.CreateUsingAzureServiceBus(cfg =>
                {
                    var host = cfg.Host(
                        serviceBusHost,
                        h =>
                        {
                            h.SharedAccessSignature(s =>
                            {
                                s.KeyName = serviceBusKeyName;
                                s.SharedAccessKey = serviceBusSharedAccessKey;
                            });
                        });
                    collection.AddSingleton(host);
                }
            ));

            collection.AddSingleton<IHostedService, BusService>();

            return collection;
        }

我也一直被这个问题困扰。调试后我想我发现了问题。 .net 核心服务总线库中有一个默认值太低。

在 ShareAccessSignature 配置方法中将 TokenTimeToLive 设置为合理的值似乎可以解决。

                        sbcfg.SharedAccessSignature(sas =>
                    {
                        sas.TokenTimeToLive = TimeSpan.FromMinutes(20);
                        sas.KeyName = Configuration.GetValue<string>("ServiceBus:KeyName");
                        sas.SharedAccessKey = Configuration.GetValue<string>("ServiceBus:AccessKey");
                    });

有关信息,这些是 github 中与此默认行为相关的相关(未解决)问题。

https://github.com/Azure/azure-service-bus-dotnet/issues/672

https://github.com/Azure/azure-sdk-for-net/issues/6312