Azure 服务总线代理消息泵,等待多种消息类型
Azure Service Bus Brokered Message pump, awaiting multiple message types
我有一个如下所示的消息泵:
public class MessagePump<T> where T : class
{
public async Task Run( string inQueue, IMessageProcessor<T> processor, CancellationToken cancellationToken)
{
var connectionString = Settings.ServiceBusConnectionString;
var factory = MessagingFactory.CreateFromConnectionString(connectionString);
var client = factory.CreateQueueClient(inQueue);
var msgOptions = new OnMessageOptions
{
AutoComplete = false,
MaxConcurrentCalls = 5,
AutoRenewTimeout = TimeSpan.FromMinutes(10)
};
await Task.Run(() =>
client.OnMessageAsync(
async message =>
{
await OnReceived(message, processor);
await message.CompleteAsync();
}, msgOptions),
cancellationToken);
}
static async Task OnReceived(BrokeredMessage brokeredMessage, IMessageProcessor<T> processor)
{
var message = brokeredMessage.GetBody<T>();
try
{
await processor.Process(message);
}
catch (Exception e)
{
var er = new ErrorLog();
await er.Create(new {error = e.Message});
}
}
}
它是从我在 Azure Service Fabric 中的无状态服务调用的
internal sealed class Core : StatelessService
{
public Core(StatelessServiceContext context)
: base(context)
{ }
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
return new ServiceInstanceListener[0];
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var inQueue = Settings.CoreQueue;
await new MessagePump<Message>().Run(inQueue, new Processor(), cancellationToken);
}
}
如何创建处理多种消息的消息泵?
几个选项:
将所有类型编码为一种容器类型的一部分(例如,将它们全部派生自一个基 class,或将它们全部放入容器类型的属性中)。始终反序列化此容器,然后根据其内容进行处理。
将消息的类型放入元数据中(例如BrokeredMessage
的ContentType
属性。读取此元数据后反序列化为特定类型。
调用 GetBody
只是 returns 一个字节数组,并自己处理反序列化。如果您不控制发送方,这可能是唯一的选择。
我有一个如下所示的消息泵:
public class MessagePump<T> where T : class
{
public async Task Run( string inQueue, IMessageProcessor<T> processor, CancellationToken cancellationToken)
{
var connectionString = Settings.ServiceBusConnectionString;
var factory = MessagingFactory.CreateFromConnectionString(connectionString);
var client = factory.CreateQueueClient(inQueue);
var msgOptions = new OnMessageOptions
{
AutoComplete = false,
MaxConcurrentCalls = 5,
AutoRenewTimeout = TimeSpan.FromMinutes(10)
};
await Task.Run(() =>
client.OnMessageAsync(
async message =>
{
await OnReceived(message, processor);
await message.CompleteAsync();
}, msgOptions),
cancellationToken);
}
static async Task OnReceived(BrokeredMessage brokeredMessage, IMessageProcessor<T> processor)
{
var message = brokeredMessage.GetBody<T>();
try
{
await processor.Process(message);
}
catch (Exception e)
{
var er = new ErrorLog();
await er.Create(new {error = e.Message});
}
}
}
它是从我在 Azure Service Fabric 中的无状态服务调用的
internal sealed class Core : StatelessService
{
public Core(StatelessServiceContext context)
: base(context)
{ }
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
return new ServiceInstanceListener[0];
}
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var inQueue = Settings.CoreQueue;
await new MessagePump<Message>().Run(inQueue, new Processor(), cancellationToken);
}
}
如何创建处理多种消息的消息泵?
几个选项:
将所有类型编码为一种容器类型的一部分(例如,将它们全部派生自一个基 class,或将它们全部放入容器类型的属性中)。始终反序列化此容器,然后根据其内容进行处理。
将消息的类型放入元数据中(例如
BrokeredMessage
的ContentType
属性。读取此元数据后反序列化为特定类型。调用
GetBody
只是 returns 一个字节数组,并自己处理反序列化。如果您不控制发送方,这可能是唯一的选择。