配置更多时,MassTransit 仅批处理 10
MassTransit is only batching 10 when more is configured
我正在尝试配置 MassTransit 批处理,但是 运行 它一次只能批处理 10 个。
hostHandler = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
cfg.TrySetPrefetchCount(2000);
cfg.Batch<T>(cfg =>
{
cfg.Consumer(() => consumer);
cfg.ConcurrencyLimit = 2;
cfg.MessageLimit = 1000;
cfg.TimeLimit = TimeSpan.FromSeconds(1);
});
cfg.UseMessageRetry(r => r.Immediate(2)));
});
await hostHandler.Ready;
在我发布问题 5 分钟后,我尝试更改批处理配置的顺序,并将消费者作为最后一个语句,成功了。
hostHandler = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
cfg.TrySetPrefetchCount(2000);
cfg.Batch<T>(cfg =>
{
cfg.ConcurrencyLimit = 2;
cfg.MessageLimit = 1000;
cfg.TimeLimit = TimeSpan.FromSeconds(1);
cfg.Consumer(() => consumer);
});
cfg.UseMessageRetry(r => r.Immediate(2)));
});
await hostHandler.Ready;
您也可以使用较新的批处理语法,但仍需要在 Consumer
调用之前指定:
var handle = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
cfg.TrySetPrefetchCount(2000);
cfg.UseMessageRetry(r => r.Immediate(2)));
cfg.ConfigureConsumer<YourConsumer>(context, cons =>
{
cons.Options<BatchOptions>(options => options
.SetMessageLimit(1000)
.SetTimeLimit(1000)
.SetConcurrencyLimit(2));
});
});
await handle.Ready;
由于您使用的是接收端点连接器,您还可以将消费者定义中的批处理选项配置为 shown in the documentation。
我正在尝试配置 MassTransit 批处理,但是 运行 它一次只能批处理 10 个。
hostHandler = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
cfg.TrySetPrefetchCount(2000);
cfg.Batch<T>(cfg =>
{
cfg.Consumer(() => consumer);
cfg.ConcurrencyLimit = 2;
cfg.MessageLimit = 1000;
cfg.TimeLimit = TimeSpan.FromSeconds(1);
});
cfg.UseMessageRetry(r => r.Immediate(2)));
});
await hostHandler.Ready;
在我发布问题 5 分钟后,我尝试更改批处理配置的顺序,并将消费者作为最后一个语句,成功了。
hostHandler = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
cfg.TrySetPrefetchCount(2000);
cfg.Batch<T>(cfg =>
{
cfg.ConcurrencyLimit = 2;
cfg.MessageLimit = 1000;
cfg.TimeLimit = TimeSpan.FromSeconds(1);
cfg.Consumer(() => consumer);
});
cfg.UseMessageRetry(r => r.Immediate(2)));
});
await hostHandler.Ready;
您也可以使用较新的批处理语法,但仍需要在 Consumer
调用之前指定:
var handle = receiveEndpointConnector.ConnectReceiveEndpoint(queueName, (context, cfg) =>
{
cfg.TrySetPrefetchCount(2000);
cfg.UseMessageRetry(r => r.Immediate(2)));
cfg.ConfigureConsumer<YourConsumer>(context, cons =>
{
cons.Options<BatchOptions>(options => options
.SetMessageLimit(1000)
.SetTimeLimit(1000)
.SetConcurrencyLimit(2));
});
});
await handle.Ready;
由于您使用的是接收端点连接器,您还可以将消费者定义中的批处理选项配置为 shown in the documentation。