Python Tornado - How to create a background process?

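Is it possible to have Python Tornado run a long-lived background process while it keeps serving all of its handlers?

I have a Tornado web app that serves some web pages. I also have a message queue, and I would like Tornado to poll that queue as a subscriber. Can this be done in Tornado?

I searched the user guide, and it seems there is something called PeriodicCallback that I can use with the IOLoop. It sounds like I could give it a callback function that reads the message queue. However, is there a way to create a coroutine that never stops?

Any help is appreciated, thanks!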

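Reading from ZeroMQ:

  1. Install the ZeroMQ Python library (pyzmq).
  2. Install PyZMQ's IOLoop before calling application.listen().
  3. Start a message queue listener that sets Tornado up to listen on the queue and invokes a callback whenever data arrives. The callback uses an executor to process each message (on Python 2, you can install the futures backport of the Python 3 executor library).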

Example (main.py):

# Import tornado libraries
import tornado.ioloop
import tornado.web
# Import URL mappings
from url import application
# Import zeroMQ libraries
from zmq.eventloop import ioloop
# Import zeroMQ.py functions
from zeroMQ import startListenToMessageQueue
# Import zeroMQ settings
import zeroMQ_settings
# Import our executor
import executors
# Import our db_settings
import db_settings

# main.py is the main access point of the tornado app, to run the application, just run "python main.py"

# What this will do is listen to port 8888, and then we can access the app using
# http://localhost:8888 on any browser, or using python requests library
if __name__ == "__main__":
    # Install PyZMQ's IOLoop
    ioloop.install()

    # Set the application to listen to port 8888
    application.listen(8888)

    # Get the current IOLoop
    currentIOLoop = tornado.ioloop.IOLoop.current()

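    # Start the ZeroMQ subscriber for our topics. The function only registers a
    # ZMQStream callback and returns immediately, so we call it directly on the
    # IOLoop thread. (Wrapping the call in executor.submit(...) would run it here
    # anyway and then submit its return value, None, to the thread pool.)
    startListenToMessageQueue(zeroMQ_settings.server_subscriber_ports,
                              zeroMQ_settings.server_subscriber_IP,
                              zeroMQ_settings.server_subscribe_list)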

    # Test if the connection to our database is successful before we start the IOLoop
    db_settings.testForDatabase(db_settings.database)

    # Start the IOLoop
    currentIOLoop.start()
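
The question also asks whether a coroutine can simply run forever next to the request handlers. The following is a minimal sketch of that idea, not part of the original answer. It assumes Tornado 5 or later on Python 3, where the IOLoop runs on top of asyncio, so a plain asyncio task shares the loop with the handlers. The names poll_queue, handle and stop_marker are hypothetical, and an asyncio.Queue stands in for the real message queue.

```python
import asyncio


async def poll_queue(queue, handle, stop_marker=None):
    # Loop "forever": each await hands control back to the event loop,
    # so other tasks (such as request handlers) keep running in between.
    while True:
        message = await queue.get()
        if message is stop_marker:
            # Only here so the demo can terminate; a real poller may never stop.
            break
        handle(message)


async def main():
    queue = asyncio.Queue()
    received = []
    # Schedule the poller as a background task on the running loop.
    poller = asyncio.ensure_future(poll_queue(queue, received.append))
    for item in ("a", "b"):
        await queue.put(item)
    await queue.put(None)  # ask the poller to stop
    await poller
    return received


received = asyncio.run(main())
```

In a real application the task would be scheduled once before the IOLoop starts, and the loop body would await whatever asynchronous receive call the queue client provides.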

Example (zeroMQ.py):

# Import our executor
import executors
# Import zeroMQ libraries
import zmq
from zmq.eventloop import ioloop, zmqstream
# Import db functions to process the message
import db

# zeroMQ.py deals with the communication with ZeroMQ

def startListenToMessageQueue(subscribe_ports, subscribe_IP, subscribe_topic):
    # Usage:
    #       This function starts the subscriber for our application that will listen to the
    #       address and ports specified in the zeroMQ_settings.py, it will spawn a callback when we
    #       received anything relevant to our topic.
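    # Arguments:
    #       subscribe_ports: list of publisher ports to connect to
    #       subscribe_IP: IP address or hostname of the publisher
    #       subscribe_topic: list of topics to subscribe to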
    # Return:
    #       None

    # Get zmq context
    context = zmq.Context()

    # Get the context socket
    socket_sub = context.socket(zmq.SUB)

    # Connect to multiple subscriber ports
    for ports in subscribe_ports:
        socket_sub.connect("tcp://"+str(subscribe_IP)+":"+str(ports))

    # Subscribe to our relevant topics
    for topic in subscribe_topic:
        socket_sub.setsockopt(zmq.SUBSCRIBE, topic)

    # Setup ZMQ Stream with our socket
    stream_sub = zmqstream.ZMQStream(socket_sub)

    # When we receive our data, we will process the data by using a callback
    stream_sub.on_recv(processMessage)

    # Print the information to the console (this form works on Python 2 and 3)
    print("Connected to publisher with IP: " + str(subscribe_IP) +
          ", Ports: " + str(subscribe_ports) + ", Topics: " + str(subscribe_topic))

def processMessage(message):
    # Usage:
    #       This function processes the data using a callback format. The on_recv will call this function
    #       and populate the message variable with the data that we received through the message queue
    # Arguments:
    #       message: a list of message frames received from the message queue
    # Return:
    #       None

    # Process the message with an executor, and use the addData function in our db to process the message
    executors.executor.submit(db.addData, message)
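
The connect and subscribe loops in startListenToMessageQueue come down to building endpoint strings and topic filters. The sketch below isolates that step with the standard library only. The helper names build_endpoints and encode_topics are hypothetical, and the values are copied from the settings example. On Python 3, socket.setsockopt(zmq.SUBSCRIBE, ...) expects bytes rather than str, which is why the topics are encoded here.

```python
def build_endpoints(ip, ports):
    # One "tcp://host:port" endpoint per publisher port.
    return ["tcp://{}:{}".format(ip, port) for port in ports]


def encode_topics(topics):
    # Leave bytes untouched and encode text topics as UTF-8.
    return [t if isinstance(t, bytes) else t.encode("utf-8") for t in topics]


endpoints = build_endpoints("localhost", ["5556", "5558"])
topic_filters = encode_topics(["metrics.dat", "test-010"])
```

Each entry in endpoints could then be passed to socket_sub.connect(), and each entry in topic_filters to socket_sub.setsockopt(zmq.SUBSCRIBE, ...).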

Example (executors.py):

# Import futures library
from concurrent import futures

# executors.py creates our thread pool once, so it can be shared across different Python files
# without a new 10-thread pool being created each time it is imported.
# We can create a handful of executors for running synchronous tasks

# Create a 10 thread threadpool that we can use to call any synchronous/blocking functions
executor = futures.ThreadPoolExecutor(10)
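
One detail worth illustrating is how submit is called: it takes the callable itself, followed by its arguments. A small sketch using only the standard library, where add_data is a hypothetical stand-in for a blocking function such as db.addData:

```python
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=2)


def add_data(message):
    # Stand-in for a blocking database write.
    return "stored: " + message


# Correct: the worker thread calls add_data("hello").
future = executor.submit(add_data, "hello")
result = future.result()

# Mistake: add_data("hello") runs right here in the calling thread, and its
# return value (a str) is what gets submitted. The worker then fails when it
# tries to call that string, and the error stays hidden inside the future.
bad_future = executor.submit(add_data("hello"))
error = bad_future.exception()

executor.shutdown()
```

Because the failure in the second case is stored on the future rather than raised, it can go unnoticed unless result() or exception() is checked.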

Example (zeroMQ_settings.py):

# zeroMQ_settings.py keeps the settings for zeroMQ, for example the ports, IP, and topics that
# we need to subscribe to

# Set the ports to 5556 and 5558
server_subscriber_ports = ["5556", "5558"]

# Set IP to localhost
server_subscriber_IP = "localhost"

# Set Message to Subscribe: metrics.dat
server_subscriber_topic_metrics = "metrics.dat"

# Set Message to Subscribe: test-010
server_subscribe_topics_test_010 = "test-010"

# List of Subscriptions
server_subscribe_list = [server_subscriber_topic_metrics, server_subscribe_topics_test_010]

Special thanks to @dano