Mass Transit - 只有一个 Consumer "handler" 正在处理消息
Mass Transit - only single Consumer "handler" is processing messages
我正在将我们的项目从内部 receiver/dispatcher 实施转换为公共交通。
事情最初进展顺利,但我 运行 遇到了一个问题,我正在努力确定其原因。
注意:当我们从旧实现迁移到新实现时,一些内部组件的命名可能看起来很奇怪。
消费者
我们正在 类 中实现我们的消费者,按用途分组。
我有这个测试分组:
public class TestCommandHandler :
IConsumer<PingCommand>,
IConsumer<TestCommand>
{
private readonly ILoggingProvider _logger;
public TestCommandHandler(
ILoggingProvider loggingProvider
{
_logger = loggingProvider;
}
public Task Consume(ConsumeContext<TestCommand> cmd)
{
_logger.Info(cmd.Message.Message);
if (cmd.SentTime.HasValue)
{
_logger.Info($"Sent to {cmd.SessionId()} at {cmd.SentTime} - duration to now {DateTime.UtcNow.Subtract(cmd.SentTime.Value)}");
}
return Task.CompletedTask;
}
public async Task Consume(ConsumeContext<PingCommand> cmd)
{
_logger.Info($"Received Ping Command for {cmd.Message.OperationId}");
_logger.Info($"Checking compression...");
// do some checks and throw if an issue
}
}
我也有这个 "polling" 结果分组:
public class PollCommandHandler :
IConsumer<PollResult>
{
private readonly IAggregateRepository<PollResult> _aggregateRepository;
public PollCommandHandler(IAggregateRepository<PollResult> aggregateRepository)
{
_aggregateRepository = aggregateRepository;
}
public async Task Consume(ConsumeContext<Model.Commands.PollResult> cmd)
{
var poll = _aggregateRepository.Find(cmd.Message.Id);
poll.AddPollResult(cmd.Message.RetailerId.ToString(), cmd.Message.PollDate, cmd.Message.PollResponseStatus, cmd.Message.BlobUri);
await _aggregateRepository.SaveAsync(poll, cmd.Message.Id.ToString());
}
总线设置
在我们的主机服务启动期间,我们使用 Autofac
来注册我们所有的组件(consumerAssembly
是具有上述消费者/处理程序的 Assembly
) :
builder.AddMassTransit(mt =>
{
mt.AddConsumers(consumerAssembly);
mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MaxConcurrentCalls = 500;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = _config.Infrastructure.AzureServiceBusSharedSecretIssuer;
s.SharedAccessKey = _config.Infrastructure.AzureServiceBusSharedSecretValue;
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
});
x.ReceiveEndpoint(host, $"mt.{QueueName.Command.ToString()}", ep =>
{
ep.RequiresSession = true;
ep.MaxConcurrentCalls = 500;
ep.RemoveSubscriptions = true;
ep.ConfigureConsumers(context);
});
}));
});
我看到的问题是 TestCommandHandler
处理的所有命令都被 接收和处理。这个不错。
但是,当我发送由 PollCommandHandler
处理的 PollResult
命令时,消息 未被 消费 - 相反,它们被移动到 skipped
队列,我不确定为什么。这很糟糕。
据我了解,mt.AddConsumers(consumerAssembly);
应该会自动扫描并注册我们定义的消费者,而ep.ConfigureConsumers(context);
会自动将它们连接起来,以便ReceiveEndpoint
知道在哪里派发留言至.
我在总线 post 上有 运行 一个 GetProbeResult()
配置,据我所知,所有的消费者都配置正确。
寻找指向我在设置时做错的事情的指针,或者寻找有关如何诊断问题的一些指导。
嗯,这很尴尬。
看起来我们有一个 PollCommandHandler
和一个 FtpPollCommandHandler
,以及相应的 PollResult
和 FtpPollResult
命令。
我已迁移 PollCommandHandler
成为消费者,但正在使用 FtpPollResult
命令进行测试。
因此,Mass Transit 将消息移至跳过的队列,因为确实没有与之关联的消费者。
我正在将我们的项目从内部 receiver/dispatcher 实施转换为公共交通。
事情最初进展顺利,但我 运行 遇到了一个问题,我正在努力确定其原因。
注意:当我们从旧实现迁移到新实现时,一些内部组件的命名可能看起来很奇怪。
消费者
我们正在 类 中实现我们的消费者,按用途分组。
我有这个测试分组:
public class TestCommandHandler :
IConsumer<PingCommand>,
IConsumer<TestCommand>
{
private readonly ILoggingProvider _logger;
public TestCommandHandler(
ILoggingProvider loggingProvider
{
_logger = loggingProvider;
}
public Task Consume(ConsumeContext<TestCommand> cmd)
{
_logger.Info(cmd.Message.Message);
if (cmd.SentTime.HasValue)
{
_logger.Info($"Sent to {cmd.SessionId()} at {cmd.SentTime} - duration to now {DateTime.UtcNow.Subtract(cmd.SentTime.Value)}");
}
return Task.CompletedTask;
}
public async Task Consume(ConsumeContext<PingCommand> cmd)
{
_logger.Info($"Received Ping Command for {cmd.Message.OperationId}");
_logger.Info($"Checking compression...");
// do some checks and throw if an issue
}
}
我也有这个 "polling" 结果分组:
public class PollCommandHandler :
IConsumer<PollResult>
{
private readonly IAggregateRepository<PollResult> _aggregateRepository;
public PollCommandHandler(IAggregateRepository<PollResult> aggregateRepository)
{
_aggregateRepository = aggregateRepository;
}
public async Task Consume(ConsumeContext<Model.Commands.PollResult> cmd)
{
var poll = _aggregateRepository.Find(cmd.Message.Id);
poll.AddPollResult(cmd.Message.RetailerId.ToString(), cmd.Message.PollDate, cmd.Message.PollResponseStatus, cmd.Message.BlobUri);
await _aggregateRepository.SaveAsync(poll, cmd.Message.Id.ToString());
}
总线设置
在我们的主机服务启动期间,我们使用 Autofac
来注册我们所有的组件(consumerAssembly
是具有上述消费者/处理程序的 Assembly
) :
builder.AddMassTransit(mt =>
{
mt.AddConsumers(consumerAssembly);
mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MaxConcurrentCalls = 500;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = _config.Infrastructure.AzureServiceBusSharedSecretIssuer;
s.SharedAccessKey = _config.Infrastructure.AzureServiceBusSharedSecretValue;
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
});
x.ReceiveEndpoint(host, $"mt.{QueueName.Command.ToString()}", ep =>
{
ep.RequiresSession = true;
ep.MaxConcurrentCalls = 500;
ep.RemoveSubscriptions = true;
ep.ConfigureConsumers(context);
});
}));
});
我看到的问题是 TestCommandHandler
处理的所有命令都被 接收和处理。这个不错。
但是,当我发送由 PollCommandHandler
处理的 PollResult
命令时,消息 未被 消费 - 相反,它们被移动到 skipped
队列,我不确定为什么。这很糟糕。
据我了解,mt.AddConsumers(consumerAssembly);
应该会自动扫描并注册我们定义的消费者,而ep.ConfigureConsumers(context);
会自动将它们连接起来,以便ReceiveEndpoint
知道在哪里派发留言至.
我在总线 post 上有 运行 一个 GetProbeResult()
配置,据我所知,所有的消费者都配置正确。
寻找指向我在设置时做错的事情的指针,或者寻找有关如何诊断问题的一些指导。
嗯,这很尴尬。
看起来我们有一个 PollCommandHandler
和一个 FtpPollCommandHandler
,以及相应的 PollResult
和 FtpPollResult
命令。
我已迁移 PollCommandHandler
成为消费者,但正在使用 FtpPollResult
命令进行测试。
因此,Mass Transit 将消息移至跳过的队列,因为确实没有与之关联的消费者。