C# RabbitMQ: why is the second worker not picking up work?
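I am trying to understand all the RabbitMQ options. I think all I want is a work queue: I have one queue, and workers simply take an item off it and process it.
I created a new Direct Exchange (I think that is right!?).
First, I would like to know why the following happens in this example. I add 4 new messages to the exchange/queue without starting any workers. Then I start the first worker, and then a second one, but the second one does not process any messages; the first worker processes them all!?
What am I doing wrong, and why does this not work? Please find the full sample code below.
Publisher confirms do not seem to work properly either, because the "Message Acknowledged..." output only shows up some of the time.
I have read https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html and looked through the pages that follow it, but it is still not very clear to me.
Emitter: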
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace PublishConfirms.Emit
{
    class Program
    {
        public static ConcurrentDictionary<ulong, string> _outstandingConfirms = new ConcurrentDictionary<ulong, string>();

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // We need to enable publisher confirms on the channel
                    channel.ConfirmSelect();
                    channel.ExchangeDeclare(exchange: "DirectExchange",
                                            type: ExchangeType.Direct);
                    var queueName = "DirectExchangeQueue";
                    // Make sure to create the queue in case it doesn't exist
                    channel.QueueDeclare(queue: queueName,
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.BasicAcks += (sender, ea) =>
                    {
                        Console.WriteLine("Message Acknowledged with Delivery Tag {0}", ea.DeliveryTag);
                        // message is confirmed
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };
                    channel.BasicNacks += (sender, ea) =>
                    {
                        // message is nack-ed (messages that have been lost)
                        _outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                        Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };
                    var message = "A message here";
                    var body = Encoding.UTF8.GetBytes(message);
                    _outstandingConfirms.TryAdd(channel.NextPublishSeqNo, message);
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    channel.BasicPublish(exchange: "",
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: body);
                    Console.WriteLine("Sent message '{0}'", message);
                }
            }
            Console.WriteLine("Press any key to exit");
            Console.ReadLine();
        }

        static void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
        {
            if (multiple)
            {
                // A cumulative ack/nack covers every sequence number up to and including this one
                var confirmed = _outstandingConfirms.Where(k => k.Key <= sequenceNumber);
                foreach (var entry in confirmed)
                {
                    _outstandingConfirms.TryRemove(entry.Key, out _);
                }
            }
            else
            {
                _outstandingConfirms.TryRemove(sequenceNumber, out _);
            }
        }
    }
}
Worker/receiver:
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace PublishConfirms.Receive
{
    class Program
    {
        // We can run several of these; each message should be handled by only one of them (no sharing)
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // This will create the exchange if needed
                    channel.ExchangeDeclare(exchange: "DirectExchange",
                                            type: ExchangeType.Direct);
                    var queueName = "DirectExchangeQueue";
                    // Make sure to create the queue in case it doesn't exist
                    channel.QueueDeclare(queue: queueName,
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.QueueBind(queue: queueName,
                                      exchange: "DirectExchange",
                                      routingKey: ""); // We keep the routing key the same as we don't want different handlers
                    // If we were to have different routes then we
                    // would most probably have to create a random queue e.g.
                    // var queueName = channel.QueueDeclare().QueueName;
                    Console.WriteLine("Waiting for messages...");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received message: {0}", message);
                        Console.WriteLine("Processing...");
                        Thread.Sleep(3000); // simulate some work
                        Console.WriteLine("Processing Complete");
                        // send an acknowledgement back
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(queue: queueName,
                                         autoAck: false,
                                         consumer: consumer);
                    Console.WriteLine("Press any key to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}
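Your problem lies in understanding how a Direct exchange works, and possibly also how queue binding keys and routing keys relate to each other.
A Direct exchange delivers a message to the queues whose binding key exactly matches the message's routing key.
The binding key is the key with which a queue is bound to the exchange.
The routing key is the key that a message is published with and routed by.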
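So the problem with your code is that the queue is bound with an empty binding key, while the message is published with the queue name as its routing key. You can fix this in one of two ways:
- by publishing your message with an empty routing key:
...
channel.BasicPublish(exchange: "",
                     routingKey: "", // empty, to match the binding key used in QueueBind
                     basicProperties: properties,
                     body: body);
...
- or by changing the binding key when you bind the queue:
...
channel.QueueBind(queue: queueName,
                  exchange: "DirectExchange",
                  routingKey: queueName); // As you stated in your code that
                                          // the queue name is your bindingKey
...
You can choose whichever you prefer.
Apart from that, when you publish your message you send it to an empty exchange name. It should be:
channel.BasicPublish(exchange: "DirectExchange", // As you stated at the start of your code
                     routingKey: queueName,
                     basicProperties: properties,
                     body: body);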
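To make the exact-match rule concrete, here is a minimal in-memory sketch of how a direct exchange selects queues. It does not talk to a broker: the `bindings` list and the `Route` helper are hypothetical names invented for this illustration, and the snippet assumes C# 9 or later because it uses top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of the bindings held by "DirectExchange".
// This single entry mirrors QueueBind(queue, exchange, routingKey: "").
var bindings = new List<(string BindingKey, string Queue)>
{
    ("", "DirectExchangeQueue"),
};

// A direct exchange delivers to every queue whose binding key
// is exactly equal to the routing key of the message.
List<string> Route(string routingKey) =>
    bindings.Where(b => b.BindingKey == routingKey)
            .Select(b => b.Queue)
            .ToList();

var matched = Route("");                      // same key as the binding
var unmatched = Route("DirectExchangeQueue"); // queue name used as routing key

Console.WriteLine($"routing key \"\": {matched.Count} queue(s)");
Console.WriteLine($"routing key \"DirectExchangeQueue\": {unmatched.Count} queue(s)");
```

With the binding above, only the empty routing key reaches the queue; a message published to this exchange with the queue name as routing key matches nothing.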
I suggest reading more about RabbitMQ and getting familiar with the concepts behind it.
Hope this helps.
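After reading what itama said, I had another look. First of all, the exchange in BasicPublish was an empty string, so that did not help! I decided not to use the queue name as the routing key and to leave it as an empty string instead.
What I wanted was one exchange with one queue, and multiple workers that each process one item from the queue at a time.
The problem of only one worker doing all the processing seemed to be solved when I added this line to the worker: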
channel.BasicQos(0, 1, false);
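This sets the prefetch count to 1, so the broker does not send a worker another message until it has acknowledged the previous one. After adding it, the other workers each received a message. I ended up adding 50-100 messages to the queue and then starting 10 workers; each of them took an item from the queue and processed it, which was nice to see.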
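The effect of the prefetch count can be illustrated without a broker. The sketch below is only a simplified model, not RabbitMQ itself: the `Dispatch` helper is a hypothetical function that assumes a backlog is already queued, that workers subscribe one after another, and that nobody has acknowledged anything yet. As with BasicQos, a prefetch of 0 is treated as "no limit". It uses C# top-level statements (C# 9 or later).

```csharp
using System;
using System.Linq;

// Returns how many messages each worker is holding right after all
// workers have subscribed and before any message has been acked.
int[] Dispatch(int backlog, int workers, int prefetch)
{
    var delivered = new int[workers];
    var ready = backlog;
    for (var w = 0; w < workers && ready > 0; w++)
    {
        // On subscribe, the broker pushes as many ready messages
        // as the consumer's prefetch window allows (0 = unlimited).
        var take = prefetch == 0 ? ready : Math.Min(prefetch, ready);
        delivered[w] = take;
        ready -= take;
    }
    return delivered;
}

// 4 queued messages, 2 workers, no prefetch limit: the first worker gets everything.
Console.WriteLine(string.Join(", ", Dispatch(4, 2, 0))); // prints "4, 0"

// Same backlog with a prefetch of 1: each worker holds one message,
// and the remaining two are handed out as acks come back.
Console.WriteLine(string.Join(", ", Dispatch(4, 2, 1))); // prints "1, 1"
```

This matches what I saw: without a limit the first worker to subscribe drained the whole backlog, while with a prefetch of 1 the second worker had something left to pick up.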
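As for the publisher confirm callbacks BasicAcks and BasicNacks, I found this link: https://rianjs.net/2013/12/publisher-confirms-with-rabbitmq-and-c-sharp. It shows that we need to call channel.WaitForConfirmsOrDie() after calling BasicPublish; previously my code was not waiting for the confirmation.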
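One detail of those callbacks that is easy to miss is the Multiple flag: a single ack can confirm every sequence number up to and including its delivery tag. The sketch below replays the bookkeeping from CleanOutstandingConfirms against a local dictionary so it can run without a broker. The names `outstanding` and `Clean` and the five pretend sequence numbers are assumptions made for this illustration, and the snippet uses C# top-level statements (C# 9 or later).

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

var outstanding = new ConcurrentDictionary<ulong, string>();

// Same logic as CleanOutstandingConfirms in the full code.
void Clean(ulong sequenceNumber, bool multiple)
{
    if (multiple)
    {
        // Cumulative confirm: drop everything up to and including sequenceNumber.
        foreach (var entry in outstanding.Where(k => k.Key <= sequenceNumber))
        {
            outstanding.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        // Single confirm: drop only this sequence number.
        outstanding.TryRemove(sequenceNumber, out _);
    }
}

// Pretend five messages were published with sequence numbers 1..5.
for (ulong seq = 1; seq <= 5; seq++)
{
    outstanding.TryAdd(seq, $"message {seq}");
}

Clean(2, multiple: false); // removes 2 only
Clean(4, multiple: true);  // removes 1, 3 and 4

var remaining = outstanding.Keys.OrderBy(k => k).ToArray();
Console.WriteLine(string.Join(", ", remaining)); // prints "5"
```

Anything still in the dictionary after waiting for confirms has not been acknowledged by the broker.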
The full code is below:
emitter/producer:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace PublishConfirms.Emit
{
    class Program
    {
        public static ConcurrentDictionary<ulong, string> _outstandingConfirms = new ConcurrentDictionary<ulong, string>();

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // -------------------------------------------------------------------
                    // Set up the exchange and queue, then bind the queue to the exchange
                    // -------------------------------------------------------------------
                    // This will create the exchange if needed
                    channel.ExchangeDeclare(exchange: "DirectExchange",
                                            type: ExchangeType.Direct);
                    var queueName = "DirectExchangeQueue";
                    // Make sure to create the queue in case it doesn't exist
                    channel.QueueDeclare(queue: queueName,
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.QueueBind(queue: queueName,
                                      exchange: "DirectExchange",
                                      routingKey: "");
                    // -------------------------------------------------------------------
                    // Set up publisher confirms
                    // -------------------------------------------------------------------
                    channel.BasicAcks += (sender, ea) =>
                    {
                        Console.WriteLine("Message Acknowledged with Delivery Tag {0}", ea.DeliveryTag);
                        // message is confirmed
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };
                    channel.BasicNacks += (sender, ea) =>
                    {
                        // message is nack-ed (messages that have been lost)
                        _outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                        Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
                        CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
                    };
                    // We need to enable publisher confirms on the channel
                    channel.ConfirmSelect();
                    // -------------------------------------------------------------------
                    // Set up the message and add it to the ConcurrentDictionary, so we can
                    // remove it when BasicAcks is raised
                    // -------------------------------------------------------------------
                    var message = "YOUR MESSAGE HERE";
                    var body = Encoding.UTF8.GetBytes(message);
                    var nextPublishSequenceNo = channel.NextPublishSeqNo;
                    Console.WriteLine("Next Publish Sequence Number: {0}", nextPublishSequenceNo);
                    _outstandingConfirms.TryAdd(nextPublishSequenceNo, message);
                    // Mark the message as persistent so it is written to disk once it reaches the queue.
                    // I imagine this is slower but safer; it is also kept in memory if there is no memory pressure.
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    channel.BasicPublish(exchange: "DirectExchange",
                                         routingKey: "",
                                         basicProperties: properties,
                                         body: body);
                    // Block until the broker has confirmed the message (throws if it is nack-ed)
                    channel.WaitForConfirmsOrDie();
                    Console.WriteLine("Sent message '{0}'", message);
                }
            }
        }

        static void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
        {
            if (multiple)
            {
                // A cumulative ack/nack covers every sequence number up to and including this one
                var confirmed = _outstandingConfirms.Where(k => k.Key <= sequenceNumber);
                foreach (var entry in confirmed)
                {
                    _outstandingConfirms.TryRemove(entry.Key, out _);
                }
            }
            else
            {
                _outstandingConfirms.TryRemove(sequenceNumber, out _);
            }
        }
    }
}
receiver/worker:
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace PublishConfirms.Receive
{
    class Program
    {
        // We can run several of these; each message is handled by only one of them (no sharing)
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // -------------------------------------------------------------------
                    // Set up the exchange and queue, then bind the queue to the exchange
                    // -------------------------------------------------------------------
                    // This will create the exchange if needed
                    channel.ExchangeDeclare(exchange: "DirectExchange",
                                            type: ExchangeType.Direct);
                    var queueName = "DirectExchangeQueue";
                    // Make sure to create the queue in case it doesn't exist
                    channel.QueueDeclare(queue: queueName,
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    // Only dispatch one message at a time to a worker and wait for its acknowledgement.
                    // Adding this seems to correct the issue where only one worker would pick
                    // up the queued items
                    channel.BasicQos(0, 1, false);
                    // We keep the routing key the same as we don't want different handlers
                    // If we were to have different routes then we
                    // would most probably have to create a random queue e.g.
                    // var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName,
                                      exchange: "DirectExchange",
                                      routingKey: "");
                    Console.WriteLine("Waiting for messages...");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Received message: {0}", message);
                        Console.WriteLine("Processing...");
                        Thread.Sleep(3000); // simulate some work
                        Console.WriteLine("Processing Complete");
                        // send an acknowledgement back
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(queue: queueName,
                                         autoAck: false,
                                         consumer: consumer);
                    Console.WriteLine("Press any key to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}