如何使用 MassTransit 从投票队列中检索消息?

How to retrieve messages from turnout queue using MassTransit?

我正在尝试按照本文 http://docs.masstransit-project.com/en/latest/advanced/turnout.html.

使用 MassTransit 和 Azure 服务总线创建应用程序

在我启动 Azure 服务总线中的应用程序后,创建了两个队列(其中一个已过期)。在我执行订阅者后,创建了投票队列,消息从 main 移动到这个队列。如果订户工作,我可以检索消息。如果我停止订阅者(终止进程或关闭机器)消息仍在投票队列中。下次我执行订阅者时,它会创建新的投票队列,并且我不会检索已处理但未完成的消息。那么,怎样才能不丢失消息呢?还有我如何设置在一个节点中处理的最大消息数的限制?

_busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var host = cfg.Host("********", h =>
            {
                    //h.OperationTimeout = TimeSpan.FromMinutes(1);
            });
            cfg.MaxConcurrentCalls = 1;
            cfg.UseServiceBusMessageScheduler();
            cfg.TurnoutEndpoint<ISimpleRequest>(host, "test_longruning",
                e =>
                {
                    e.SuperviseInterval = TimeSpan.FromSeconds(30);
                    e.PartitionCount = 1;
                    e.SetJobFactory(async context =>
                        {
                            Console.WriteLine($"{DateTime.Now} Start Message: {context.Command.CustomerId}");
                            await Task.Delay(TimeSpan.FromMinutes(7), context.CancellationToken);
                            Console.WriteLine($"{DateTime.Now} End Message: {context.Command.CustomerId}");
                        });
                });
        });

首先,我要提醒您,此时的投票率很高 pre-production。虽然它在快乐的道路上工作,但服务故障的处理还不够完善。虽然消息生存时间设置应该以命令返回正确的队列结束,但尚未经过广泛测试。

就是说,您可以使用 ServiceBusExplorer 将消息移回适当的队列,我就是这样做的。它是手动的,但它是唯一真正让您完全控制服务总线环境的工具。