我们可以通过 RabbitMQ 加快发布消息的速度吗
Can we speed up publishing messages via RabbitMQ
我正在 运行 我的 Ubuntu 工作站上进行一些测试。这些基准测试从填充一个运行非常缓慢的队列开始:
import pika
import datetime
if __name__ == '__main__':
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello_durable', durable=True)
started_at = datetime.datetime.now()
properties = pika.BasicProperties(delivery_mode=2)
for i in range(0, 100000):
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=properties)
if i%10000 == 0:
duration = datetime.datetime.now() - started_at
print(i, duration.total_seconds())
print(" [x] Sent 'Hello World!'")
connection.close()
now = datetime.datetime.now()
duration = now - started_at
print(duration.total_seconds())
except Exception as e:
print(e)
发送10K条消息需要30多秒。根据top命令,工作站有12个核心,这些核心不忙。有超过 8Gb 的可用内存。队列是否持久并不重要。
我们怎样才能加快发送消息的速度?
我假设您 运行 没有任何消费者 These benchmarks start with populating a queue
。
由于您只发布消息,因此 rabbitmq 切换到流状态。更准确地说,您的交换 and/or 队列进入流动状态。
引自 rabbitmq blog
This (roughly) means that the client is being rate-limited; it would
like to publish faster but the server can't keep up
我敢肯定,如果您仔细观察,您会发现消息的第一部分(在初始设置时,队列为空)速度很快,但发送速率在某些时候急剧下降。
从 BlockingConnection 切换到 SelectConnection 产生了巨大的变化,将过程加快了将近五十倍。我需要做的就是修改 the following tutorial: 中的示例,循环发布消息:
import pika
# Step #3
def on_open(connection):
connection.channel(on_channel_open)
# Step #4
def on_channel_open(channel):
channel.basic_publish('test_exchange',
'test_routing_key',
'message body value',
pika.BasicProperties(content_type='text/plain',
delivery_mode=1))
connection.close()
# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
on_open_callback=on_open)
try:
# Step #2 - Block on the IOLoop
connection.ioloop.start()
# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
connection.ioloop.start()
我正在 运行 我的 Ubuntu 工作站上进行一些测试。这些基准测试从填充一个运行非常缓慢的队列开始:
import pika
import datetime
if __name__ == '__main__':
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello_durable', durable=True)
started_at = datetime.datetime.now()
properties = pika.BasicProperties(delivery_mode=2)
for i in range(0, 100000):
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=properties)
if i%10000 == 0:
duration = datetime.datetime.now() - started_at
print(i, duration.total_seconds())
print(" [x] Sent 'Hello World!'")
connection.close()
now = datetime.datetime.now()
duration = now - started_at
print(duration.total_seconds())
except Exception as e:
print(e)
发送10K条消息需要30多秒。根据top命令,工作站有12个核心,这些核心不忙。有超过 8Gb 的可用内存。队列是否持久并不重要。
我们怎样才能加快发送消息的速度?
我假设您 运行 没有任何消费者 These benchmarks start with populating a queue
。
由于您只发布消息,因此 rabbitmq 切换到流状态。更准确地说,您的交换 and/or 队列进入流动状态。
引自 rabbitmq blog
This (roughly) means that the client is being rate-limited; it would like to publish faster but the server can't keep up
我敢肯定,如果您仔细观察,您会发现消息的第一部分(在初始设置时,队列为空)速度很快,但发送速率在某些时候急剧下降。
从 BlockingConnection 切换到 SelectConnection 产生了巨大的变化,将过程加快了将近五十倍。我需要做的就是修改 the following tutorial: 中的示例,循环发布消息:
import pika
# Step #3
def on_open(connection):
connection.channel(on_channel_open)
# Step #4
def on_channel_open(channel):
channel.basic_publish('test_exchange',
'test_routing_key',
'message body value',
pika.BasicProperties(content_type='text/plain',
delivery_mode=1))
connection.close()
# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
on_open_callback=on_open)
try:
# Step #2 - Block on the IOLoop
connection.ioloop.start()
# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
connection.ioloop.start()