MassTransit Azure - Receiver from Topic error
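I have a subscriber to a topic in Azure Service Bus. I regularly see intermittent errors like the following:
MassTransit.Azure.ServiceBus.Core.Transport.ReceiveTransport Error: 0 : ReceiveTransport Faulted: sb://###, System.AggregateException: One or more errors occurred. (One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime)) ---> System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
I can't see how or where to set the due time to prevent this from happening. The subscriber processes messages successfully most of the time.
Is there a way to set the due time using MassTransit?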
Publisher:
await _busControl.Publish(new ConfigurationReloaded(),
    context => context.TimeToLive = new TimeSpan(0, 0, 10, 0), cancellationToken);
Publish error:
System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageSender.SendAsync(IList`1 messageList)
at MassTransit.Azure.ServiceBus.Core.Transport.ServiceBusSendTransport.SendClientPipe`1.Send(SendEndpointContext clientContext)
at MassTransit.Azure.ServiceBus.Core.Transport.ServiceBusSendTransport.SendClientPipe`1.Send(SendEndpointContext clientContext)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at MassTransit.Transports.PublishEndpoint.Publish[T](CancellationToken cancellationToken, T message, PublishEndpointPipeAdapter`1 adapter)
at MassTransit.Transports.PublishEndpoint.Publish[T](CancellationToken cancellationToken, T message, PublishEndpointPipeAdapter`1 adapter)
at Services.ConfigurationUpdatedHostedService.StartAsync(CancellationToken cancellationToken) in /src/ConfigurationService/Services/ConfigurationUpdatedHostedService.cs:line 32
at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.ExecuteAsync(Func`2 callback)
--- End of inner exception stack trace ---
at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.ExecuteAsync(Func`2 callback)
at Microsoft.AspNetCore.Hosting.Internal.HostedServiceExecutor.StartAsync(CancellationToken token)
---> (Inner Exception #0) System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
Receiver:
serviceBusHost.ConnectSubscriptionEndpoint<ConfigurationReloaded>($"{_serviceBusOptions.SubscriberName}_{_myUniqueSubscriberName}", x =>
{
    x.AutoDeleteOnIdle = _serviceBusOptions.TimeToRemoveOnIdle;
    x.Handler<ConfigurationReloaded>(context =>
    {
        this.Load();
        return Task.CompletedTask;
    });
});
Full stack trace:
MassTransit.Azure.ServiceBus.Core.Transport.ReceiveTransport Error: 0 : ReceiveTransport Faulted: sb://###, System.AggregateException: One or more errors occurred. (One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime)) ---> System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.MessageReceivePump.MessagePumpTaskAsync()
--- End of inner exception stack trace ---
--- End of inner exception stack trace ---
at MassTransit.Azure.ServiceBus.Core.Pipeline.MessageReceiverFilter.GreenPipes.IFilter<MassTransit.Azure.ServiceBus.Core.ClientContext>.Send(ClientContext context, IPipe`1 next)
at MassTransit.Azure.ServiceBus.Core.Pipeline.MessageReceiverFilter.GreenPipes.IFilter<MassTransit.Azure.ServiceBus.Core.ClientContext>.Send(ClientContext context, IPipe`1 next)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at MassTransit.Azure.ServiceBus.Core.Transport.ReceiveTransport.<Receiver>b__13_0()
---> (Inner Exception #0) System.AggregateException: One or more errors occurred. (Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime) ---> System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.MessageReceivePump.MessagePumpTaskAsync()
--- End of inner exception stack trace ---
---> (Inner Exception #0) System.ArgumentOutOfRangeException: Number must be either non-negative and less than or equal to Int32.MaxValue or -1.
Parameter name: dueTime
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
Service collection extension that adds MassTransit:
public static IServiceCollection AddMassTransit(
    this IServiceCollection collection, string serviceBusHost, string serviceBusKeyName, string serviceBusSharedAccessKey)
{
    collection.AddSingleton(Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var host = cfg.Host(
                serviceBusHost,
                h =>
                {
                    h.SharedAccessSignature(s =>
                    {
                        s.KeyName = serviceBusKeyName;
                        s.SharedAccessKey = serviceBusSharedAccessKey;
                    });
                });
            collection.AddSingleton(host);
        }
    ));
    collection.AddSingleton<IHostedService, BusService>();
    return collection;
}
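I have been plagued by this problem as well. After some debugging, I think I found the cause: a default value in the .NET Core Service Bus library is too low.
Setting TokenTimeToLive to a reasonable value in the SharedAccessSignature configuration method appears to resolve it.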
sbcfg.SharedAccessSignature(sas =>
{
    sas.TokenTimeToLive = TimeSpan.FromMinutes(20);
    sas.KeyName = Configuration.GetValue<string>("ServiceBus:KeyName");
    sas.SharedAccessKey = Configuration.GetValue<string>("ServiceBus:AccessKey");
});
For reference, here is a related (unresolved) GitHub issue about this default behavior:
https://github.com/Azure/azure-service-bus-dotnet/issues/672
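To make the workaround concrete, here is a minimal sketch of how it might look when applied to the AddMassTransit extension from the question. It only combines calls that already appear in the question and the answer. The class name MassTransitServiceCollectionExtensions is made up for illustration, the using directives are an assumption that may differ between MassTransit versions, BusService is the asker's own hosted service, and the 20-minute value is simply the one used in the answer above, not a verified recommendation.

```csharp
using System;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core; // assumption: namespace may vary by MassTransit version
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Hypothetical class name; the method body mirrors the extension in the question.
public static class MassTransitServiceCollectionExtensions
{
    public static IServiceCollection AddMassTransit(
        this IServiceCollection collection,
        string serviceBusHost,
        string serviceBusKeyName,
        string serviceBusSharedAccessKey)
    {
        collection.AddSingleton(Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var host = cfg.Host(serviceBusHost, h =>
            {
                h.SharedAccessSignature(s =>
                {
                    s.KeyName = serviceBusKeyName;
                    s.SharedAccessKey = serviceBusSharedAccessKey;

                    // The only change from the question's code: set an explicit
                    // token lifetime instead of relying on the library default.
                    s.TokenTimeToLive = TimeSpan.FromMinutes(20);
                });
            });

            collection.AddSingleton(host);
        }));

        // BusService is the asker's IHostedService implementation (not shown in the question).
        collection.AddSingleton<IHostedService, BusService>();
        return collection;
    }
}
```

Because the same host configuration is used for both sending and receiving, this single setting would plausibly cover the publish error and the ReceiveTransport fault alike, though that has not been verified here.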