RabbitMQ 3.5 和消息优先级
RabbitMQ 3.5 and Message Priority
RabbitMQ 3.5 现在 supports message priority;
但是,我无法构建一个工作示例。我把我的代码放在下面。它包括我期望的输出和我实际的输出。我会对更多文档感兴趣,and/or 一个工作示例。
简而言之,我的问题是:如何让消息优先级在 Rabbit 3.5.0.0 中工作?
出版商:
using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;
class Publisher
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
IDictionary <String , Object> args = new Dictionary<String,Object>() ;
args.Add(" x-max-priority ", 10);
channel.QueueDeclare("task_queue1", true, false, true, args);
for (int i = 1 ; i<=10; i++ )
{
var message = "Message";
var body = Encoding.UTF8.GetBytes(message + " " + i);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
properties.Priority = Convert.ToByte(i);
channel.BasicPublish("", "task_queue1", properties, body);
}
}
}
}
}
消费者:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace Consumer
{
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
IDictionary<String, Object> args = new Dictionary<String, Object>();
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
channel.BasicConsume( "task_queue1", false, "", args, consumer);
Console.WriteLine(" [*] Waiting for messages. " +
"To exit press CTRL+C");
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
}
}
实际输出:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10
预期输出:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1
更新#1。
我在 Java here 中找到了一个示例。然而,它是 Rabbit 3.4.x.x。被合并到 3.5 中的插件。
我能看到的唯一区别是它们将优先级表示为 int 而我的是字节。但我觉得这是一条红鲱鱼。我在这里有点不知所措。
嗯,我解决了。
这是一个愚蠢的错误。
我写道:
args.Add(" x-max-priority ", 10);
应该是
args.Add("x-max-priority", 10);
我将保留它,以便其他人可以在 C# 中获得 Rabbitmq 3.5 优先级队列的工作示例。
Node JS 中类似的 RabbitMq 优先级队列实现
安装amqplib
为了测试,我们需要安装amqplib
npm install amqplib
发布者(send.js)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
// name of queue
var q = 'hello';
var msg = 'Hello World!';
var priorityValue = 0;
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
// maxPriority : max priority value supported by queue
ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
if (err !== null) return bail(err, conn);
for(var index=1; index<=100; index++) {
priorityValue = Math.floor((Math.random() * 10));
msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
ch.publish('', q, new Buffer(msg), {priority: priorityValue});
console.log(" [x] Sent '%s'", msg);
}
ch.close(function() { conn.close(); });
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
订户(receive.js)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
var q = 'hello';
function on_channel_open(err, ch) {
ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.consume(q, function(msg) { // message callback
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true}, function(_consumeOk) { // consume callback
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
运行:
node send.js
它将创建一个名为 'hello' 的队列,并使用默认的 AMQP 交换将“1000”条样本消息充斥其中。
node receive.js
它将作为消费者订阅队列中等待的消息。
另一种可能性(对于未来的搜索者)
"Push" 消息传递方法似乎不尊重优先级。
http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html
下面的引用了上面的URL。我把重要的部分加粗了。
通过订阅检索消息 ("push API")
接收消息的另一种方法是使用 IBasicConsumer 接口设置订阅。 消息将在到达时自动传送,而不必主动请求。实现消费者的一种方法是使用方便的 class EventingBasicConsumer,它会调度传送和其他消费者生命周期事件作为 C# 事件:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body;
// ... process the message
ch.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
通过更改为 "pull" 方法,优先级似乎得到了尊重。但是,在下面的 引述 中(来自上面相同的 url),看起来有一个权衡(我已经 加粗 )
正在获取个别消息 ("pull API")
要检索单个消息,请使用 IModel.BasicGet。返回值是BasicGetResult的一个实例,从中可以提取头部信息(属性)和消息体:
bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
// No message available at this time.
} else {
IBasicProperties props = result.BasicProperties;
byte[] body = result.Body;
...
由于上面的noAck = false,你还必须调用IModel.BasicAck来确认你已经成功接收并处理了消息:
...
// acknowledge receipt of the message
channel.BasicAck(result.DeliveryTag, false);
}
请注意,使用此 API 获取消息的效率相对较低。 如果您更喜欢 RabbitMQ 将消息推送到客户端,请参阅下一节。
(本例中的 "next" 部分会将您带到此 post 顶部的 "push" 方法)
RabbitMQ 3.5 现在 supports message priority; 但是,我无法构建一个工作示例。我把我的代码放在下面。它包括我期望的输出和我实际的输出。我会对更多文档感兴趣,and/or 一个工作示例。
简而言之,我的问题是:如何让消息优先级在 Rabbit 3.5.0.0 中工作?
出版商:
using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;
class Publisher
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
IDictionary <String , Object> args = new Dictionary<String,Object>() ;
args.Add(" x-max-priority ", 10);
channel.QueueDeclare("task_queue1", true, false, true, args);
for (int i = 1 ; i<=10; i++ )
{
var message = "Message";
var body = Encoding.UTF8.GetBytes(message + " " + i);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
properties.Priority = Convert.ToByte(i);
channel.BasicPublish("", "task_queue1", properties, body);
}
}
}
}
}
消费者:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace Consumer
{
class Worker
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
IDictionary<String, Object> args = new Dictionary<String, Object>();
channel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(channel);
IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
channel.BasicConsume( "task_queue1", false, "", args, consumer);
Console.WriteLine(" [*] Waiting for messages. " +
"To exit press CTRL+C");
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
}
}
实际输出:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10
预期输出:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1
更新#1。 我在 Java here 中找到了一个示例。然而,它是 Rabbit 3.4.x.x。被合并到 3.5 中的插件。 我能看到的唯一区别是它们将优先级表示为 int 而我的是字节。但我觉得这是一条红鲱鱼。我在这里有点不知所措。
嗯,我解决了。 这是一个愚蠢的错误。 我写道:
args.Add(" x-max-priority ", 10);
应该是
args.Add("x-max-priority", 10);
我将保留它,以便其他人可以在 C# 中获得 Rabbitmq 3.5 优先级队列的工作示例。
Node JS 中类似的 RabbitMq 优先级队列实现
安装amqplib
为了测试,我们需要安装amqplib
npm install amqplib
发布者(send.js)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
// name of queue
var q = 'hello';
var msg = 'Hello World!';
var priorityValue = 0;
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
// maxPriority : max priority value supported by queue
ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
if (err !== null) return bail(err, conn);
for(var index=1; index<=100; index++) {
priorityValue = Math.floor((Math.random() * 10));
msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
ch.publish('', q, new Buffer(msg), {priority: priorityValue});
console.log(" [x] Sent '%s'", msg);
}
ch.close(function() { conn.close(); });
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
订户(receive.js)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
var q = 'hello';
function on_channel_open(err, ch) {
ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.consume(q, function(msg) { // message callback
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true}, function(_consumeOk) { // consume callback
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
运行:
node send.js
它将创建一个名为 'hello' 的队列,并使用默认的 AMQP 交换将“1000”条样本消息充斥其中。
node receive.js
它将作为消费者订阅队列中等待的消息。
另一种可能性(对于未来的搜索者)
"Push" 消息传递方法似乎不尊重优先级。
http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html
下面的引用了上面的URL。我把重要的部分加粗了。
通过订阅检索消息 ("push API")
接收消息的另一种方法是使用 IBasicConsumer 接口设置订阅。 消息将在到达时自动传送,而不必主动请求。实现消费者的一种方法是使用方便的 class EventingBasicConsumer,它会调度传送和其他消费者生命周期事件作为 C# 事件:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body;
// ... process the message
ch.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
通过更改为 "pull" 方法,优先级似乎得到了尊重。但是,在下面的 引述 中(来自上面相同的 url),看起来有一个权衡(我已经 加粗 )
正在获取个别消息 ("pull API") 要检索单个消息,请使用 IModel.BasicGet。返回值是BasicGetResult的一个实例,从中可以提取头部信息(属性)和消息体:
bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
// No message available at this time.
} else {
IBasicProperties props = result.BasicProperties;
byte[] body = result.Body;
...
由于上面的noAck = false,你还必须调用IModel.BasicAck来确认你已经成功接收并处理了消息:
...
// acknowledge receipt of the message
channel.BasicAck(result.DeliveryTag, false);
}
请注意,使用此 API 获取消息的效率相对较低。 如果您更喜欢 RabbitMQ 将消息推送到客户端,请参阅下一节。
(本例中的 "next" 部分会将您带到此 post 顶部的 "push" 方法)