Flask - 作业不是 运行 作为后台进程

Flask - job not running as a background process

我正在尝试 运行 一个 Flask 应用程序,其中包含:

  1. 即时生成 API 个请求
  2. 将每个请求上传到 SQLalchemy 数据库
  3. 运行 作业 12 作为后台进程

为此,我有以下代码:

import concurrent.futures
import queue
from concurrent.futures import ThreadPoolExecutor

from flask import Flask, current_app

app = Flask(__name__)
q = queue.Queue()


def build_cache():
    # 1. Yielding API requests on the fly
    track_and_features = spotify.query_tracks()  # <- a generator
    while True:
        q.put(next(track_and_features))


def upload_cache(tracks_and_features):
    # 2. Uploading each request to a `SQLalchemy` database
    with app.app_context():
        Upload_Tracks(filtered_dataset=track_and_features)

    return "UPLOADING TRACKS TO DATABASE"


@app.route("/cache")
def cache():
    # 3. Do `1` and `2` as a background process
    with concurrent.futures.ThreadPoolExecutor() as executor:

        future_to_track = {executor.submit(build_cache): "TRACKER DONE"}

        while future_to_track:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                future_to_track,
                timeout=0.25,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )

            # if there is incoming work, start a new future
            while not q.empty():

                # fetch a track from the queue
                track = q.get()

                # Start the load operation and mark the future with its TRACK
                future_to_track[executor.submit(upload_cache, track)] = track
            # process any completed futures
            for future in done:
                track = future_to_track[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print("%r generated an exception: %s" % (track, exc))

                del future_to_track[future]

    return "Cacheing playlist in the background..."

以上所有工作,但不是作为后台进程。当 cache() 被调用时,应用程序挂起,只有在进程完成后才会恢复。

我运行它与gunicorn -c gconfig.py app:app -w 4 --threads 12

我做错了什么?


EDIT: If simplify things in order do debug this, and write simply:

# 1st background process
def build_cache():
    # only ONE JOB
    tracks_and_features = spotify.query_tracks()  # <- not a generator
    while True:
        print(next(tracks_and_features))


# background cache
@app.route("/cache")
def cache():
    executor.submit(build_cache)
    return "Cacheing playlist in the background..."

然后进程 运行 在后台运行。

但是,如果我添加另一份工作:

def build_cache():

    tracks_and_features = spotify.query_tracks()
    while True:
        # SQLalchemy db
        Upload_Tracks(filtered_dataset=next(tracks_and_features))

后台又不能用了。

简而言之:

后台只有在我一次 运行 一个作业时才有效(这首先是使用队列的想法背后的限制).

好像问题出在将后台进程绑定到 SQLalchemy 上,不知道。完全迷失在这里。

仍然不确定你的意思

I mean the app waits for all requests to be made at login and only then goes to homepage. It should go right away to homepage with requests being made at background

这里有几个问题:

  • 您的队列对于进程是全局的每个 gunicorn worker 只有一个队列;您可能希望将队列绑定到您的请求,以便多个请求不会在内存中共享同一个队列。考虑使用 context locals
  • 如果 UploadTracks 正在写入数据库,则 table 上可能有锁。检查您的索引并检查数据库中的锁等待。
  • SQLAlchemy 可能配置了一个小 connection pool,第二个 UploadTracks 正在等待第一个 return 它的连接。

在您的第一个示例中,端点在 returning 之前等待所有 futures 完成,而在您的第二个示例中,端点 return 在向执行程序提交任务后立即完成。如果你想让flask在任务还在后台线程运行时快速响应,去掉with concurrent.futures.ThreadPoolExecutor() as executor:并在模块顶部构建一个全局线程池。

使用 with,上下文管理器在退出之前等待所有提交的任务,但我不确定这是否是您的主要问题。

尝试在路由处理程序之外创建 ThreadPoolExecutor

import time
from concurrent.futures import ThreadPoolExecutor

from flask import Flask


def foo(*args):
    while True:
        print("foo", args)
        time.sleep(10)


app = Flask(__name__)

executor = ThreadPoolExecutor()


@app.route("/cache")
def cache():
    executor.submit(foo, "1")
    executor.submit(foo, "2")
    return "in cache"