使用 Autofac 在 MassTransit 中注册带有依赖项的 IConsumer<T>

Register an IConsumer<T> with dependency in Masstransit using Autofac

我有 2 个系统,一个用于发布消息,另一个用于使用消息。两者都使用 Masstransit(使用 RabbitMQ)并使用 ASP.Net web api 2 和 OWIN(以及作为 IoC 容器的 Autofac)实现。 如果我的消费者没有依赖项,一切正常,但是当我向我的消费者注入依赖项时,Consume 方法永远不会执行(初始化期间没有抛出错误)。

这是发布方(Publisher)的相关代码:

//Startup.cs
/// <summary>
/// OWIN startup class of the publishing application. Wires together Autofac,
/// Web API and a MassTransit bus that uses the RabbitMQ transport.
/// </summary>
public class Startup
{
    /// <summary>
    /// Builds the Autofac container, configures the Web API pipeline and starts
    /// the MassTransit bus.
    /// </summary>
    /// <param name="app">The OWIN application builder the middleware is attached to.</param>
    public void Configuration(IAppBuilder app)
    {
        HttpConfiguration config = new HttpConfiguration();

        var builder = new ContainerBuilder();

        // The bus is registered once and exposed both as IBusControl (start/stop)
        // and as IBus (publishing). No receive endpoint is configured here.
        builder.Register(context =>
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // Host address and credentials are read from appSettings.
                cfg.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), settings =>
                {
                    settings.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
                    settings.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
                });
            });

            return busControl;
        })
        .As<IBusControl>()
        .As<IBus>()
        .SingleInstance();

        // Register Web API controllers
        builder.RegisterApiControllers(Assembly.GetExecutingAssembly());

        // Resolve dependencies
        IContainer container = builder.Build();

        // The resolver is a type and has to be instantiated; without "new" this
        // line is parsed as a call to a non-existent method and does not compile.
        config.DependencyResolver = new AutofacWebApiDependencyResolver(container);

        WebApiConfig.Register(config);
        SwaggerConfig.Register(config);
        app.UseCors(CorsOptions.AllowAll);

        // Register the Autofac middleware FIRST.
        app.UseAutofacMiddleware(container);
        app.UseWebApi(config);

        // Starts MassTransit Service bus, and registers stopping of bus on app dispose
        var bus = container.Resolve<IBusControl>();
        var busHandle = bus.StartAsync();
        var properties = new AppProperties(app.Properties);
        if (properties.OnAppDisposing != CancellationToken.None)
        {
            // busHandle.Result blocks until the start task has finished before the
            // stop is requested; the task returned by StopAsync is not awaited here.
            properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
        }
    }
}

// Controller
/// <summary>
/// Publishes an <c>IFooMessage</c> whose <c>Foo</c> value is "Foo" on the bus
/// and answers with an OK result.
/// </summary>
/// <returns>The result produced by <c>Ok()</c>.</returns>
public IHttpActionResult Post()
{
    // The anonymous object supplies the property values for the message.
    var payload = new
    {
        Foo = "Foo"
    };

    // NOTE(review): the task returned by Publish is not awaited or observed here.
    _bus.Publish<IFooMessage>(payload);

    return Ok();
}

这是相关的 Consumer 代码:

// Startup.cs
/// <summary>
/// OWIN startup class of the consuming application. Wires together Autofac,
/// Web API and the MassTransit bus whose receive endpoint hosts the consumers.
/// </summary>
public class Startup
{
    /// <summary>
    /// Builds the Autofac container, configures the Web API pipeline and starts
    /// the MassTransit bus.
    /// </summary>
    /// <param name="app">The OWIN application builder the middleware is attached to.</param>
    public void Configuration(IAppBuilder app)
    {
        HttpConfiguration config = new HttpConfiguration();

        var builder = new ContainerBuilder();

        // IFooService is injected into a MassTransit consumer, which is resolved
        // outside of any Web API request. InstancePerRequest() can only be resolved
        // inside a request lifetime scope, so the consumer could not be created and
        // Consume was never invoked. InstancePerLifetimeScope() still gives one
        // instance per scope, and works in scopes that are not request scopes.
        builder.RegisterType<FooService>().As<IFooService>().InstancePerLifetimeScope();
        builder.RegisterModule<BusModule>();
        builder.RegisterModule<ConsumersModule>();

        // Register Web API controllers
        builder.RegisterApiControllers(Assembly.GetExecutingAssembly());

        // Resolve dependencies
        IContainer container = builder.Build();
        config.DependencyResolver = new AutofacWebApiDependencyResolver(container);

        WebApiConfig.Register(config);
        SwaggerConfig.Register(config);
        app.UseCors(CorsOptions.AllowAll);

        // Register the Autofac middleware FIRST.
        app.UseAutofacMiddleware(container);
        app.UseWebApi(config);

        // Starts MassTransit Service bus, and registers stopping of bus on app dispose
        var bus = container.Resolve<IBusControl>();
        var busHandle = bus.StartAsync();
        var properties = new AppProperties(app.Properties);
        if (properties.OnAppDisposing != CancellationToken.None)
        {
            // busHandle.Result blocks until the start task has finished before the
            // stop is requested; the task returned by StopAsync is not awaited here.
            properties.OnAppDisposing.Register(() => busHandle.Result.StopAsync(TimeSpan.FromSeconds(30)));
        }
    }
}

// BusModule.cs
/// <summary>
/// Autofac module that registers the MassTransit bus (RabbitMQ transport) as a
/// single instance exposed as both <c>IBusControl</c> and <c>IBus</c>.
/// </summary>
public class BusModule : Module
{
    /// <summary>
    /// Adds the bus registration to the container being built.
    /// </summary>
    /// <param name="builder">The container builder to register the bus with.</param>
    protected override void Load(ContainerBuilder builder)
    {
        builder
            .Register(context => CreateBusControl(context))
            .As<IBusControl>()
            .As<IBus>()
            .SingleInstance();
    }

    /// <summary>
    /// Creates the bus: connects to the RabbitMQ host named in appSettings and
    /// declares the "IP.AgilePoint.queue" receive endpoint, whose consumers are
    /// loaded from the supplied component context.
    /// </summary>
    /// <param name="context">The Autofac context the endpoint's consumers are loaded from.</param>
    /// <returns>The configured, not yet started, bus control.</returns>
    private static IBusControl CreateBusControl(IComponentContext context)
    {
        return Bus.Factory.CreateUsingRabbitMq(configurator =>
        {
            var hostAddress = new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]);

            var host = configurator.Host(hostAddress, credentials =>
            {
                credentials.Username(ConfigurationManager.AppSettings["RabbitMQUser"]);
                credentials.Password(ConfigurationManager.AppSettings["RabbitMQPassword"]);
            });

            configurator.ReceiveEndpoint(host, "IP.AgilePoint.queue", endpoint => endpoint.LoadFrom(context));
        });
    }
}

// ConsumerModule.cs
/// <summary>
/// Autofac module that registers the message consumer types of this application.
/// </summary>
public class ConsumersModule : Module
{
    /// <summary>
    /// Registers <c>FooConsumer</c> under its own concrete type.
    /// </summary>
    /// <param name="builder">The container builder to register the consumer with.</param>
    protected override void Load(ContainerBuilder builder)
    {
        // AsSelf() spells out the service a plain RegisterType<T>() exposes by default.
        builder.RegisterType<FooConsumer>().AsSelf();
    }
}

// FooConsumer.cs
/// <summary>
/// MassTransit consumer for <c>IFooMessage</c> that hands the message's
/// <c>Foo</c> value to an <c>IFooService</c>.
/// </summary>
public class FooConsumer : IConsumer<IFooMessage>
{
    // Assigned once in the constructor and never replaced afterwards.
    private readonly IFooService _service;

    /// <summary>
    /// Creates the consumer with the service it delegates to.
    /// </summary>
    /// <param name="service">The service invoked for every consumed message.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="service"/> is null.</exception>
    public FooConsumer(IFooService service)
    {
        // Fail at construction time instead of with a NullReferenceException
        // in the middle of message handling.
        if (service == null)
        {
            throw new System.ArgumentNullException("service");
        }

        _service = service;
    }

    /// <summary>
    /// Handles one message by calling <c>DoStuff</c> with its <c>Foo</c> value.
    /// </summary>
    /// <param name="context">The consume context carrying the message.</param>
    /// <returns>An already completed task; the work is done synchronously.</returns>
    public Task Consume(ConsumeContext<IFooMessage> context)
    {
        IFooMessage @event = context.Message;

        _service.DoStuff(@event.Foo);

        // The message is used as the task result only to obtain a completed Task.
        return Task.FromResult(context.Message);
    }
}

请注意,我的 FooConsumer 通过构造函数依赖于 IFooService。我按照 MassTransit 官方文档进行了配置,但仍然无法让它正常工作。我做错了什么?

框架版本:

更新:

代码可以在这个 GitHub 仓库中找到。

我建议查看特定于 Autofac 的文档,它是一个通过扩展库得到完全支持的容器。

http://masstransit-project.com/MassTransit/usage/containers/autofac.html

NuGet 包: https://www.nuget.org/packages/masstransit.autofac

终于发现自己哪里做错了。出于某种原因,我在 RabbitMQ 管理中看不到我的队列。当我能够看到错误队列时,我注意到以下错误:

我是这样注册我的 IFooService 的:

builder.RegisterType<FooService>().As<IFooService>().InstancePerRequest();

InstancePerRequest() 导致了错误。如果我使用 builder.RegisterType<FooService>().As<IFooService>() 注册服务,一切正常。我认为这是因为我的总线实例是以单例方式工作的(注册为 SingleInstance()),消费者并不是在 Web 请求的生命周期范围内解析的。用 Web API 项目来托管我的总线/消费者这一点让我产生了混淆。

无论如何,感谢@Chris Patterson 和@Alexey Zimarev 为我指明了正确的实施方向(使用 MT 扩展来注册消费者)。