Azure 服务总线 - 会话正在复制传递到多个实例的消息
Azure service bus - Session is duplicating messages in delivery to multiple instances
我在一个系统上工作,我需要在其中管理有序消息并抛出服务总线。在我的公司,我们在 Azure 云上拥有一切,我正在使用 Azure 服务总线进行消息传递。
我阅读了有关该会话的信息,它似乎可以通过 Azure 服务总线解决我开箱即用的挑战,但它仅适用于单个实例。一旦我将 we job 扩展到多个实例,就会有多个实例使用来自同一会话的相同消息。
在我的 POC 中,我使用带有订阅的主题。
是否有人能够将其扩展到接收器的多个实例?
我的第二个选择是自己实现 Resequencer 企业模式,你有什么建议吗?
这是我用来接收消息的代码:
static async Task InitializeReceiver(string connectionString, string queueName, CancellationToken ct)
{
var receiverFactory = MessagingFactory.CreateFromConnectionString(connectionString);
ct.Register(() => receiverFactory.Close());
var client = receiverFactory.CreateSubscriptionClient(queueName, "parallel", ReceiveMode.ReceiveAndDelete);
client.RegisterSessionHandler(
typeof(SessionHandler),
new SessionHandlerOptions
{
AutoRenewTimeout = TimeSpan.FromMinutes(5),
MessageWaitTimeout = TimeSpan.FromSeconds(120),
MaxConcurrentSessions = 100,
AutoComplete = false
});
}
这是我用来处理的代码:
public async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
await ProcessMessage(session, message, currentInstanceId, recipeStep);
// If I process the last message of the session
if (message.Sequence == numberOfMsgPerSession)
{
// end of the session!
await session.CloseAsync();
logger.LogInformation($"Session with id {message.SessionId} is completed.");
}
}
谢谢。
问候。
我在 Web 作业中使用计时器来触发我的方法。在扩展方面,计时器的数量与实例数量成比例地增加。在那里,我用 Azure 服务总线主题订阅替换了计时器。由于消息只能接收一次,因此即使有多个 运行 个实例,方法也只会被触发一次。
我有一个活跃的接收者在监听消息,检查here创建一个活跃的监听器。
正如您所说的消息被多次收到,您可能只是偷看了消息,而不是收到了消息。查看消息不要将其从订阅中删除。仅接收它会将其从订阅中删除,导致其他接收者无法使用该消息。
我解决了。
如果您看到我在问题中发布的代码,对于每个会话,当我根据其序列号处理最后一条消息时,我会关闭它。
有一次,我删除了那段代码,一切都开始正常工作,我在 10 个会话中每个会话发送 200 条消息。消息是按每个会话的顺序处理的,正如我所期望的那样,只有一个实例占用一个会话。
谢谢大家。
OaicStef
我在一个系统上工作,我需要在其中管理有序消息并抛出服务总线。在我的公司,我们在 Azure 云上拥有一切,我正在使用 Azure 服务总线进行消息传递。 我阅读了有关该会话的信息,它似乎可以通过 Azure 服务总线解决我开箱即用的挑战,但它仅适用于单个实例。一旦我将 we job 扩展到多个实例,就会有多个实例使用来自同一会话的相同消息。 在我的 POC 中,我使用带有订阅的主题。
是否有人能够将其扩展到接收器的多个实例? 我的第二个选择是自己实现 Resequencer 企业模式,你有什么建议吗?
这是我用来接收消息的代码:
static async Task InitializeReceiver(string connectionString, string queueName, CancellationToken ct)
{
var receiverFactory = MessagingFactory.CreateFromConnectionString(connectionString);
ct.Register(() => receiverFactory.Close());
var client = receiverFactory.CreateSubscriptionClient(queueName, "parallel", ReceiveMode.ReceiveAndDelete);
client.RegisterSessionHandler(
typeof(SessionHandler),
new SessionHandlerOptions
{
AutoRenewTimeout = TimeSpan.FromMinutes(5),
MessageWaitTimeout = TimeSpan.FromSeconds(120),
MaxConcurrentSessions = 100,
AutoComplete = false
});
}
这是我用来处理的代码:
public async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
await ProcessMessage(session, message, currentInstanceId, recipeStep);
// If I process the last message of the session
if (message.Sequence == numberOfMsgPerSession)
{
// end of the session!
await session.CloseAsync();
logger.LogInformation($"Session with id {message.SessionId} is completed.");
}
}
谢谢。 问候。
我在 Web 作业中使用计时器来触发我的方法。在扩展方面,计时器的数量与实例数量成比例地增加。在那里,我用 Azure 服务总线主题订阅替换了计时器。由于消息只能接收一次,因此即使有多个 运行 个实例,方法也只会被触发一次。
我有一个活跃的接收者在监听消息,检查here创建一个活跃的监听器。
正如您所说的消息被多次收到,您可能只是偷看了消息,而不是收到了消息。查看消息不要将其从订阅中删除。仅接收它会将其从订阅中删除,导致其他接收者无法使用该消息。
我解决了。 如果您看到我在问题中发布的代码,对于每个会话,当我根据其序列号处理最后一条消息时,我会关闭它。 有一次,我删除了那段代码,一切都开始正常工作,我在 10 个会话中每个会话发送 200 条消息。消息是按每个会话的顺序处理的,正如我所期望的那样,只有一个实例占用一个会话。
谢谢大家。 OaicStef