Python Tornado - 如何创建后台进程?
Python Tornado - How to create background process?
是否可以让 Python Tornado 运行 一些较长的后台进程,但同时它也为所有处理程序服务?
我有一个为某些网页提供服务的 Tornado Webapp。但是我还有一个消息队列,我希望 Tornado 以订阅者的身份轮询消息队列。这可以在 Tornado 中完成吗?
我搜索了用户指南,似乎有一个叫做 periodic_call_back
的东西我可以在 ioloop 中使用。听起来我可以使用读取消息队列的回调函数。但是,有没有办法创建一个永不停止的协程?
感谢任何帮助,谢谢!
从零 MQ 读取:
- 安装零 MQ Python 库
- 在 application.listen()
之前安装 IOLoop
- 使用执行器(对于python2,你可以从python3安装执行器库)来执行一个消息队列监听器,它设置tornado来监听一个消息队列,然后它会利用收到数据时回调。
示例 (main.py):
# Import tornado libraries
import tornado.ioloop
import tornado.web
# Import URL mappings
from url import application
# Import zeroMQ libraries
from zmq.eventloop import ioloop
# Import zeroMQ.py functions
from zeroMQ import startListenToMessageQueue
# Import zeroMQ settings
import zeroMQ_settings
# Import our executor
import executors
# Import our db_settings
import db_settings
# main.py is the main access point of the tornado app, to run the application, just run "python main.py"
# What this will do is listen to port 8888, and then we can access the app using
# http://localhost:8888 on any browser, or using python requests library
if __name__ == "__main__":
# Install PyZMQ's IOLoop
ioloop.install()
# Set the application to listen to port 8888
application.listen(8888)
# Get the current IOLoop
currentIOLoop = tornado.ioloop.IOLoop.current()
# Execute ZeroMQ Subscriber for our topics
executors.executor.submit(startListenToMessageQueue(zeroMQ_settings.server_subscriber_ports,
zeroMQ_settings.server_subscriber_IP,
zeroMQ_settings.server_subscribe_list))
# Test if the connection to our database is successful before we start the IOLoop
db_settings.testForDatabase(db_settings.database)
# Start the IOLoop
currentIOLoop.start()
示例 (zeroMQ.py):
# Import our executor
import executors
# Import zeroMQ libraries
import zmq
from zmq.eventloop import ioloop, zmqstream
# Import db functions to process the message
import db
# zeroMQ.py deals with the communication between a zero message queue
def startListenToMessageQueue(subscribe_ports, subscribe_IP, subscribe_topic):
# Usage:
# This function starts the subscriber for our application that will listen to the
# address and ports specified in the zeroMQ_settings.py, it will spawn a callback when we
# received anything relevant to our topic.
# Arguments:
# None
# Return:
# None
# Get zmq context
context = zmq.Context()
# Get the context socket
socket_sub = context.socket(zmq.SUB)
# Connect to multiple subscriber ports
for ports in subscribe_ports:
socket_sub.connect("tcp://"+str(subscribe_IP)+":"+str(ports))
# Subscribe to our relevant topics
for topic in subscribe_topic:
socket_sub.setsockopt(zmq.SUBSCRIBE, topic)
# Setup ZMQ Stream with our socket
stream_sub = zmqstream.ZMQStream(socket_sub)
# When we recieve our data, we will process the data by using a callback
stream_sub.on_recv(processMessage)
# Print the Information to Console
print "Connected to publisher with IP:" + \
str(subscribe_IP) + ", Port" + str(subscribe_ports) + ", Topic:" + str(subscribe_topic)
def processMessage(message):
# Usage:
# This function processes the data using a callback format. The on_recv will call this function
# and populate the message variable with the data that we recieved through the message queue
# Arguments:
# message: a string containing the data that we recieved from the message queue
# Return:
# None
# Process the message with an executor, and use the addData function in our db to process the message
executors.executor.submit(db.addData, message)
示例 (executors.py):
# Import futures library
from concurrent import futures
# executors.py will create our threadpools, and this can be shared around different python files
# which will not re-create 10 threadpools when we call it.
# we can a handful of executors for running synchronous tasks
# Create a 10 thread threadpool that we can use to call any synchronous/blocking functions
executor = futures.ThreadPoolExecutor(10)
示例 (zeroMQ_settings.py):
# zeroMQ_settings.py keep the settings for zeroMQ, for example port, IP, and topics that
# we need to subscribe
# Set the Port to 5558
server_subscriber_ports = ["5556", "5558"]
# Set IP to localhost
server_subscriber_IP = "localhost"
# Set Message to Subscribe: metrics.dat
server_subscriber_topic_metrics = "metrics.dat"
# Set Message to Subscribe: test-010
server_subscribe_topics_test_010 = "test-010"
# List of Subscriptions
server_subscribe_list = [server_subscriber_topic_metrics, server_subscribe_topics_test_010]
特别感谢@dano
是否可以让 Python Tornado 运行 一些较长的后台进程,但同时它也为所有处理程序服务?
我有一个为某些网页提供服务的 Tornado Webapp。但是我还有一个消息队列,我希望 Tornado 以订阅者的身份轮询消息队列。这可以在 Tornado 中完成吗?
我搜索了用户指南,似乎有一个叫做 periodic_call_back
的东西我可以在 ioloop 中使用。听起来我可以使用读取消息队列的回调函数。但是,有没有办法创建一个永不停止的协程?
感谢任何帮助,谢谢!
从零 MQ 读取:
- 安装零 MQ Python 库
- 在 application.listen() 之前安装 IOLoop
- 使用执行器(对于python2,你可以从python3安装执行器库)来执行一个消息队列监听器,它设置tornado来监听一个消息队列,然后它会利用收到数据时回调。
示例 (main.py):
# Import tornado libraries
import tornado.ioloop
import tornado.web
# Import URL mappings
from url import application
# Import zeroMQ libraries
from zmq.eventloop import ioloop
# Import zeroMQ.py functions
from zeroMQ import startListenToMessageQueue
# Import zeroMQ settings
import zeroMQ_settings
# Import our executor
import executors
# Import our db_settings
import db_settings
# main.py is the main access point of the tornado app, to run the application, just run "python main.py"
# What this will do is listen to port 8888, and then we can access the app using
# http://localhost:8888 on any browser, or using python requests library
if __name__ == "__main__":
# Install PyZMQ's IOLoop
ioloop.install()
# Set the application to listen to port 8888
application.listen(8888)
# Get the current IOLoop
currentIOLoop = tornado.ioloop.IOLoop.current()
# Execute ZeroMQ Subscriber for our topics
executors.executor.submit(startListenToMessageQueue(zeroMQ_settings.server_subscriber_ports,
zeroMQ_settings.server_subscriber_IP,
zeroMQ_settings.server_subscribe_list))
# Test if the connection to our database is successful before we start the IOLoop
db_settings.testForDatabase(db_settings.database)
# Start the IOLoop
currentIOLoop.start()
示例 (zeroMQ.py):
# Import our executor
import executors
# Import zeroMQ libraries
import zmq
from zmq.eventloop import ioloop, zmqstream
# Import db functions to process the message
import db
# zeroMQ.py deals with the communication between a zero message queue
def startListenToMessageQueue(subscribe_ports, subscribe_IP, subscribe_topic):
# Usage:
# This function starts the subscriber for our application that will listen to the
# address and ports specified in the zeroMQ_settings.py, it will spawn a callback when we
# received anything relevant to our topic.
# Arguments:
# None
# Return:
# None
# Get zmq context
context = zmq.Context()
# Get the context socket
socket_sub = context.socket(zmq.SUB)
# Connect to multiple subscriber ports
for ports in subscribe_ports:
socket_sub.connect("tcp://"+str(subscribe_IP)+":"+str(ports))
# Subscribe to our relevant topics
for topic in subscribe_topic:
socket_sub.setsockopt(zmq.SUBSCRIBE, topic)
# Setup ZMQ Stream with our socket
stream_sub = zmqstream.ZMQStream(socket_sub)
# When we recieve our data, we will process the data by using a callback
stream_sub.on_recv(processMessage)
# Print the Information to Console
print "Connected to publisher with IP:" + \
str(subscribe_IP) + ", Port" + str(subscribe_ports) + ", Topic:" + str(subscribe_topic)
def processMessage(message):
# Usage:
# This function processes the data using a callback format. The on_recv will call this function
# and populate the message variable with the data that we recieved through the message queue
# Arguments:
# message: a string containing the data that we recieved from the message queue
# Return:
# None
# Process the message with an executor, and use the addData function in our db to process the message
executors.executor.submit(db.addData, message)
示例 (executors.py):
# Import futures library
from concurrent import futures
# executors.py will create our threadpools, and this can be shared around different python files
# which will not re-create 10 threadpools when we call it.
# we can a handful of executors for running synchronous tasks
# Create a 10 thread threadpool that we can use to call any synchronous/blocking functions
executor = futures.ThreadPoolExecutor(10)
示例 (zeroMQ_settings.py):
# zeroMQ_settings.py keep the settings for zeroMQ, for example port, IP, and topics that
# we need to subscribe
# Set the Port to 5558
server_subscriber_ports = ["5556", "5558"]
# Set IP to localhost
server_subscriber_IP = "localhost"
# Set Message to Subscribe: metrics.dat
server_subscriber_topic_metrics = "metrics.dat"
# Set Message to Subscribe: test-010
server_subscribe_topics_test_010 = "test-010"
# List of Subscriptions
server_subscribe_list = [server_subscriber_topic_metrics, server_subscribe_topics_test_010]
特别感谢@dano