使用 Flask 的 "app" 单例到 Dask Scheduler/Workers

Use Flask's "app" singleton to Dask Scheduler/Workers

案例:

我们有一些耗时的 functional/integration 测试,这些测试利用 Flask 的 current_app 进行配置(全局变量等)和一些日志记录。
我们正在尝试在集群上分发和并行化这些测试(目前是从 Dask's Docker image 创建的本地 "cluster")。

问题:

让我们假设以下示例:

一个耗时函数:

def will_take_my_time(n)
    # Add the 'TAKE_YOUR_TIME' in the config in how many seconds you want
    time.sleep(current_app.config['TAKE_YOUR_TIME'])
    return n

一个耗时的测试:

def need_my_time_test(counter=None):
    print(f"Test No. {will_take_my_time(counter)}")

创建 Dask Client 以连接到集群并执行 need_my_time_test:

的 10 个测试的 Flask CLI 命令
@app.cli.command()
def itests(extended):
    with Client(processes=False) as dask_client:
        futures = dask_client.map(need_my_time_test, range(10))
        print(f"Futures: {futures}")
        print(f"Gathered: {dask_client.gather(futures)}")

编辑: 为了方便起见,让我们添加一个应用程序工厂以获得更容易重现的示例:

def create_app():
    app = Flask(__name__)
    app.config.from_mapping(
        SECRET_KEY='dev',
        DEBUG=True,
    )

    @app.route('/hello')
    def hello():
        return 'Hello, World!'

    @app.cli.command()
    def itests(extended):
        with Client(processes=False) as dask_client:
            futures = dask_client.map(need_my_time_test, range(10))
            print(f"Futures: {futures}")
            print(f"Gathered: {dask_client.gather(futures)}")

将以上内容与 flask itests 结合使用,我们 运行 出现以下错误(描述为 here):

RuntimeError: Working outside of application context.

This typically means that you attempted to use functionality that needed to interface with the current application object in some way. To solve this, set up an application context with app.app_context().

我们已经尝试过:

没有用。

题目:

使用 current_app 代理时,假定 Flask 应用是在使用代理的同一进程中创建的。

提交给worker的任务不是运行的情况。 这些任务的执行与在提交任务的进程中创建的 Flask 应用程序隔离。

在任务中,定义 flask 应用程序并在那里提供应用程序上下文。

import time

from flask import Flask
from dask.distributed import Client


def _create_app():
    app = Flask(__name__)
    app.config.from_mapping(
        SECRET_KEY='dev',
        DEBUG=True,
        TAKE_YOUR_TIME=0.2
    )
    return app

def will_take_my_time(n):
    # Add the 'TAKE_YOUR_TIME' in the config in how many seconds you want
    app = _create_app()
    with app.app_context():
        time.sleep(app.config['TAKE_YOUR_TIME'])
    return n

def need_my_time_test(counter=None):
    print(f"Test No. {will_take_my_time(counter)}")

def create_app():
    app = _create_app()

    @app.route('/hello')
    def hello():
        return 'Hello, World!'

    @app.cli.command()
    def itests():
        with Client(processes=False) as dask_client:
            futures = dask_client.map(need_my_time_test, range(10))
            print(f"Futures: {futures}")
            print(f"Gathered: {dask_client.gather(futures)}")
    return app

app = create_app()