如何使用 MassTransit 从投票队列中检索消息?
How to retrieve messages from turnout queue using MassTransit?
我正在尝试按照本文 http://docs.masstransit-project.com/en/latest/advanced/turnout.html.
使用 MassTransit 和 Azure 服务总线创建应用程序
在我启动 Azure 服务总线中的应用程序后,创建了两个队列(其中一个已过期)。在我执行订阅者后,创建了投票队列,消息从 main 移动到这个队列。如果订户工作,我可以检索消息。如果我停止订阅者(终止进程或关闭机器)消息仍在投票队列中。下次我执行订阅者时,它会创建新的投票队列,并且我不会检索已处理但未完成的消息。那么,怎样才能不丢失消息呢?还有我如何设置在一个节点中处理的最大消息数的限制?
_busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host("********", h =>
{
//h.OperationTimeout = TimeSpan.FromMinutes(1);
});
cfg.MaxConcurrentCalls = 1;
cfg.UseServiceBusMessageScheduler();
cfg.TurnoutEndpoint<ISimpleRequest>(host, "test_longruning",
e =>
{
e.SuperviseInterval = TimeSpan.FromSeconds(30);
e.PartitionCount = 1;
e.SetJobFactory(async context =>
{
Console.WriteLine($"{DateTime.Now} Start Message: {context.Command.CustomerId}");
await Task.Delay(TimeSpan.FromMinutes(7), context.CancellationToken);
Console.WriteLine($"{DateTime.Now} End Message: {context.Command.CustomerId}");
});
});
});
首先,我要提醒您,此时的投票率很高 pre-production。虽然它在快乐的道路上工作,但服务故障的处理还不够完善。虽然消息生存时间设置应该以命令返回正确的队列结束,但尚未经过广泛测试。
就是说,您可以使用 ServiceBusExplorer 将消息移回适当的队列,我就是这样做的。它是手动的,但它是唯一真正让您完全控制服务总线环境的工具。
我正在尝试按照本文 http://docs.masstransit-project.com/en/latest/advanced/turnout.html.
使用 MassTransit 和 Azure 服务总线创建应用程序在我启动 Azure 服务总线中的应用程序后,创建了两个队列(其中一个已过期)。在我执行订阅者后,创建了投票队列,消息从 main 移动到这个队列。如果订户工作,我可以检索消息。如果我停止订阅者(终止进程或关闭机器)消息仍在投票队列中。下次我执行订阅者时,它会创建新的投票队列,并且我不会检索已处理但未完成的消息。那么,怎样才能不丢失消息呢?还有我如何设置在一个节点中处理的最大消息数的限制?
_busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host("********", h =>
{
//h.OperationTimeout = TimeSpan.FromMinutes(1);
});
cfg.MaxConcurrentCalls = 1;
cfg.UseServiceBusMessageScheduler();
cfg.TurnoutEndpoint<ISimpleRequest>(host, "test_longruning",
e =>
{
e.SuperviseInterval = TimeSpan.FromSeconds(30);
e.PartitionCount = 1;
e.SetJobFactory(async context =>
{
Console.WriteLine($"{DateTime.Now} Start Message: {context.Command.CustomerId}");
await Task.Delay(TimeSpan.FromMinutes(7), context.CancellationToken);
Console.WriteLine($"{DateTime.Now} End Message: {context.Command.CustomerId}");
});
});
});
首先,我要提醒您,此时的投票率很高 pre-production。虽然它在快乐的道路上工作,但服务故障的处理还不够完善。虽然消息生存时间设置应该以命令返回正确的队列结束,但尚未经过广泛测试。
就是说,您可以使用 ServiceBusExplorer 将消息移回适当的队列,我就是这样做的。它是手动的,但它是唯一真正让您完全控制服务总线环境的工具。