Asyncio 运行 Dash (Flask) 服务器与另一个协程并发

Asyncio run Dash (Flask) server with another coroutine concurrently

我创建了一个 dash 应用程序来显示另一个代码正在收集的信息,我想 运行 它们同时使用 Python 中的 asyncio 模块。

我的代码正在使用异步函数,而 Dash 应用程序(基于 Flask)在服务时阻止执行任何其他内容。

我不确定这是否需要打开更多线程。

这是我当前的代码,其中只有 运行 主协程。

async def main():
    some code here...

    while True:
        try:
            await client.handle_message()
        except ConnectionClosedError as error:
            logger.error(error)
    
        for strategy in strategies:
            await asyncio.create_task(...)
            some code here...

async def run_dashboard():
    app = create_app()
    app.run_server('0.0.0.0', 5000, debug=False)


if __name__ == '__main__':
    some code here...

    # Currently just runs the main coroutine
    asyncio.run(main())

如何同时 运行 main 和 run_dashboard?

运行 run_dashboard 在后台线程中。参考 document.

async def run():
    await asyncio.gather(
        asyncio.to_thread(run_dashboard),
        main()
    )

asyncio.run(run())

注意 asyncio.to_thread是3.9版本的新功能。 python 3.9之前的版本,复制以下代码 threads.py.

坦率地说,将 Dash(Flask)与一些异步工作结合在一个进程中是不好的设计,考虑运行不同进程中的 Flask 和异步活动(即应用程序)。

不过,如果你还是想运行全部在一个过程中,我可以给你下面的工作示例,请关注评论并询问你是否有任何问题:

from flask import Flask, jsonify
import asyncio
from threading import Thread

# ** Async Part **


async def some_print_task():
    """Some async function"""
    while True:
        await asyncio.sleep(2)
        print("Some Task")


async def another_task():
    """Another async function"""
    while True:
        await asyncio.sleep(3)
        print("Another Task")


async def async_main():
    """Main async function"""
    await asyncio.gather(some_print_task(), another_task())


def async_main_wrapper():
    """Not async Wrapper around async_main to run it as target function of Thread"""
    asyncio.run(async_main())

# *** Flask Part ***:


app = Flask(__name__)


@app.route("/", methods=["GET"])
def index():
    """just some function"""
    return jsonify({"hello": "world"})


if __name__ == '__main__':
    # run all async stuff in another thread
    th = Thread(target=async_main_wrapper)
    th.start()
    # run Flask server
    app.run(host="0.0.0.0", port=9999)
    th.join()

如果您真的希望它 运行 全部在一个过程中进行,以下方法对我有用:

from functools import partial
from threading import Thread    


partial_run = partial(app.run, host="0.0.0.0", port=5000, debug=True, use_reloader=False)
t = Thread(target=partial_run)
t.start()
asyncio.run(main())

这是一些代码 运行 一个 dash 应用程序(收集飞行数据 - 由 Jose Portilla - Udemy 提供)+ 运行 dash 应用程序的线程和一些异步任务。


from flask import Flask, jsonify
import asyncio
from threading import Thread


# Dash
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import requests
import plotly.graph_objects as go 


# ** Async Part **

async def some_print_task():
    """Some async function"""
    while True:
        await asyncio.sleep(2)
        print("Some Task")


async def another_task():
    """Another async function"""
    while True:
        await asyncio.sleep(3)
        print("Another Task")


async def async_main():
    """Main async function"""
    await asyncio.gather(some_print_task(), another_task())


def async_main_wrapper():
    """Not async Wrapper around async_main to run it as target function of Thread"""
    asyncio.run(async_main())


# *** Dash Part ***:
app = dash.Dash()

app.layout = html.Div([

    # html.Div([
    #     html.Iframe(src="https://www.flightradar24.com",
    #                 height=500,width=200)
    # ]),
    html.Div([
        html.Pre(id='counter-text',children='Active Flights Worldwide'),
        dcc.Graph(id='live-update-graph',style={'width':1200}),
        dcc.Interval(   id='interval-component',
                        interval=6000,
                        n_intervals=0)
    ])
])
counter_list = []

@app.callback(  Output('counter-text','children'),
                [Input('interval-component','n_intervals')])
def update_layout(n):
    url = "https://data-live.flightradar24.com/zones/fcgi/feed.js?faa=1&mlat=1&flarm=1&adsb=1&gnd=1&air=1&vehicles=1&estimated=1&stats=1"
    res = requests.get(url, headers={'User-Agent' : 'Mozilla/5.0'})  # A fake header is necessary to access the site
    data = res.json()
    counter = 0
    for element in data["stats"]["total"]:
        counter += data["stats"]["total"][element]

    counter_list.append(counter)
    return "Active flights Worldwide: {}".format(counter)

@app.callback(  Output('live-update-graph','figure'),
                [Input('interval-component','n_intervals')])
def update_graph(n):
    fig = go.Figure(data=[
            go.Scatter(x=list(range(len(counter_list))),
                        y=counter_list,
                        mode='lines+markers')
    ])

    return fig

if __name__ == '__main__':
    # run all async stuff in another thread
    th = Thread(target=async_main_wrapper)
    th.start()
    # run Flask server
    # app.run(host="0.0.0.0", port=9999)
    app.run_server(debug=True)
    th.join()