使用 MassTransit SQS 时如何设置 MessageGroupId?
How can I set the MessageGroupId when using MassTransit SQS?
我尝试使用 MassTransit SQS 发布消息。这是我的配置代码:
public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration massTransitConfiguration)
{
services.AddMassTransit(x =>
{
x.AddConsumer<CustomerChangeConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(massTransitConfiguration.Host, h =>
{
h.AccessKey(massTransitConfiguration.AccessKey);
h.SecretKey(massTransitConfiguration.SecretKey);
h.EnableScopedTopics();
});
cfg.ReceiveEndpoint("CustomerChangeConsumer",
configurator =>
{
configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
});
cfg.Message<CustomerUpdate>(x =>
{
x.SetEntityName("customerupdate.fifo");
});
cfg.Publish<CustomerUpdate>(x =>
{
x.TopicAttributes["FifoTopic"] = "true";
});
});
});
services.AddMassTransitHostedService();
}
这是我的发布代码:
public class Publisher : IPublisher
{
private readonly IPublishEndpoint _publishEndpoint;
public Publisher(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task PublishAsync(CustomerUpdate customerUpdate)
{
await _publishEndpoint.Publish(customerUpdate);
}
}
这是我的错误信息:
Invalid parameter: The MessageGroupId parameter is required for FIFO topics
异常类型如下:
Amazon.SimpleNotificationService.Model.InvalidParameterException
这是堆栈跟踪:
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.<HandleExceptionAsync>d__2.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ExceptionHandler`1.<HandleAsync>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.ErrorHandler.<ProcessExceptionAsync>d__8.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CredentialsRetriever.<InvokeAsync>d__7`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorCallbackHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.MetricsHandler.<InvokeAsync>d__1`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.AmazonSqsClientContext.<MassTransit-AmazonSqsTransport-ClientContext-Publish>d__18.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.TaskAwaiter.GetResult()
显然,我需要在某处设置 MessageGroupId
。怎么做?
发布时可以设置MessageGroupId
:
public class Publisher : IPublisher
{
private readonly IPublishEndpoint _publishEndpoint;
public Publisher(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task PublishAsync(CustomerUpdate customerUpdate)
{
await _publishEndpoint.Publish(customerUpdate, x =>
x.SetGroupId("SomeGroupId"));
}
}
我尝试使用 MassTransit SQS 发布消息。这是我的配置代码:
public static void UseMassTransit(this IServiceCollection services, MassTransitConfiguration massTransitConfiguration)
{
services.AddMassTransit(x =>
{
x.AddConsumer<CustomerChangeConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(massTransitConfiguration.Host, h =>
{
h.AccessKey(massTransitConfiguration.AccessKey);
h.SecretKey(massTransitConfiguration.SecretKey);
h.EnableScopedTopics();
});
cfg.ReceiveEndpoint("CustomerChangeConsumer",
configurator =>
{
configurator.ConfigureConsumer<CustomerChangeConsumer>(context);
});
cfg.Message<CustomerUpdate>(x =>
{
x.SetEntityName("customerupdate.fifo");
});
cfg.Publish<CustomerUpdate>(x =>
{
x.TopicAttributes["FifoTopic"] = "true";
});
});
});
services.AddMassTransitHostedService();
}
这是我的发布代码:
public class Publisher : IPublisher
{
private readonly IPublishEndpoint _publishEndpoint;
public Publisher(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task PublishAsync(CustomerUpdate customerUpdate)
{
await _publishEndpoint.Publish(customerUpdate);
}
}
这是我的错误信息:
Invalid parameter: The MessageGroupId parameter is required for FIFO topics
异常类型如下:
Amazon.SimpleNotificationService.Model.InvalidParameterException
这是堆栈跟踪:
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.<HandleExceptionAsync>d__2.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ExceptionHandler`1.<HandleAsync>d__6.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.ErrorHandler.<ProcessExceptionAsync>d__8.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.EndpointDiscoveryHandler.<InvokeAsync>d__2`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CredentialsRetriever.<InvokeAsync>d__7`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
Amazon.Runtime.Internal.RetryHandler.<InvokeAsync>d__10`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.CallbackHandler.<InvokeAsync>d__9`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.ErrorCallbackHandler.<InvokeAsync>d__5`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
Amazon.Runtime.Internal.MetricsHandler.<InvokeAsync>d__1`1.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Contexts.AmazonSqsClientContext.<MassTransit-AmazonSqsTransport-ClientContext-Publish>d__18.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe`1.<Send>d__5.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
GreenPipes.Agents.PipeContextSupervisor`1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
System.Runtime.CompilerServices.TaskAwaiter.GetResult()
显然,我需要在某处设置 MessageGroupId
。怎么做?
发布时可以设置MessageGroupId
:
public class Publisher : IPublisher
{
private readonly IPublishEndpoint _publishEndpoint;
public Publisher(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task PublishAsync(CustomerUpdate customerUpdate)
{
await _publishEndpoint.Publish(customerUpdate, x =>
x.SetGroupId("SomeGroupId"));
}
}