我如何调用 rabbitMq 从 asp.net Webapi 获取消息?
How can i call rabbitMq get message from asp.net Webapi?
我的问题很简单。我想使用 rabbitmq 通过 asp.net webapi 来制作消息队列。另一方面,GetAllQueues 不返回任何消息。消息始终为空。下面的代码在控制台应用程序中是完美的,但是当我 运行 below post 时消息总是空的:
http://localhost:53301/api/CustomerPipline/?queueName=test123
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web.Http;
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using RabbitMQ.Client.Exceptions;
namespace Atom.Mqpipline.Controllers
{
public class CustomerPiplineController : ApiController
{
private static readonly string _queueName = "test123";
[HttpPost]
public HttpResponseMessage AddQueue(int customerId)
{
var publisher = new Publisher(_queueName, "Hello RabbitMQ World!");
var resp = Request.CreateResponse(HttpStatusCode.OK, JsonConvert.SerializeObject("OK"));
return resp;
}
[HttpGet]
public HttpResponseMessage GetAllQueues(string queueName)
{
var consumer = new Consumer(queueName);
var resp = Request.CreateResponse(HttpStatusCode.OK, JsonConvert.SerializeObject(consumer.Message));
return resp;
}
}
}
public class RabbitMQService
{
private readonly string _hostName = "localhost";
public IConnection GetRabbitMQConnection()
{
ConnectionFactory connectionFactory = new ConnectionFactory()
{
HostName = _hostName
};
return connectionFactory.CreateConnection();
}
}
public class Publisher
{
private readonly RabbitMQService _rabbitMQService;
public Publisher(string queueName, string message)
{
_rabbitMQService = new RabbitMQService();
using (var connection = _rabbitMQService.GetRabbitMQConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, false, false, false, null);
channel.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(message));
}
}
}
}
public class Consumer
{
private readonly RabbitMQService _rabbitMQService;
public string Message { get; set; }
public Consumer(string queueName)
{
_rabbitMQService = new RabbitMQService();
using (var connection = _rabbitMQService.GetRabbitMQConnection())
{
using (var channel = connection.CreateModel())
{
var consumer = new EventingBasicConsumer(channel);
// Received event'i sürekli listen modunda olacaktır.
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Debug.WriteLine(message);
Message = message;
};
channel.BasicConsume(queueName, true, consumer);
}
}
}
}
你不能打电话给消费者。消费者操作在收到消息时运行,而不是在您调用时运行。
- 在您的应用中只创建一个 Consumer 实例..
- 添加队列接收消息。或通过 SignalR.
直接向客户端推送消息
我认为您不能在回发或休息端点中使用消息。
但是您可以在项目启动时启动一个接收队列的任务。
public void Configure(IApplicationBuilder app)
{
[...]
Task.Run(GetAllQueues(queueName));
}
public void GetAllQueues(string queueName)
{
var consumer = new Consumer(queueName);
var resp = Request.CreateResponse(HttpStatusCode.OK, JsonConvert.SerializeObject(consumer.Message));
return resp;
}
http://localhost:53301/api/CustomerPipline/?queueName=test123
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web.Http;
using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;
using System.Diagnostics;
using RabbitMQ.Client.Exceptions;
namespace Atom.Mqpipline.Controllers
{
public class CustomerPiplineController : ApiController
{
private static readonly string _queueName = "test123";
[HttpPost]
public HttpResponseMessage AddQueue(int customerId)
{
var publisher = new Publisher(_queueName, "Hello RabbitMQ World!");
var resp = Request.CreateResponse(HttpStatusCode.OK, JsonConvert.SerializeObject("OK"));
return resp;
}
[HttpGet]
public HttpResponseMessage GetAllQueues(string queueName)
{
var consumer = new Consumer(queueName);
var resp = Request.CreateResponse(HttpStatusCode.OK, JsonConvert.SerializeObject(consumer.Message));
return resp;
}
}
}
public class RabbitMQService
{
private readonly string _hostName = "localhost";
public IConnection GetRabbitMQConnection()
{
ConnectionFactory connectionFactory = new ConnectionFactory()
{
HostName = _hostName
};
return connectionFactory.CreateConnection();
}
}
public class Publisher
{
private readonly RabbitMQService _rabbitMQService;
public Publisher(string queueName, string message)
{
_rabbitMQService = new RabbitMQService();
using (var connection = _rabbitMQService.GetRabbitMQConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, false, false, false, null);
channel.BasicPublish("", queueName, null, Encoding.UTF8.GetBytes(message));
}
}
}
}
public class Consumer
{
private readonly RabbitMQService _rabbitMQService;
public string Message { get; set; }
public Consumer(string queueName)
{
_rabbitMQService = new RabbitMQService();
using (var connection = _rabbitMQService.GetRabbitMQConnection())
{
using (var channel = connection.CreateModel())
{
var consumer = new EventingBasicConsumer(channel);
// Received event'i sürekli listen modunda olacaktır.
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Debug.WriteLine(message);
Message = message;
};
channel.BasicConsume(queueName, true, consumer);
}
}
}
}
你不能打电话给消费者。消费者操作在收到消息时运行,而不是在您调用时运行。
- 在您的应用中只创建一个 Consumer 实例..
- 添加队列接收消息。或通过 SignalR. 直接向客户端推送消息
我认为您不能在回发或休息端点中使用消息。 但是您可以在项目启动时启动一个接收队列的任务。
public void Configure(IApplicationBuilder app)
{
[...]
Task.Run(GetAllQueues(queueName));
}
public void GetAllQueues(string queueName)
{
var consumer = new Consumer(queueName);
var resp = Request.CreateResponse(HttpStatusCode.OK, JsonConvert.SerializeObject(consumer.Message));
return resp;
}