MassTransit with AWS SQS - exception sending dynamic message

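I'm using MassTransit with AWS SQS/SNS as the transport. I'm now stuck on a simple problem: adding a CorrelationId to the published message. Since I'm following the recommendation to use interfaces as DTOs, I can't use the CorrelatedBy<Guid> interface, so I decided to add a __CorrelationId property to the message directly.

I'm using the following wrapper to publish messages: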

public class MessageBus : IMessageBus
{
    // ... omitted for brevity

    public async Task Publish<T>(object message)
        where T : class
    {
        await this.publishEndpoint.Publish<T>(this.CreateCorrelatedMessage(message));
    }

    // Copies the public properties of the anonymous object into an ExpandoObject
    // and appends the __CorrelationId property.
    private object CreateCorrelatedMessage(object message)
    {
        dynamic correlatedMessage = new ExpandoObject();

        foreach (var property in message.GetType().GetProperties())
        {
            ((IDictionary<string, object>)correlatedMessage).Add(property.Name, property.GetValue(message));
        }

        correlatedMessage.__CorrelationId = this.correlationIdProvider.CorrelationId;
        return correlatedMessage;
    }
}

Here is the usage:

        await this.messageBus.Publish<ObtainRequestToken>(new
        {
            Social = SocialEnum.Twitter,
            AppId = credentials.First(x => x.Name == "TwitterConsumerKey").Value,
            AppSecret = credentials.First(x => x.Name == "TwitterConsumerSecret").Value,
            ReturnUrl = returnUrl
        });

This works fine in a console application with the following bus registration:

        var busControl = Bus.Factory.CreateUsingAmazonSqs(cfg =>
        {
            cfg.Host("eu-west-1", h =>
            {
                h.AccessKey("xxx");
                h.SecretKey("xxx");

                h.Scope($"Local", scopeTopics: true);
            });
        });

But the ASP.NET Core application, registered like this:

        services.AddMassTransit(cfg =>
        {
            cfg.UsingAmazonSqs((context, sqsCfg) =>
            {
                var amazonAccount = this.Configuration
                    .GetSection("AmazonAccount")
                    .Get<AmazonAccountConfig>();

                sqsCfg.Host(amazonAccount.Region, h =>
                {
                    h.AccessKey(amazonAccount.KeyId);
                    h.SecretKey(amazonAccount.SecretKey);

                    h.Scope(this.Environment.EnvironmentName, scopeTopics: true);
                });
            });

            cfg.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(this.Environment.EnvironmentName + "_", false));
        });

        services.AddMassTransitHostedService();

fails with "Object reference not set to an instance of an object." and this stack trace:

Publish failed
   at MassTransit.Logging.EnabledDiagnosticSource.StartSendActivity[T](SendContext1 context, ValueTuple2[] tags)
   at MassTransit.AmazonSqsTransport.Transport.TopicSendTransport.SendPipe1.<Send>d__5.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at GreenPipes.Agents.PipeContextSupervisor1.<GreenPipes-IPipeContextSource<TContext>-Send>d__7.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at MassTransit.Initializers.MessageInitializer2.<Send>d__9.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at MassTransit.Transports.PublishEndpoint.<>c__DisplayClass18_01.<<PublishInternal>g__PublishAsync|0>d.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Dynamic.UpdateDelegates.UpdateAndExecuteVoid1[T0](CallSite site, T0 arg0)
   at xxx.Main.Api.Messaging.MessageBus.<Publish>d__51.MoveNext() in D:\repo\projects\xxx\mainapi\xxx.Main.Api\Messaging\MessageBus.cs:line 34

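StartSendActivity looks like a simple diagnostic method, and I can't figure out what could fail there.

You can't send object, or other types like it, with MassTransit. Messages must be reference types.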
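
One way to get a correlation ID onto the message without building an ExpandoObject is to pass the anonymous object through unchanged and set the ID on the publish context. The following is only a minimal sketch under a few assumptions: ICorrelationIdProvider is a hypothetical interface (the question shows only a correlationIdProvider field), its CorrelationId is assumed to be a Guid, and the MassTransit version in use is assumed to provide the Publish<T>(object values, Action<PublishContext<T>> callback) extension method.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical abstraction: the question only shows a correlationIdProvider field
// exposing a CorrelationId property, assumed here to be a Guid.
public interface ICorrelationIdProvider
{
    Guid CorrelationId { get; }
}

// Shape of the wrapper interface inferred from the question.
public interface IMessageBus
{
    Task Publish<T>(object values) where T : class;
}

public class MessageBus : IMessageBus
{
    private readonly IPublishEndpoint publishEndpoint;
    private readonly ICorrelationIdProvider correlationIdProvider;

    public MessageBus(IPublishEndpoint publishEndpoint, ICorrelationIdProvider correlationIdProvider)
    {
        this.publishEndpoint = publishEndpoint;
        this.correlationIdProvider = correlationIdProvider;
    }

    public Task Publish<T>(object values)
        where T : class
    {
        // Hand the anonymous object to MassTransit as-is so it can initialize T from it,
        // and set the correlation ID on the publish context (a message header)
        // instead of copying properties into a dynamic object.
        return this.publishEndpoint.Publish<T>(
            values,
            context => { context.CorrelationId = this.correlationIdProvider.CorrelationId; });
    }
}
```

With this shape the call site from the question stays the same: messageBus.Publish<ObtainRequestToken>(new { ... }) still passes an anonymous object, and the message contract remains a plain interface.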

Relevant Documentation