使用 NServiceBus 发布大量消息
Publishing bulk of messages using NService Bus
我想使用 NServiceBus 发布大量消息。
我发现了一个重载的 Publish 方法,它将消息列表作为参数。
当我尝试使用它时,它显示以下错误:
var message= new Message
{
Id = id,
Timestamp = DateTime.SpecifyKind(DateTime.Now, DateTimeKind.Local),
PublishingStartTimestamp = DateTime.SpecifyKind(DateTime.Now, DateTimeKind.Local)
};
bus.Publish(new[] { message, message});
错误:'NServiceBus.IBus.Publish(params T[])' 已过时:'Removed to reduce complexity and API confusion. See https://github.com/Particular/NServiceBus/issues/1346 for more information. Will be removed in version 5.0.0.'
发布大量消息的替代方法是什么。
我的任务是每秒发布 600 个事件。
NServicebus支持异步发布消息:https://particular.net/blog/async-await-its-time
假设您不在接收消息上下文中,您可以在任务列表中收集所有发布任务,然后让任务 运行 与 WhenAll 异步。
还要确保您没有达到 azure 服务总线每秒最大消息数的限制。
更新
请参阅 Ramon Smit 的回答以获得完整答案。
高音量
我不确定您的用例是什么。如果这是一个一致的消息流,600 msg/s 相当于单个端点每天将近 5200 万条消息。
用例
我们会对您的用例感兴趣,我们可以在我们的 google 组中公开讨论这个问题
我们也可以通过 support@particular.net
联系我们私下进行此操作
作为单个或单独的消息接收
如果您发送这 600 条消息,您是否希望在接收端将它们视为一条消息?我问的原因是问题中 API 的作用。它创建一个包含所有这些消息的信封。在传输中,您会看到一条物理消息。在接收端,所有消息将作为单个接收操作处理。
如果这是您想要的行为,您必须创建一个包含集合 (array/list) 的消息,并将所有这些消息片段放入其中。
这个策略的好处是你在合同中使行为更加明确。
var message = new ParentMessage();
for(int i=0;i<600;i++) message.Children.Add(new ChildFragment());
Bus.Publish(message);
传输消息大小限制
Azure 服务总线消息在标准层上最多为 256KB,在高级层上最多为 1MB。如果片段包含大量数据,以前使用单个大消息的解决方案可能行不通。
接收上下文
在您的问题中,不清楚您是在接收消息的上下文中还是在接收上下文之外发送这些消息。每个人的行为都不同。
在传入消息上下文中,发送被缓冲。在传入消息上下文之外,消息被直接推送到传输。
批量发送
您在 NServiceBus V5 或 V6 发送中使用 Azure 服务总线,默认情况下会进行缓冲。如果您在 handler/saga 中执行 publish/send,则消息不会立即推送到传输。
此行为的原因是为了防止 'ghost' 消息。假设您发送了一条消息,然后您创建了第二条消息,并且在映射期间发生了空引用异常。您是否希望第一条消息已经被推送到传输中?可能不是,这是默认行为。
对于 Azure 服务总线,此行为也存在于 V5 中,方法是执行事务范围登记。如果您不希望 V5 中出现此行为,请通过禁用分布式事务来禁用事务范围的使用。
异步
在 V6 中,API 是异步的,这意味着您可以执行以下操作:
var tasks = new List<Task>();
for(int i=0;i<600;i++) tasks.Add(context.Publish(new MyEvent()));
await Task.WhenAll(tasks).ConfigureAwait(false);;
由于缓冲发送,除非消息的创建正在执行 IO,否则这不会真正产生巨大差异。
如果您在接收消息上下文之外执行此操作,那么它确实会提高性能。
var tasks = new List<Task>();
for(int i=0;i<600;i++) tasks.Add(messageSession.Publish(new MyEvent()));
await Task.WhenAll(tasks).ConfigureAwait(false);;
这是因为消息将被立即并发地推送到传输。
立即发货
如果您明确不想缓冲消息,那么您可以使用立即分派。以下为V6样例
var options = new SendOptions();
options.RequireImmediateDispatch();
var message = new MyMessage();
await context.Send(message, options).ConfigureAwait(false);
性能
您在描述中使用的API是NServiceBus V5或更早版本。在 V6 中,Azure 服务总线的性能有了显着提高。
有很多变量可以调整以提高性能。
对于 V6 中的单个端点实例,每秒 send/receive 数千条消息应该不是问题。
我想使用 NServiceBus 发布大量消息。 我发现了一个重载的 Publish 方法,它将消息列表作为参数。 当我尝试使用它时,它显示以下错误:
var message= new Message
{
Id = id,
Timestamp = DateTime.SpecifyKind(DateTime.Now, DateTimeKind.Local),
PublishingStartTimestamp = DateTime.SpecifyKind(DateTime.Now, DateTimeKind.Local)
};
bus.Publish(new[] { message, message});
错误:'NServiceBus.IBus.Publish(params T[])' 已过时:'Removed to reduce complexity and API confusion. See https://github.com/Particular/NServiceBus/issues/1346 for more information. Will be removed in version 5.0.0.'
发布大量消息的替代方法是什么。 我的任务是每秒发布 600 个事件。
NServicebus支持异步发布消息:https://particular.net/blog/async-await-its-time
假设您不在接收消息上下文中,您可以在任务列表中收集所有发布任务,然后让任务 运行 与 WhenAll 异步。
还要确保您没有达到 azure 服务总线每秒最大消息数的限制。
更新 请参阅 Ramon Smit 的回答以获得完整答案。
高音量
我不确定您的用例是什么。如果这是一个一致的消息流,600 msg/s 相当于单个端点每天将近 5200 万条消息。
用例
我们会对您的用例感兴趣,我们可以在我们的 google 组中公开讨论这个问题
我们也可以通过 support@particular.net
联系我们私下进行此操作作为单个或单独的消息接收
如果您发送这 600 条消息,您是否希望在接收端将它们视为一条消息?我问的原因是问题中 API 的作用。它创建一个包含所有这些消息的信封。在传输中,您会看到一条物理消息。在接收端,所有消息将作为单个接收操作处理。
如果这是您想要的行为,您必须创建一个包含集合 (array/list) 的消息,并将所有这些消息片段放入其中。
这个策略的好处是你在合同中使行为更加明确。
var message = new ParentMessage();
for(int i=0;i<600;i++) message.Children.Add(new ChildFragment());
Bus.Publish(message);
传输消息大小限制
Azure 服务总线消息在标准层上最多为 256KB,在高级层上最多为 1MB。如果片段包含大量数据,以前使用单个大消息的解决方案可能行不通。
接收上下文
在您的问题中,不清楚您是在接收消息的上下文中还是在接收上下文之外发送这些消息。每个人的行为都不同。
在传入消息上下文中,发送被缓冲。在传入消息上下文之外,消息被直接推送到传输。
批量发送
您在 NServiceBus V5 或 V6 发送中使用 Azure 服务总线,默认情况下会进行缓冲。如果您在 handler/saga 中执行 publish/send,则消息不会立即推送到传输。
此行为的原因是为了防止 'ghost' 消息。假设您发送了一条消息,然后您创建了第二条消息,并且在映射期间发生了空引用异常。您是否希望第一条消息已经被推送到传输中?可能不是,这是默认行为。
对于 Azure 服务总线,此行为也存在于 V5 中,方法是执行事务范围登记。如果您不希望 V5 中出现此行为,请通过禁用分布式事务来禁用事务范围的使用。
异步
在 V6 中,API 是异步的,这意味着您可以执行以下操作:
var tasks = new List<Task>();
for(int i=0;i<600;i++) tasks.Add(context.Publish(new MyEvent()));
await Task.WhenAll(tasks).ConfigureAwait(false);;
由于缓冲发送,除非消息的创建正在执行 IO,否则这不会真正产生巨大差异。
如果您在接收消息上下文之外执行此操作,那么它确实会提高性能。
var tasks = new List<Task>();
for(int i=0;i<600;i++) tasks.Add(messageSession.Publish(new MyEvent()));
await Task.WhenAll(tasks).ConfigureAwait(false);;
这是因为消息将被立即并发地推送到传输。
立即发货
如果您明确不想缓冲消息,那么您可以使用立即分派。以下为V6样例
var options = new SendOptions();
options.RequireImmediateDispatch();
var message = new MyMessage();
await context.Send(message, options).ConfigureAwait(false);
性能
您在描述中使用的API是NServiceBus V5或更早版本。在 V6 中,Azure 服务总线的性能有了显着提高。
有很多变量可以调整以提高性能。
对于 V6 中的单个端点实例,每秒 send/receive 数千条消息应该不是问题。