Rabbitmq 使用龙卷风不阻塞消息
Rabbitmq consume message to be non blocking with tornado
class WSHandler(tornado.websocket.WebSocketHandler):
clients = []
def open(self, name):
# WSHandler.clients.append(self)
# liveWebSockets.add(self)
self.id = name
self.clients.append(self)
# self.application.pc.add_event_listener(self)
print 'new connection'
def on_message(self, message):
print 'message received: %s' % message
# Reverse Message and send it back
print 'sending back message: %s' % message[::-1]
# pika sending message
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# clients.append(self)
channel.queue_declare(queue='hello')
# print dir(self)
message_rabbit_mq = {
'web_socket': self.id,
'message': message
}
message_rabbit_mq = json.dumps(message_rabbit_mq)
channel.basic_publish(exchange='',
routing_key='hello',
body=message_rabbit_mq)
connection.close()
self.rabbit_connect()
# def rabbit_connect():
# pika receving message
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
self.write_message(body)
time.sleep(4)
body_obj = json.loads(body)
if 'message' in body:
if body_obj['message'] == "crack":
channel.stop_consuming()
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
self.write_message("closed reference")
上面代码中的问题是
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
self.write_message(body)
time.sleep(4)
body_obj = json.loads(body)
if 'message' in body:
if body_obj['message'] == "crack":
channel.stop_consuming()
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
以上部分阻止了 on_message 函数中的其余逻辑。我如何使上述部分 运行 与其余逻辑异步?
这使得来自客户端的更多 websocket 消息无法处理。
尝试使用此代码:
https://github.com/Gsantomaggio/rabbitmqexample/tree/master/webSocketPython
def threaded_rmq():
channel.queue_declare(queue="my_queue")
logging.info('consumer ready, on my_queue')
channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True)
channel.start_consuming()
和
if __name__ == "__main__":
logging.info('Starting thread RabbitMQ')
threadRMQ = Thread(target=threaded_rmq)
threadRMQ.start()
class WSHandler(tornado.websocket.WebSocketHandler):
clients = []
def open(self, name):
# WSHandler.clients.append(self)
# liveWebSockets.add(self)
self.id = name
self.clients.append(self)
# self.application.pc.add_event_listener(self)
print 'new connection'
def on_message(self, message):
print 'message received: %s' % message
# Reverse Message and send it back
print 'sending back message: %s' % message[::-1]
# pika sending message
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# clients.append(self)
channel.queue_declare(queue='hello')
# print dir(self)
message_rabbit_mq = {
'web_socket': self.id,
'message': message
}
message_rabbit_mq = json.dumps(message_rabbit_mq)
channel.basic_publish(exchange='',
routing_key='hello',
body=message_rabbit_mq)
connection.close()
self.rabbit_connect()
# def rabbit_connect():
# pika receving message
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
self.write_message(body)
time.sleep(4)
body_obj = json.loads(body)
if 'message' in body:
if body_obj['message'] == "crack":
channel.stop_consuming()
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
self.write_message("closed reference")
上面代码中的问题是
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
self.write_message(body)
time.sleep(4)
body_obj = json.loads(body)
if 'message' in body:
if body_obj['message'] == "crack":
channel.stop_consuming()
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
以上部分阻止了 on_message 函数中的其余逻辑。我如何使上述部分 运行 与其余逻辑异步? 这使得来自客户端的更多 websocket 消息无法处理。
尝试使用此代码:
https://github.com/Gsantomaggio/rabbitmqexample/tree/master/webSocketPython
def threaded_rmq():
channel.queue_declare(queue="my_queue")
logging.info('consumer ready, on my_queue')
channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True)
channel.start_consuming()
和
if __name__ == "__main__":
logging.info('Starting thread RabbitMQ')
threadRMQ = Thread(target=threaded_rmq)
threadRMQ.start()