MassTransit - 对大负载消费者进行单元测试

MassTransit - Unit Testing a Large Payload Consumer

我正在使用 InMemoryTestHarness 对使用 MessageData 属性 处理命令的消费者进行单元测试。使用 InMemoryMessageDataRepository 填充 MessageData 属性 工作正常,但是,当消费者尝试加载有效负载 (message.Body.Value) 时,我收到以下异常:"The message data was not loaded: urn:msgdata:xxxxxxxxxxxxxxxxxxxxxxxxxx"}。对使用 MessageData 属性 处理命令的消费者进行单元测试的正确方法是什么?

我打算使用内存传输,但这似乎不是正确的方法,因为单元测试不会立即收到来自消费者的反馈。最终的最终目标是让我的单元测试能够断言特定的异常被抛出或消费者成功完成而没有错误。

using System;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.MessageData;
using MassTransit.Testing;
using Newtonsoft.Json;
using Xunit;

namespace Test
{
    public class LargePayloadConsumerTest
    {
        [Fact]
        public async Task Consumer_Should_Deserialize_Message()
        {
            var harness = new InMemoryTestHarness();
            var consumer = harness.Consumer<BigMessageConsumer>();
            var command = new BigMessageCommand();
            var dataRepo = new InMemoryMessageDataRepository();

            var largePayload = new BigMessagePayload()
            {
                Id = 1,
                FirstName = "TestFirstName",
                MiddleName = "TestMiddleName",
                LastName = "TestLastName"
            };


            var json = JsonConvert.SerializeObject(largePayload);
            command.Body = await dataRepo.PutString(json);

            var thisWorks = await command.Body.Value;
            await harness.Start();
            await harness.InputQueueSendEndpoint.Send(command);

            Assert.True(harness.Consumed.Select<BigMessageCommand>().Any());
        }

        public class BigMessageConsumer : IConsumer<BigMessageCommand>
        {
            public async Task Consume(ConsumeContext<BigMessageCommand> context)
            {
                try
                {
                    var json = await context.Message.Body.Value;
                    var deserialized = JsonConvert.DeserializeObject<BigMessagePayload>(json);
                    Console.WriteLine($"{deserialized.Id}: {deserialized.FirstName} {deserialized.MiddleName} {deserialized.LastName}");
                }
                catch (Exception e)
                {
                    //throws {"The message data was not loaded: urn:msgdata:xxxxxxxxxxxxxxxxxxxxxxxxxx"}
                    Console.WriteLine(e);
                    throw;
                }
            }
        }

        public class BigMessageCommand
        {
            public MessageData<string> Body { get; set; }
        }

        public class BigMessagePayload
        {
            public int Id { get; set; }
            public string FirstName { get; set; }
            public string LastName { get; set; }
            public string MiddleName { get; set; }
        }
    }
}

您需要配置接收端点来处理消息数据属性,类似这样:

e.UseMessageData<BigMessageCommand>(dataRepo);

由于您使用的是测试工具,因此可以在添加消费者测试工具之前将其添加到接收端点

void ConfigureReceiveEndpoint(IReceiveEndpointConfigurator configurator)
{
    configurator.UseMessageData<BigMessageCommand>(dataRepo);
}

harness.OnConfigureReceiveEndpoint += ConfigureReceiveEndpoint;