收件人未通过公共交通接收消息 - 订阅
Receiver not picking up message with Mass Transit - Subscription
我正在使用下面的代码将 MassTransit 设置为使用 ServiceBus
private static ServiceProvider SetupServiceCollection()
{
var connectionString = ConfigurationManager.AppSettings["AzureServiceBusConnectionString"];
var services = new ServiceCollection()
.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.ConfigureEndpoints(context);
cfg.Message<MyMessage>(x =>
{
x.SetEntityName("my-topic");
});
});
});
return services.BuildServiceProvider();
}
我使用下面的代码发送消息
var message = new MyMessage()
{
MessageIdentifier = Guid.NewGuid().ToString(),
};
await _busControl.Publish(message);
我希望我的消息只发送到我的主题
但是,MassTransit 正在创建主题,名称似乎是使用类型名称生成的。我该如何完全阻止它?
我正在如下设置接收器
public static void SetupMassTransit(this ServiceCollection services, string connectionString)
{
services.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.ConfigureEndpoints(context);
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.SubscriptionEndpoint<MyMessage>("low", e =>
{
e.Consumer<MyMessageConsumer>(context);
e.PrefetchCount = 100;
e.MaxConcurrentCalls = 100;
e.LockDuration = TimeSpan.FromMinutes(5);
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
e.UseInMemoryOutbox();
e.ConfigureConsumeTopology = false;
});
});
});
}
我可以看到消息已正确发送,如服务总线资源管理器中的订阅内所示。但是,接收者不接收它?没有错误或任何事情要继续吗?真令人沮丧
保罗
您正在调用 ConfigureEndpoints
,它将默认为已添加的消费者、sagas 等创建接收端点。但是,您的代码示例未显示任何 .AddConsumer
方法。如果您没有任何消费者,请不要调用 ConfigureEndpoints
.
对于你的接收器,你应该使用:
public static void SetupMassTransit(this ServiceCollection services, string connectionString)
{
services.AddMassTransit(x =>
{
x.AddConsumer<MyMessageConsumer>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.SubscriptionEndpoint("your-topic-name", "your-subscription-name", e =>
{
e.PrefetchCount = 100;
e.MaxConcurrentCalls = 100;
e.LockDuration = TimeSpan.FromMinutes(5);
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
e.UseInMemoryOutbox();
e.ConfigureConsumer<MyMessageConsumer>(context);
});
});
});
}
对于您的制作人,您可以简单地使用:
private static ServiceProvider SetupServiceCollection()
{
var connectionString = ConfigurationManager.AppSettings["AzureServiceBusConnectionString"];
var services = new ServiceCollection()
.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
});
});
return services.BuildServiceProvider();
}
然后,使用上面创建的 IServiceProvider
进行发布:
var bus = serviceProvider.GetRequiredService<IBus>();
var endpoint = await bus.GetSendEndpoint(new Uri("topic:your-topic-name"));
await endpoint.Send(new MyMessage());
这应该是您需要的绝对最低要求。
我正在使用下面的代码将 MassTransit 设置为使用 ServiceBus
private static ServiceProvider SetupServiceCollection()
{
var connectionString = ConfigurationManager.AppSettings["AzureServiceBusConnectionString"];
var services = new ServiceCollection()
.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.ConfigureEndpoints(context);
cfg.Message<MyMessage>(x =>
{
x.SetEntityName("my-topic");
});
});
});
return services.BuildServiceProvider();
}
我使用下面的代码发送消息
var message = new MyMessage()
{
MessageIdentifier = Guid.NewGuid().ToString(),
};
await _busControl.Publish(message);
我希望我的消息只发送到我的主题
但是,MassTransit 正在创建主题,名称似乎是使用类型名称生成的。我该如何完全阻止它?
我正在如下设置接收器
public static void SetupMassTransit(this ServiceCollection services, string connectionString)
{
services.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.ConfigureEndpoints(context);
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.SubscriptionEndpoint<MyMessage>("low", e =>
{
e.Consumer<MyMessageConsumer>(context);
e.PrefetchCount = 100;
e.MaxConcurrentCalls = 100;
e.LockDuration = TimeSpan.FromMinutes(5);
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
e.UseInMemoryOutbox();
e.ConfigureConsumeTopology = false;
});
});
});
}
我可以看到消息已正确发送,如服务总线资源管理器中的订阅内所示。但是,接收者不接收它?没有错误或任何事情要继续吗?真令人沮丧
保罗
您正在调用 ConfigureEndpoints
,它将默认为已添加的消费者、sagas 等创建接收端点。但是,您的代码示例未显示任何 .AddConsumer
方法。如果您没有任何消费者,请不要调用 ConfigureEndpoints
.
对于你的接收器,你应该使用:
public static void SetupMassTransit(this ServiceCollection services, string connectionString)
{
services.AddMassTransit(x =>
{
x.AddConsumer<MyMessageConsumer>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
cfg.SubscriptionEndpoint("your-topic-name", "your-subscription-name", e =>
{
e.PrefetchCount = 100;
e.MaxConcurrentCalls = 100;
e.LockDuration = TimeSpan.FromMinutes(5);
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
e.UseMessageRetry(r => r.Intervals(100, 200, 500, 800, 1000));
e.UseInMemoryOutbox();
e.ConfigureConsumer<MyMessageConsumer>(context);
});
});
});
}
对于您的制作人,您可以简单地使用:
private static ServiceProvider SetupServiceCollection()
{
var connectionString = ConfigurationManager.AppSettings["AzureServiceBusConnectionString"];
var services = new ServiceCollection()
.AddMassTransit(x =>
{
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host(connectionString);
});
});
return services.BuildServiceProvider();
}
然后,使用上面创建的 IServiceProvider
进行发布:
var bus = serviceProvider.GetRequiredService<IBus>();
var endpoint = await bus.GetSendEndpoint(new Uri("topic:your-topic-name"));
await endpoint.Send(new MyMessage());
这应该是您需要的绝对最低要求。