如何指定要与 MassTransit 一起使用的 Azure 服务总线主题
How to specify which Azure Service Bus Topic to use with MassTransit
我尝试使用 MassTransit 将消息发布到 Azure 服务总线中名为 events
的主题。我在配置 MassTransit 以使用我的预定义主题 events
时遇到问题,而是创建了一个由消息类型 namespace/classname 命名的新主题。所以我想知道如何指定使用哪个主题而不是创建一个新主题。
这是我测试过的代码:
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.AzureServiceBusTransport;
using Microsoft.ServiceBus;
namespace PublisherNameSpace
{
public class Publisher
{
public static async Task PublishMessage()
{
var topic = "events";
var bus = Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
var azureServiceBusHost = cfg.Host(new Uri("sb://<busname>.servicebus.windows.net"), host =>
{
host.OperationTimeout = TimeSpan.FromSeconds(5);
host.TokenProvider =
TokenProvider.CreateSharedAccessSignatureTokenProvider(
"RootManageSharedAccessKey",
"<key>"
);
});
cfg.ReceiveEndpoint(azureServiceBusHost, topic, e =>
{
e.Consumer<TestConsumer>();
});
});
await bus.Publish<TestConsumer>(new TestMessage { TestString = "testing" });
}
}
public class TestConsumer : IConsumer<TestMessage>
{
public Task Consume(ConsumeContext<TestMessage> context)
{
return Console.Out.WriteAsync("Consuming message");
}
}
public class TestMessage
{
public string TestString { get; set; }
}
}
如果要从特定主题消费,请创建订阅端点而不是接收端点,并在配置中指定主题和订阅名称。
单元测试中显示了最简单的形式:
接受的答案清除订阅端:
cfg.SubscriptionEndpoint(
host,
"sub-1",
"my-topic-1",
e =>
{
e.ConfigureConsumer<TestConsumer>(provider);
});
对于那些想知道如何在发布端正确获取总线配置的人,它应该如下所示:
cfg.Message<TestMessage>(x =>
{
x.SetEntityName("my-topic-1");
});
然后就可以在总线上调用publish了:
await bus.Publish<TestMessage>(message);
感谢@ChrisPatterson pointing this out to me!
我能够使用 _sendEndpointProvider.GetSendEndpoint(new Uri("topic:shape")); 发送到 Azure 服务总线主题,其中..." shape" 是主题名称。
public class MassTransitController : ControllerBase
{
private readonly ILogger<MassTransitController> _logger;
private readonly ISendEndpointProvider _sendEndpointProvider;
public MassTransitController(ILogger<MassTransitController> logger, ISendEndpointProvider sendEndpointProvider)
{
_logger = logger;
_sendEndpointProvider = sendEndpointProvider;
}
[HttpGet]
public async Task<IActionResult> Get()
{
try
{
var randomType = new Random();
var randomColor = new Random();
var shape = new Shape();
shape.ShapeId = Guid.NewGuid();
shape.Color = ShapeType.ShapeColors[randomColor.Next(ShapeType.ShapeColors.Count)];
shape.Type = ShapeType.ShapeTypes[randomType.Next(ShapeType.ShapeTypes.Count)];
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("topic:shape"));
await endpoint.Send(shape);
return Ok(shape);
}
catch (Exception ex)
{
throw ex;
}
}
}
我还能够让 .NET 5 Worker Consumer 使用这样的代码...其中订阅“sub-all”会捕获所有形状...我将创建一个博客 post / git 这个的回购。
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("Endpoint=sb://******");
cfg.SubscriptionEndpoint(
"sub-all",
"shape",
e =>
{
e.Handler<Shape>(async context =>
{
await Console.Out.WriteLineAsync($"Shape Received: {context.Message.Type}");
});
e.MaxDeliveryCount = 15;
});
});
});
services.AddMassTransitHostedService();
});
我尝试使用 MassTransit 将消息发布到 Azure 服务总线中名为 events
的主题。我在配置 MassTransit 以使用我的预定义主题 events
时遇到问题,而是创建了一个由消息类型 namespace/classname 命名的新主题。所以我想知道如何指定使用哪个主题而不是创建一个新主题。
这是我测试过的代码:
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.AzureServiceBusTransport;
using Microsoft.ServiceBus;
namespace PublisherNameSpace
{
public class Publisher
{
public static async Task PublishMessage()
{
var topic = "events";
var bus = Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
var azureServiceBusHost = cfg.Host(new Uri("sb://<busname>.servicebus.windows.net"), host =>
{
host.OperationTimeout = TimeSpan.FromSeconds(5);
host.TokenProvider =
TokenProvider.CreateSharedAccessSignatureTokenProvider(
"RootManageSharedAccessKey",
"<key>"
);
});
cfg.ReceiveEndpoint(azureServiceBusHost, topic, e =>
{
e.Consumer<TestConsumer>();
});
});
await bus.Publish<TestConsumer>(new TestMessage { TestString = "testing" });
}
}
public class TestConsumer : IConsumer<TestMessage>
{
public Task Consume(ConsumeContext<TestMessage> context)
{
return Console.Out.WriteAsync("Consuming message");
}
}
public class TestMessage
{
public string TestString { get; set; }
}
}
如果要从特定主题消费,请创建订阅端点而不是接收端点,并在配置中指定主题和订阅名称。
单元测试中显示了最简单的形式:
接受的答案清除订阅端:
cfg.SubscriptionEndpoint(
host,
"sub-1",
"my-topic-1",
e =>
{
e.ConfigureConsumer<TestConsumer>(provider);
});
对于那些想知道如何在发布端正确获取总线配置的人,它应该如下所示:
cfg.Message<TestMessage>(x =>
{
x.SetEntityName("my-topic-1");
});
然后就可以在总线上调用publish了:
await bus.Publish<TestMessage>(message);
感谢@ChrisPatterson pointing this out to me!
我能够使用 _sendEndpointProvider.GetSendEndpoint(new Uri("topic:shape")); 发送到 Azure 服务总线主题,其中..." shape" 是主题名称。
public class MassTransitController : ControllerBase
{
private readonly ILogger<MassTransitController> _logger;
private readonly ISendEndpointProvider _sendEndpointProvider;
public MassTransitController(ILogger<MassTransitController> logger, ISendEndpointProvider sendEndpointProvider)
{
_logger = logger;
_sendEndpointProvider = sendEndpointProvider;
}
[HttpGet]
public async Task<IActionResult> Get()
{
try
{
var randomType = new Random();
var randomColor = new Random();
var shape = new Shape();
shape.ShapeId = Guid.NewGuid();
shape.Color = ShapeType.ShapeColors[randomColor.Next(ShapeType.ShapeColors.Count)];
shape.Type = ShapeType.ShapeTypes[randomType.Next(ShapeType.ShapeTypes.Count)];
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("topic:shape"));
await endpoint.Send(shape);
return Ok(shape);
}
catch (Exception ex)
{
throw ex;
}
}
}
我还能够让 .NET 5 Worker Consumer 使用这样的代码...其中订阅“sub-all”会捕获所有形状...我将创建一个博客 post / git 这个的回购。
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("Endpoint=sb://******");
cfg.SubscriptionEndpoint(
"sub-all",
"shape",
e =>
{
e.Handler<Shape>(async context =>
{
await Console.Out.WriteLineAsync($"Shape Received: {context.Message.Type}");
});
e.MaxDeliveryCount = 15;
});
});
});
services.AddMassTransitHostedService();
});