使用 Flask 的 "app" 单例到 Dask Scheduler/Workers
Use Flask's "app" singleton to Dask Scheduler/Workers
案例:
我们有一些耗时的 functional/integration 测试,这些测试利用 Flask 的 current_app
进行配置(全局变量等)和一些日志记录。
我们正在尝试在集群上分发和并行化这些测试(目前是从 Dask's Docker image 创建的本地 "cluster")。
问题:
让我们假设以下示例:
一个耗时函数:
def will_take_my_time(n)
# Add the 'TAKE_YOUR_TIME' in the config in how many seconds you want
time.sleep(current_app.config['TAKE_YOUR_TIME'])
return n
一个耗时的测试:
def need_my_time_test(counter=None):
print(f"Test No. {will_take_my_time(counter)}")
创建 Dask Client
以连接到集群并执行 need_my_time_test
:
的 10 个测试的 Flask CLI 命令
@app.cli.command()
def itests(extended):
with Client(processes=False) as dask_client:
futures = dask_client.map(need_my_time_test, range(10))
print(f"Futures: {futures}")
print(f"Gathered: {dask_client.gather(futures)}")
编辑: 为了方便起见,让我们添加一个应用程序工厂以获得更容易重现的示例:
def create_app():
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='dev',
DEBUG=True,
)
@app.route('/hello')
def hello():
return 'Hello, World!'
@app.cli.command()
def itests(extended):
with Client(processes=False) as dask_client:
futures = dask_client.map(need_my_time_test, range(10))
print(f"Futures: {futures}")
print(f"Gathered: {dask_client.gather(futures)}")
将以上内容与 flask itests
结合使用,我们 运行 出现以下错误(描述为 here):
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that
needed to interface with the current application object in some way.
To solve this, set up an application context with app.app_context().
我们已经尝试过:
- 在应用单例创建上推送
app_context
(app.app_context().push()
)。
- 在 CLI 命令上使用
with current_app.app_context():
以及一些使用 current_app
的函数。
- 通过 Dask
Variable
发送 app_context
但它无法序列化上下文。
没有用。
题目:
- 是否有关于我们应该尝试什么的建议(包含 "where" 将不胜感激)?
- 是否有我们尝试正确但误用的东西,我们应该以不同的方式重试?
使用 current_app
代理时,假定 Flask 应用是在使用代理的同一进程中创建的。
提交给worker的任务不是运行的情况。
这些任务的执行与在提交任务的进程中创建的 Flask 应用程序隔离。
在任务中,定义 flask 应用程序并在那里提供应用程序上下文。
import time
from flask import Flask
from dask.distributed import Client
def _create_app():
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='dev',
DEBUG=True,
TAKE_YOUR_TIME=0.2
)
return app
def will_take_my_time(n):
# Add the 'TAKE_YOUR_TIME' in the config in how many seconds you want
app = _create_app()
with app.app_context():
time.sleep(app.config['TAKE_YOUR_TIME'])
return n
def need_my_time_test(counter=None):
print(f"Test No. {will_take_my_time(counter)}")
def create_app():
app = _create_app()
@app.route('/hello')
def hello():
return 'Hello, World!'
@app.cli.command()
def itests():
with Client(processes=False) as dask_client:
futures = dask_client.map(need_my_time_test, range(10))
print(f"Futures: {futures}")
print(f"Gathered: {dask_client.gather(futures)}")
return app
app = create_app()
案例:
我们有一些耗时的 functional/integration 测试,这些测试利用 Flask 的 current_app
进行配置(全局变量等)和一些日志记录。
我们正在尝试在集群上分发和并行化这些测试(目前是从 Dask's Docker image 创建的本地 "cluster")。
问题:
让我们假设以下示例:
一个耗时函数:
def will_take_my_time(n)
# Add the 'TAKE_YOUR_TIME' in the config in how many seconds you want
time.sleep(current_app.config['TAKE_YOUR_TIME'])
return n
一个耗时的测试:
def need_my_time_test(counter=None):
print(f"Test No. {will_take_my_time(counter)}")
创建 Dask Client
以连接到集群并执行 need_my_time_test
:
@app.cli.command()
def itests(extended):
with Client(processes=False) as dask_client:
futures = dask_client.map(need_my_time_test, range(10))
print(f"Futures: {futures}")
print(f"Gathered: {dask_client.gather(futures)}")
编辑: 为了方便起见,让我们添加一个应用程序工厂以获得更容易重现的示例:
def create_app():
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='dev',
DEBUG=True,
)
@app.route('/hello')
def hello():
return 'Hello, World!'
@app.cli.command()
def itests(extended):
with Client(processes=False) as dask_client:
futures = dask_client.map(need_my_time_test, range(10))
print(f"Futures: {futures}")
print(f"Gathered: {dask_client.gather(futures)}")
将以上内容与 flask itests
结合使用,我们 运行 出现以下错误(描述为 here):
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed to interface with the current application object in some way. To solve this, set up an application context with app.app_context().
我们已经尝试过:
- 在应用单例创建上推送
app_context
(app.app_context().push()
)。 - 在 CLI 命令上使用
with current_app.app_context():
以及一些使用current_app
的函数。 - 通过 Dask
Variable
发送app_context
但它无法序列化上下文。
没有用。
题目:
- 是否有关于我们应该尝试什么的建议(包含 "where" 将不胜感激)?
- 是否有我们尝试正确但误用的东西,我们应该以不同的方式重试?
使用 current_app
代理时,假定 Flask 应用是在使用代理的同一进程中创建的。
提交给worker的任务不是运行的情况。 这些任务的执行与在提交任务的进程中创建的 Flask 应用程序隔离。
在任务中,定义 flask 应用程序并在那里提供应用程序上下文。
import time
from flask import Flask
from dask.distributed import Client
def _create_app():
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='dev',
DEBUG=True,
TAKE_YOUR_TIME=0.2
)
return app
def will_take_my_time(n):
# Add the 'TAKE_YOUR_TIME' in the config in how many seconds you want
app = _create_app()
with app.app_context():
time.sleep(app.config['TAKE_YOUR_TIME'])
return n
def need_my_time_test(counter=None):
print(f"Test No. {will_take_my_time(counter)}")
def create_app():
app = _create_app()
@app.route('/hello')
def hello():
return 'Hello, World!'
@app.cli.command()
def itests():
with Client(processes=False) as dask_client:
futures = dask_client.map(need_my_time_test, range(10))
print(f"Futures: {futures}")
print(f"Gathered: {dask_client.gather(futures)}")
return app
app = create_app()