Flask - job not running as a background process
I am trying to run a Flask app that:
- yields API requests on the fly
- uploads each request to a SQLalchemy database
- runs jobs 1 and 2 as a background process

For that I have the following code:
import concurrent.futures
import queue
from concurrent.futures import ThreadPoolExecutor

from flask import Flask, current_app

app = Flask(__name__)
q = queue.Queue()


def build_cache():
    # 1. Yielding API requests on the fly
    track_and_features = spotify.query_tracks()  # <- a generator
    while True:
        q.put(next(track_and_features))


def upload_cache(track_and_features):
    # 2. Uploading each request to a `SQLalchemy` database
    with app.app_context():
        Upload_Tracks(filtered_dataset=track_and_features)
    return "UPLOADING TRACKS TO DATABASE"
@app.route("/cache")
def cache():
    # 3. Do `1` and `2` as a background process
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_track = {executor.submit(build_cache): "TRACKER DONE"}

        while future_to_track:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                future_to_track,
                timeout=0.25,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )

            # if there is incoming work, start a new future
            while not q.empty():
                # fetch a track from the queue
                track = q.get()
                # Start the load operation and mark the future with its TRACK
                future_to_track[executor.submit(upload_cache, track)] = track

            # process any completed futures
            for future in done:
                track = future_to_track[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print("%r generated an exception: %s" % (track, exc))

                del future_to_track[future]

    return "Cacheing playlist in the background..."
All of the above works, but not as a background process. When cache() is called, the app hangs and only resumes once the process has finished.
I run it with gunicorn -c gconfig.py app:app -w 4 --threads 12
What am I doing wrong?
EDIT: If I simplify things in order to debug this, and write simply:
# 1st background process
def build_cache():
    # only ONE JOB
    tracks_and_features = spotify.query_tracks()  # <- not a generator
    while True:
        print(next(tracks_and_features))


# background cache
@app.route("/cache")
def cache():
    executor.submit(build_cache)
    return "Cacheing playlist in the background..."
then the process does run in the background.
But if I add another job:
def build_cache():
    tracks_and_features = spotify.query_tracks()
    while True:
        # SQLalchemy db
        Upload_Tracks(filtered_dataset=next(tracks_and_features))
the background stops working again.
In short:
The background only works if I run one job at a time (which was the very limitation that led me to the queue idea in the first place).
It looks like the problem is tied to binding the background process to SQLAlchemy, but I don't know. I'm completely lost here.
Still not sure what you mean.
I mean the app waits for all the requests to be made at login and only then goes to the homepage. It should go to the homepage right away, with the requests being made in the background.
There are a few issues here:
- Your queue is global to the process, i.e. there is only one queue per gunicorn worker; you probably want the queue to be bound to your request so that multiple requests don't share the same queue in memory. Consider using context locals.
- If UploadTracks is writing to the database, there may be a lock on the table. Check your indexes and look for lock waits in your database.
- SQLAlchemy may be configured with a small connection pool, and the second UploadTracks may be waiting for the first one to return its connection (see the pool sketch after this list).
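A minimal sketch of widening that pool, assuming a plain create_engine() setup; the URL and the numbers are placeholders, not your actual configuration:

from sqlalchemy import create_engine

# purely illustrative values; tune them to your workload
engine = create_engine(
    "postgresql://user:password@localhost/mydb",  # placeholder URL
    pool_size=10,     # connections kept open (default is 5)
    max_overflow=20,  # extra connections allowed beyond pool_size
    pool_timeout=30,  # seconds to wait for a free connection before raising
)

If you are using Flask-SQLAlchemy, the same options can usually be passed through the SQLALCHEMY_ENGINE_OPTIONS config key.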
In your first example, the endpoint waits for all futures to complete before returning, whereas in your second example the endpoint returns immediately after submitting tasks to the executor. If you want Flask to respond quickly while the tasks are still running in background threads, remove the with concurrent.futures.ThreadPoolExecutor() as executor: block and construct a global thread pool at the top of the module.
With with, the context manager waits for all submitted tasks to finish before exiting, but I am not sure whether that is your main problem.
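A tiny standalone illustration of that blocking behaviour (nothing Flask-specific; the sleep is just a stand-in for your jobs):

import time
from concurrent.futures import ThreadPoolExecutor

def slow_job():
    time.sleep(5)

# __exit__ calls executor.shutdown(wait=True), so this block takes ~5 seconds
with ThreadPoolExecutor() as executor:
    executor.submit(slow_job)

print("reached only after slow_job has finished")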
Try creating the ThreadPoolExecutor outside the route handler:
import time
from concurrent.futures import ThreadPoolExecutor

from flask import Flask


def foo(*args):
    while True:
        print("foo", args)
        time.sleep(10)


app = Flask(__name__)
executor = ThreadPoolExecutor()


@app.route("/cache")
def cache():
    executor.submit(foo, "1")
    executor.submit(foo, "2")
    return "in cache"