让工作队列和 RPC 协同工作
Getting work queue and RPC to work together
我开始使用 RabbitMQ,在学习了教程之后,我现在正试图让它按照我需要的方式工作,但我 运行 遇到了困难。我的设置是我需要能够先创建一个 RPC,然后根据客户端的响应将(或不会)发送另一条消息到工作队列(我不需要响应客户端)。不幸的是,我为使它一起工作所做的努力似乎并没有按照我想要的方式进行。在服务器端,我有这样的事情(我尝试了很多变体,但都遇到了同样的问题):
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true);
// I started with a named queue, not sure if that's better or worse for this
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "jobs",
routingKey: "saveJob_queue");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// save stuff that was sent with the saveJob_queue routingKey
}
channel.BasicConsume(queue: queueName,
noAck: false,
consumer: consumer);
// set up channel for RPC
// Not sure if this has to have another channel, but it wasn't working on the same channel either
rpcChannel = connection.CreateModel();
var rpcQueueName = rpcChannel.QueueDeclare().QueueName;
rpcChannel.QueueBind(queue: rpcQueueName,
exchange: "jobs",
routingKey: "rpc_CheckJob_queue");
var rpcConsumer = new EventingBasicConsumer(rpcChannel);
rpcConsumer.Received += (model, ea) =>
{
// do my remote call and send back a response
}
我遇到的问题是,尽管事实上它应该只接收 saveJob_queue
条路线。我可以在该处理程序中检查 ea.RoutingKey
并忽略这些消息,但我不明白它们是如何以及为什么最终出现在那里的?
设置连接以便它可以接收工作队列消息和 RPC 消息并正确处理它们的正确方法是什么?
由于您没有为队列指定名称,我怀疑您得到了两次相同的队列。所以我认为正在发生的事情本质上是这样的。
作业 --> saveJob_queue --> SomeSystemQueue
作业 --> rpc_CheckJob_queue--> SomeSystemQueue
尝试选择两个不同的队列名称,然后再次 运行 您的代码。
所以不是这个:
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "jobs",
routingKey: "saveJob_queue");
有:
var name = "Queue A";
channel.QueueDeclare(name);
channel.QueueBind(queue: queueName,
exchange: "jobs",
routingKey: "saveJob_queue");
然后为您的第二个队列命名并尝试。
所以我放弃了,决定只在 Received
事件中过滤。我认为问题在于 RabbitMQ 在 channel 上只有一个 Received
事件,但在 queue 上没有。所以 Received
事件无论哪种方式都会被击中。所以现在我有了这个:
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: rpcQueueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
switch (ea.RoutingKey)
{
case queueName:
SaveJob(ea);
break;
case rpcQueueName:
CheckJob(ea);
break;
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName,
noAck: false,
consumer: consumer);
channel.BasicConsume(queue: rpcQueueName,
noAck: false,
consumer: consumer);
我愿意接受更好的建议,因为这似乎有点不对劲。
所以发送只是:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: properties,
body: body);
对于常规工作和:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "",
routingKey: rpcQueueName,
basicProperties: props,
body: messageBytes);
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
if (ea.BasicProperties.CorrelationId == corrId)
{
return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null;
}
}
对于 RPC。
我开始使用 RabbitMQ,在学习了教程之后,我现在正试图让它按照我需要的方式工作,但我 运行 遇到了困难。我的设置是我需要能够先创建一个 RPC,然后根据客户端的响应将(或不会)发送另一条消息到工作队列(我不需要响应客户端)。不幸的是,我为使它一起工作所做的努力似乎并没有按照我想要的方式进行。在服务器端,我有这样的事情(我尝试了很多变体,但都遇到了同样的问题):
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true);
// I started with a named queue, not sure if that's better or worse for this
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "jobs",
routingKey: "saveJob_queue");
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
// save stuff that was sent with the saveJob_queue routingKey
}
channel.BasicConsume(queue: queueName,
noAck: false,
consumer: consumer);
// set up channel for RPC
// Not sure if this has to have another channel, but it wasn't working on the same channel either
rpcChannel = connection.CreateModel();
var rpcQueueName = rpcChannel.QueueDeclare().QueueName;
rpcChannel.QueueBind(queue: rpcQueueName,
exchange: "jobs",
routingKey: "rpc_CheckJob_queue");
var rpcConsumer = new EventingBasicConsumer(rpcChannel);
rpcConsumer.Received += (model, ea) =>
{
// do my remote call and send back a response
}
我遇到的问题是,尽管事实上它应该只接收 saveJob_queue
条路线。我可以在该处理程序中检查 ea.RoutingKey
并忽略这些消息,但我不明白它们是如何以及为什么最终出现在那里的?
设置连接以便它可以接收工作队列消息和 RPC 消息并正确处理它们的正确方法是什么?
由于您没有为队列指定名称,我怀疑您得到了两次相同的队列。所以我认为正在发生的事情本质上是这样的。
作业 --> saveJob_queue --> SomeSystemQueue
作业 --> rpc_CheckJob_queue--> SomeSystemQueue
尝试选择两个不同的队列名称,然后再次 运行 您的代码。 所以不是这个:
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "jobs",
routingKey: "saveJob_queue");
有:
var name = "Queue A";
channel.QueueDeclare(name);
channel.QueueBind(queue: queueName,
exchange: "jobs",
routingKey: "saveJob_queue");
然后为您的第二个队列命名并尝试。
所以我放弃了,决定只在 Received
事件中过滤。我认为问题在于 RabbitMQ 在 channel 上只有一个 Received
事件,但在 queue 上没有。所以 Received
事件无论哪种方式都会被击中。所以现在我有了这个:
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueDeclare(queue: rpcQueueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
switch (ea.RoutingKey)
{
case queueName:
SaveJob(ea);
break;
case rpcQueueName:
CheckJob(ea);
break;
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName,
noAck: false,
consumer: consumer);
channel.BasicConsume(queue: rpcQueueName,
noAck: false,
consumer: consumer);
我愿意接受更好的建议,因为这似乎有点不对劲。
所以发送只是:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: properties,
body: body);
对于常规工作和:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "",
routingKey: rpcQueueName,
basicProperties: props,
body: messageBytes);
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
if (ea.BasicProperties.CorrelationId == corrId)
{
return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null;
}
}
对于 RPC。