如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器
How to register a generic consumer adapter in MassTransit if I have a list of message types
我成功地将 MassTransit 用于一个愚蠢的示例应用程序,我在其中从 Publisher 控制台应用程序发布一条消息(一个事件),并在两个不同的消费者处接收到它,这两个消费者也是使用 RabbitMq 的控制台应用程序。
这是整个示例项目git repo: https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus
我想要一个包含 MassTransit 功能的项目,这样我的 Publisher 和 Consumers 项目对 MassTransit 一无所知。依赖关系应该朝这个方向发展:
- DiDrDe.MessageBus ==> 地铁
- DiDrDe.MessageBus ==> DiDrDe.Contracts
- DiDrDe.Model ==> DiDrDe.Contracts
- DiDrDe.Publisher ==> DiDrDe.MessageBus
- DiDrDe.Publisher ==> DiDrDe.Contracts
- DiDrDe.Publisher ==> DiDrDe.Model
- DiDrDe.ConsumerOne ==> DiDrDe.Contracts
- DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
- DiDrDe.ConsumerOne ==> DiDrDe.Model
- DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
- DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
- DiDrDe.ConsumerTwo ==> DiDrDe.Model
请注意 DiDrDe.MessageBus 对 DiDrDe.Model 一无所知,因为它是一个对任何消息类型都有效的通用项目。
为了实现这一点,我正在实施适配器模式,以便我的自定义接口 IEventDtoBus
(用于发布事件)和 IEventDtoHandler<TEventDto>
(用于使用事件)都是我的发布者和消费者所知道的。 MassTransit 包装器项目(称为 DiDrDe.MessageBus)使用由 IEventDtoBus
和 EventDtoHandlerAdapter<TEventDto>
组成的 EventDtoBusAdapter
实现适配器,作为我唯一的通用 IConsumer<TEventDto>
由 IEventDtoHandler<TEventDto>
我遇到的问题是 MassTransit 要求注册消费者的方式,因为我的消费者是通用消费者,MassTransit 包装器在编译时不应知道其类型。
我需要找到一种方法将 EventDtoHandlerAdapter<TEventDto>
注册为我在运行时传递的每个 TEventDto 类型的使用者(例如,作为类型的集合)。 请查看我的存储库以了解所有详细信息。
MassTransit 支持接受类型的重载方法(好!正是我想要的)但它还需要第二个参数Func<type, object> consumerFactory
,我不知道如何实现它。
更新 1:
问题 是我无法像这样注册这个通用消费者:
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();
因为我得到一个编译错误
Severity Code Description Project File Line Suppression State
Error CS0310 'EventDtoHandlerAdapter' must be a
non-abstract type with a public parameterless constructor in order to
use it as parameter 'TConsumer' in the generic type or method
'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator,
Action>)' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs
更新 2: 我已经尝试了几件事,并且已经更新了我的 repo 上的项目。这些是我在 MassTransit 包装器项目中的尝试。请注意,如果我向要处理的每条消息(事件)添加依赖项,我将如何让一切正常工作。但我不想那样……我不想让这个项目知道任何关于它可以处理的消息。要是我能注册只知道消息类型的消费者就好了..
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
// DOES NOT WORK
//var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
//var eventDtoHandler = context.Resolve(typeEventDtoHandler);
//consumer.Consumer(eventDtoHandler);
// DOES NOT WORK
//consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
// DOES NOT WORK
//var consumerGenericType = typeof(IConsumer<>);
//var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
//consumer.Consumer(consumerThingHappenedType, null);
});
更新 3: 按照 Igor 的建议,我尝试执行以下操作:
var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
consumer.Consumer(adapterType, (Type x) => context.Resolve(x));
但我收到一个运行时错误,提示
The requested service
'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened,
DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]'
has not been registered. To avoid this exception, either register a
component to provide the service, check for service registration using
IsRegistered(), or use the ResolveOptional() method to resolve an
optional dependency.
我什至尝试将 EventDtoHandlerAdapter<>
单独注册为 IConsumer,以防出现问题但没有成功。
builder
.RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
.As(typeof(IConsumer<>))
.SingleInstance();
还有:
builder
.RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
.AsSelf();
它告诉我
System.ObjectDisposedException: 'This resolve operation has already
ended. When registering components using lambdas, the
IComponentContext 'c' parameter to the lambda cannot be stored.
Instead, either resolve IComponentContext again from 'c', or resolve a
Func<> based factory to create subsequent components from
澄清一下,我唯一需要注册的消费者是我的 EventDtoHandlerAdapter<TEventDto>
。它是通用的,因此基本上它将存在一个我支持的每个 TEventDto 的注册。问题是我不需要预先的类型,所以我需要使用类型进行操作。
更新 4: Igor 建议的新尝试。这次用的是"proxy"。我已经用所有细节的最后一次尝试更新了我的回购协议。
我有我的消费者和标记接口:
public interface IEventDtoHandler
{
}
public interface IEventDtoHandler<in TEventDto>
: IEventDtoHandler
where TEventDto : IEventDto
{
Task HandleAsync(TEventDto eventDto);
}
而且我自己实现了一个对 MassTransit 一无所知的消费者:
public class ThingHappenedHandler
: IEventDtoHandler<ThingHappened>
{
public Task HandleAsync(ThingHappened eventDto)
{
Console.WriteLine($"Received {eventDto.Name} " +
$"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
return Task.CompletedTask;
}
}
现在我的 "wrapper consumer" 就是我所说的适配器,因为它知道 MassTransit(它实现了 MassTransit IConsumer
)。
public class EventDtoHandlerAdapter<TConsumer, TEventDto>
: IConsumer<TEventDto>
where TConsumer : IEventDtoHandler<TEventDto>
where TEventDto : class, IEventDto
{
private readonly TConsumer _consumer;
public EventDtoHandlerAdapter(TConsumer consumer)
{
_consumer = consumer;
}
public async Task Consume(ConsumeContext<TEventDto> context)
{
await _consumer.HandleAsync(context.Message);
}
}
现在最后一步是在 MassTransit 注册我的 "wrapper consumer"。但由于它是通用的,我不知道该怎么做。就是这个问题。
我可以按照建议在 Autofac 中扫描并注册我的所有消费者类型:
var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(x => x.GetTypes())
.Where(x => interfaceType.IsAssignableFrom(x)
&& !x.IsInterface
&& !x.IsAbstract)
.ToList();
所以现在我有了所有的消费者类型(IEventDtoHandler
的所有实现,包括我的 ThingHappenedHandler
)。现在怎么办?如何注册?
类似下面的内容不起作用:
foreach (var consumerType in consumerTypes)
{
consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}
不过我估计没用也是正常的,因为我要注册的是我的EventDtoHandlerAdapter
,才是真正的IConsumer
.
所以,我想我没有理解你的建议。抱歉!
我需要的是这样的:
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));
但是没有使用 ThingHappened 模型,因为该模型不应该是已知的。这是我卡住的地方
更新 5: Chris Patterson 建议的新尝试(他的解决方案已合并到我的 repo 的 master 中),但问题仍然存在。
澄清一下,DiDrDe.MessageBus
必须不了解任何发布者、消费者和模型。它应该只依赖于 MassTransit 和 DiDrDe.Contracts
,Chris 的解决方案有这样一行:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});
这直接依赖于 ThingHappened
模型。这是不允许的,它实际上与我已经拥有的解决方案没有太大区别:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
//THIS works, but it uses ThingHappened explicitly and I don't want that dependency
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});
抱歉,如果这还不够清楚,但 DiDrDe.MessageBus 最终将成为一个在许多不同的消费者和发布者项目之间共享的 nuGet 包,它不应该依赖于任何特定的 message/model。
更新 6:
问题已经解决。非常感谢 Igor 和 Chris 的时间和帮助。
我已将解决方案推送到我的仓库中。
PS:不幸的是,当我在同一个消费者中有两个处理程序处理同一个事件时,此解决方案有其局限性,因为似乎只有一个处理程序被执行(两次)。我希望两个处理程序都被执行或只执行一个,但只执行一次(不是两次)。但这已经是另一个主题了:)
在此处查看自定义消费者约定:https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional
创建您自己的 IMyConsumerInterface,IMyMessageInterface 将其插入该测试的代码中。
在创建总线之前注册它 ConsumerConvention.Register<CustomConsumerConvention>();
。应该可以。
此外,您可以围绕消费者上下文创建自己的包装器,并将其与消息一起传递。
LoadFrom
(MassTransit.AutofacIntegration) 不适用于我的自定义约定,所以我不得不手动注册消费者
foreach (var consumer in consumerTypes)
cfg.Consumer(consumer, (Type x) => _container.Resolve(x));
或者 如果您想使用 "proxy" 方法,请执行以下操作:
public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
where TMessage : class, IMyMessageInterface
where TConsumer : IMyConsumerInterface<TMessage>
{
private readonly TConsumer _consumer;
public WrapperConsumer(TConsumer consumer)
{
_consumer = consumer;
}
public Task Consume(ConsumeContext<TMessage> context)
{
return _consumer.Consume(context.Message);
}
}
...
// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));
适用于这两种方法的附加代码
// marker interface
public interface IMyConsumerInterface
{
}
public interface IMyConsumerInterface<T> : IMyConsumerInterface
where T : IMyMessageInterface
{
Task Consume(T message);
}
...
builder.RegisterAssemblyTypes(ThisAssembly)
.AssignableTo<IMyConsumerInterface>()
.AsSelf()
.As<IMyConsumerInterface>();
...
var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
.Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
.ToList();
回复:更新 5
builder.Register(context =>
{
var ctx = context.Resolve<IComponentContext>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
foreach (var adapterType in adapterTypes)
consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
});
});
return busControl;
})
我针对你的问题提交了一个对我有用的拉取请求,消费者开始时没有问题。
如果需要,您可以扩展它以包括所有消费者在他们自己的端点上的自动注册。
诀窍是使用正确的通用接口类型正确注册处理程序类型。有点打字技巧,但对我来说很管用。
我成功地将 MassTransit 用于一个愚蠢的示例应用程序,我在其中从 Publisher 控制台应用程序发布一条消息(一个事件),并在两个不同的消费者处接收到它,这两个消费者也是使用 RabbitMq 的控制台应用程序。
这是整个示例项目git repo: https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus
我想要一个包含 MassTransit 功能的项目,这样我的 Publisher 和 Consumers 项目对 MassTransit 一无所知。依赖关系应该朝这个方向发展:
- DiDrDe.MessageBus ==> 地铁
- DiDrDe.MessageBus ==> DiDrDe.Contracts
- DiDrDe.Model ==> DiDrDe.Contracts
- DiDrDe.Publisher ==> DiDrDe.MessageBus
- DiDrDe.Publisher ==> DiDrDe.Contracts
- DiDrDe.Publisher ==> DiDrDe.Model
- DiDrDe.ConsumerOne ==> DiDrDe.Contracts
- DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
- DiDrDe.ConsumerOne ==> DiDrDe.Model
- DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
- DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
- DiDrDe.ConsumerTwo ==> DiDrDe.Model
请注意 DiDrDe.MessageBus 对 DiDrDe.Model 一无所知,因为它是一个对任何消息类型都有效的通用项目。
为了实现这一点,我正在实施适配器模式,以便我的自定义接口 IEventDtoBus
(用于发布事件)和 IEventDtoHandler<TEventDto>
(用于使用事件)都是我的发布者和消费者所知道的。 MassTransit 包装器项目(称为 DiDrDe.MessageBus)使用由 IEventDtoBus
和 EventDtoHandlerAdapter<TEventDto>
组成的 EventDtoBusAdapter
实现适配器,作为我唯一的通用 IConsumer<TEventDto>
由 IEventDtoHandler<TEventDto>
我遇到的问题是 MassTransit 要求注册消费者的方式,因为我的消费者是通用消费者,MassTransit 包装器在编译时不应知道其类型。
我需要找到一种方法将 EventDtoHandlerAdapter<TEventDto>
注册为我在运行时传递的每个 TEventDto 类型的使用者(例如,作为类型的集合)。 请查看我的存储库以了解所有详细信息。
MassTransit 支持接受类型的重载方法(好!正是我想要的)但它还需要第二个参数Func<type, object> consumerFactory
,我不知道如何实现它。
更新 1: 问题 是我无法像这样注册这个通用消费者:
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();
因为我得到一个编译错误
Severity Code Description Project File Line Suppression State Error CS0310 'EventDtoHandlerAdapter' must be a non-abstract type with a public parameterless constructor in order to use it as parameter 'TConsumer' in the generic type or method 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action>)' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs
更新 2: 我已经尝试了几件事,并且已经更新了我的 repo 上的项目。这些是我在 MassTransit 包装器项目中的尝试。请注意,如果我向要处理的每条消息(事件)添加依赖项,我将如何让一切正常工作。但我不想那样……我不想让这个项目知道任何关于它可以处理的消息。要是我能注册只知道消息类型的消费者就好了..
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
// DOES NOT WORK
//var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
//var eventDtoHandler = context.Resolve(typeEventDtoHandler);
//consumer.Consumer(eventDtoHandler);
// DOES NOT WORK
//consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
// DOES NOT WORK
//var consumerGenericType = typeof(IConsumer<>);
//var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
//consumer.Consumer(consumerThingHappenedType, null);
});
更新 3: 按照 Igor 的建议,我尝试执行以下操作:
var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
consumer.Consumer(adapterType, (Type x) => context.Resolve(x));
但我收到一个运行时错误,提示
The requested service 'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]' has not been registered. To avoid this exception, either register a component to provide the service, check for service registration using IsRegistered(), or use the ResolveOptional() method to resolve an optional dependency.
我什至尝试将 EventDtoHandlerAdapter<>
单独注册为 IConsumer,以防出现问题但没有成功。
builder
.RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
.As(typeof(IConsumer<>))
.SingleInstance();
还有:
builder
.RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
.AsSelf();
它告诉我
System.ObjectDisposedException: 'This resolve operation has already ended. When registering components using lambdas, the IComponentContext 'c' parameter to the lambda cannot be stored. Instead, either resolve IComponentContext again from 'c', or resolve a Func<> based factory to create subsequent components from
澄清一下,我唯一需要注册的消费者是我的 EventDtoHandlerAdapter<TEventDto>
。它是通用的,因此基本上它将存在一个我支持的每个 TEventDto 的注册。问题是我不需要预先的类型,所以我需要使用类型进行操作。
更新 4: Igor 建议的新尝试。这次用的是"proxy"。我已经用所有细节的最后一次尝试更新了我的回购协议。 我有我的消费者和标记接口:
public interface IEventDtoHandler
{
}
public interface IEventDtoHandler<in TEventDto>
: IEventDtoHandler
where TEventDto : IEventDto
{
Task HandleAsync(TEventDto eventDto);
}
而且我自己实现了一个对 MassTransit 一无所知的消费者:
public class ThingHappenedHandler
: IEventDtoHandler<ThingHappened>
{
public Task HandleAsync(ThingHappened eventDto)
{
Console.WriteLine($"Received {eventDto.Name} " +
$"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
return Task.CompletedTask;
}
}
现在我的 "wrapper consumer" 就是我所说的适配器,因为它知道 MassTransit(它实现了 MassTransit IConsumer
)。
public class EventDtoHandlerAdapter<TConsumer, TEventDto>
: IConsumer<TEventDto>
where TConsumer : IEventDtoHandler<TEventDto>
where TEventDto : class, IEventDto
{
private readonly TConsumer _consumer;
public EventDtoHandlerAdapter(TConsumer consumer)
{
_consumer = consumer;
}
public async Task Consume(ConsumeContext<TEventDto> context)
{
await _consumer.HandleAsync(context.Message);
}
}
现在最后一步是在 MassTransit 注册我的 "wrapper consumer"。但由于它是通用的,我不知道该怎么做。就是这个问题。
我可以按照建议在 Autofac 中扫描并注册我的所有消费者类型:
var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(x => x.GetTypes())
.Where(x => interfaceType.IsAssignableFrom(x)
&& !x.IsInterface
&& !x.IsAbstract)
.ToList();
所以现在我有了所有的消费者类型(IEventDtoHandler
的所有实现,包括我的 ThingHappenedHandler
)。现在怎么办?如何注册?
类似下面的内容不起作用:
foreach (var consumerType in consumerTypes)
{
consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}
不过我估计没用也是正常的,因为我要注册的是我的EventDtoHandlerAdapter
,才是真正的IConsumer
.
所以,我想我没有理解你的建议。抱歉!
我需要的是这样的:
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));
但是没有使用 ThingHappened 模型,因为该模型不应该是已知的。这是我卡住的地方
更新 5: Chris Patterson 建议的新尝试(他的解决方案已合并到我的 repo 的 master 中),但问题仍然存在。
澄清一下,DiDrDe.MessageBus
必须不了解任何发布者、消费者和模型。它应该只依赖于 MassTransit 和 DiDrDe.Contracts
,Chris 的解决方案有这样一行:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});
这直接依赖于 ThingHappened
模型。这是不允许的,它实际上与我已经拥有的解决方案没有太大区别:
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
//THIS works, but it uses ThingHappened explicitly and I don't want that dependency
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});
抱歉,如果这还不够清楚,但 DiDrDe.MessageBus 最终将成为一个在许多不同的消费者和发布者项目之间共享的 nuGet 包,它不应该依赖于任何特定的 message/model。
更新 6: 问题已经解决。非常感谢 Igor 和 Chris 的时间和帮助。 我已将解决方案推送到我的仓库中。
PS:不幸的是,当我在同一个消费者中有两个处理程序处理同一个事件时,此解决方案有其局限性,因为似乎只有一个处理程序被执行(两次)。我希望两个处理程序都被执行或只执行一个,但只执行一次(不是两次)。但这已经是另一个主题了:)
在此处查看自定义消费者约定:https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional
创建您自己的 IMyConsumerInterface,IMyMessageInterface 将其插入该测试的代码中。
在创建总线之前注册它 ConsumerConvention.Register<CustomConsumerConvention>();
。应该可以。
此外,您可以围绕消费者上下文创建自己的包装器,并将其与消息一起传递。
LoadFrom
(MassTransit.AutofacIntegration) 不适用于我的自定义约定,所以我不得不手动注册消费者
foreach (var consumer in consumerTypes)
cfg.Consumer(consumer, (Type x) => _container.Resolve(x));
或者 如果您想使用 "proxy" 方法,请执行以下操作:
public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
where TMessage : class, IMyMessageInterface
where TConsumer : IMyConsumerInterface<TMessage>
{
private readonly TConsumer _consumer;
public WrapperConsumer(TConsumer consumer)
{
_consumer = consumer;
}
public Task Consume(ConsumeContext<TMessage> context)
{
return _consumer.Consume(context.Message);
}
}
...
// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));
适用于这两种方法的附加代码
// marker interface
public interface IMyConsumerInterface
{
}
public interface IMyConsumerInterface<T> : IMyConsumerInterface
where T : IMyMessageInterface
{
Task Consume(T message);
}
...
builder.RegisterAssemblyTypes(ThisAssembly)
.AssignableTo<IMyConsumerInterface>()
.AsSelf()
.As<IMyConsumerInterface>();
...
var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
.Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
.ToList();
回复:更新 5
builder.Register(context =>
{
var ctx = context.Resolve<IComponentContext>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
foreach (var adapterType in adapterTypes)
consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
});
});
return busControl;
})
我针对你的问题提交了一个对我有用的拉取请求,消费者开始时没有问题。
如果需要,您可以扩展它以包括所有消费者在他们自己的端点上的自动注册。
诀窍是使用正确的通用接口类型正确注册处理程序类型。有点打字技巧,但对我来说很管用。