ASB MessageReceiver ReceiveAsync 崩溃
ASB MessageReceiver ReceiveAsync crashes
环境
- Windows 10 专业
- .NET Core 控制台应用程序
代码
我有一个抽象的消息接收器,看起来像这样。在此代码中,entity
是 Subscription
的名称(例如 user
)。
public class AzureMessageReceiver : ITdlMessageReceiver
{
private readonly ServiceBusConnection serviceBusConnection;
private readonly ILogger<AzureMessageReceiver> logger;
public AzureMessageReceiver(ServiceBusConnection serviceBusConnection, ILogger<AzureMessageReceiver> logger)
{
this.serviceBusConnection = serviceBusConnection;
this.logger = logger;
}
public async Task<TdlMessage<T>> ReceiveAsync<T>(string topic, string entity) where T : class
{
try
{
var subscriptionPath = EntityNameHelper.FormatSubscriptionPath(topic, entity);
var messageReceiver = new MessageReceiver(serviceBusConnection, subscriptionPath, ReceiveMode.ReceiveAndDelete);
var message = await messageReceiver.ReceiveAsync();
if (message == null)
{
return null;
}
var messageString = Encoding.UTF8.GetString(message.Body);
return JsonConvert.DeserializeObject<TdlMessage<T>>(messageString);
}
catch (Exception ex)
{
logger.LogError(ex, "Error receiving Azure message.");
return null;
}
}
}
注入的ServiceBusConnection
是这样构造的。 注意: 相同的连接初始化 工作 以 将消息 写入相同的 Topic
和 Subscription
.
services.AddSingleton(serviceProvider =>
new ServiceBusConnection(configuration[$"{DurableCommunicationKey}:AzureConnectionString"]));
更新: 这里是包装对接收者的调用的代码 class 并且是接收消息的控制器:
static async void Receive(ITdlMessageReceiver receiver, ILogger logger)
{
while (true)
{
var message = await receiver.ReceiveAsync<TdlMessage<object>>(topic, entity);
if (message != null)
{
logger.LogDebug($"Message received. Topic: {topic}. Action: {Enum.GetName(typeof(TopicActions), message.Action)}. Message: {JsonConvert.SerializeObject(message)}.");
}
Thread.Sleep(sleepTime);
}
}
问题
每次我执行这一行时 var message = await messageReceiver.ReceiveAsync();
它只会让控制台应用程序崩溃。没有 Exception
并且 Event Viewer
.
中没有任何内容
我试过的
- 使用来自 ASB
的 Secondary Connection String
- 提供像
messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1));
这样的超时
- 将注入的
topic
从仅 主题名称 更改为整个 URL 主题(例如 https://{...}.servicebus.windows.net/{topicName}
)
- 将
ReceiveMode
更改为 PeekLock
- 正在
ConfigureAwait(false)
加入 ReceiveAsync
通话。
- 将超时更改为
TimeSpan.Zero
。 注意:这确实 不会 使应用程序崩溃,但实际上会抛出一个 Exception
并被记录。
async void
应转换为 async Task
,并且您应该等待 Task.Delay
而不是调用 Thread.Sleep
。如果要异步,你需要一直异步
static async Task Receive(ITdlMessageReceiver receiver, ILogger logger) {
while (true) {
var message = await receiver.ReceiveAsync<TdlMessage<object>>(topic, entity);
if (message != null) {
logger.LogDebug($"Message received. Topic: {topic}. Action: {Enum.GetName(typeof(TopicActions), message.Action)}. Message: {JsonConvert.SerializeObject(message)}.");
}
await Task.Delay(sleepTime);
}
}
尝试使代码一直异步,是的,但是作为控制台应用程序(单线程),您将被允许在 [=18= 中的 Receive
方法上调用 Wait()
] 因为它没有混合会导致异步流出现问题的调用。
public static void Main(string[] args) {
//...
//...
//...
Receive(receiver, logger).Wait();
}
环境
- Windows 10 专业
- .NET Core 控制台应用程序
代码
我有一个抽象的消息接收器,看起来像这样。在此代码中,entity
是 Subscription
的名称(例如 user
)。
public class AzureMessageReceiver : ITdlMessageReceiver
{
private readonly ServiceBusConnection serviceBusConnection;
private readonly ILogger<AzureMessageReceiver> logger;
public AzureMessageReceiver(ServiceBusConnection serviceBusConnection, ILogger<AzureMessageReceiver> logger)
{
this.serviceBusConnection = serviceBusConnection;
this.logger = logger;
}
public async Task<TdlMessage<T>> ReceiveAsync<T>(string topic, string entity) where T : class
{
try
{
var subscriptionPath = EntityNameHelper.FormatSubscriptionPath(topic, entity);
var messageReceiver = new MessageReceiver(serviceBusConnection, subscriptionPath, ReceiveMode.ReceiveAndDelete);
var message = await messageReceiver.ReceiveAsync();
if (message == null)
{
return null;
}
var messageString = Encoding.UTF8.GetString(message.Body);
return JsonConvert.DeserializeObject<TdlMessage<T>>(messageString);
}
catch (Exception ex)
{
logger.LogError(ex, "Error receiving Azure message.");
return null;
}
}
}
注入的ServiceBusConnection
是这样构造的。 注意: 相同的连接初始化 工作 以 将消息 写入相同的 Topic
和 Subscription
.
services.AddSingleton(serviceProvider =>
new ServiceBusConnection(configuration[$"{DurableCommunicationKey}:AzureConnectionString"]));
更新: 这里是包装对接收者的调用的代码 class 并且是接收消息的控制器:
static async void Receive(ITdlMessageReceiver receiver, ILogger logger)
{
while (true)
{
var message = await receiver.ReceiveAsync<TdlMessage<object>>(topic, entity);
if (message != null)
{
logger.LogDebug($"Message received. Topic: {topic}. Action: {Enum.GetName(typeof(TopicActions), message.Action)}. Message: {JsonConvert.SerializeObject(message)}.");
}
Thread.Sleep(sleepTime);
}
}
问题
每次我执行这一行时 var message = await messageReceiver.ReceiveAsync();
它只会让控制台应用程序崩溃。没有 Exception
并且 Event Viewer
.
我试过的
- 使用来自 ASB 的
- 提供像
messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1));
这样的超时
- 将注入的
topic
从仅 主题名称 更改为整个 URL 主题(例如https://{...}.servicebus.windows.net/{topicName}
) - 将
ReceiveMode
更改为PeekLock
- 正在
ConfigureAwait(false)
加入ReceiveAsync
通话。 - 将超时更改为
TimeSpan.Zero
。 注意:这确实 不会 使应用程序崩溃,但实际上会抛出一个Exception
并被记录。
Secondary Connection String
async void
应转换为 async Task
,并且您应该等待 Task.Delay
而不是调用 Thread.Sleep
。如果要异步,你需要一直异步
static async Task Receive(ITdlMessageReceiver receiver, ILogger logger) {
while (true) {
var message = await receiver.ReceiveAsync<TdlMessage<object>>(topic, entity);
if (message != null) {
logger.LogDebug($"Message received. Topic: {topic}. Action: {Enum.GetName(typeof(TopicActions), message.Action)}. Message: {JsonConvert.SerializeObject(message)}.");
}
await Task.Delay(sleepTime);
}
}
尝试使代码一直异步,是的,但是作为控制台应用程序(单线程),您将被允许在 [=18= 中的 Receive
方法上调用 Wait()
] 因为它没有混合会导致异步流出现问题的调用。
public static void Main(string[] args) {
//...
//...
//...
Receive(receiver, logger).Wait();
}