Celery:检查任务是否完成以发送电子邮件至

Celery: check if a task is completed to send an email to

我是 celery 的新手,总体来说 python 菜鸟。我一定是在研究过程中偶然发现了正确的解决方案,但我似乎不明白我需要为看似简单的案例场景做些什么。 我跟着following guide学习了flask+celery

我的理解:

关于如何在第一个任务完成后触发任务,我似乎明显遗漏了一些东西。我尝试使用回调,使用循环,甚至尝试使用 Celery Flower 和 Celery beat 来意识到这与我正在做的事情无关...

目标:

填写表格后,我想发送一封带附件的电子邮件(任务结果)或发送一封失败的电子邮件。不必想知道我的用户在应用程序上做什么(无 HTTP 请求)

我的代码:

class ClassWithTheTask:
    def __init__(self, filename, proxies):
        # do stuff until a variable results is created
        self.results = 'this contains my result'

@app.route('/', methods=['GET', 'POST'])
@app.route('/index', methods=['GET', 'POST'])
def index():
    form = MyForm()

    if form.validate_on_submit():
        # ...
        # the task
        my_task = task1.delay(file_path, proxies)
        return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))

    return render_template('index.html',
                           form=form)

@celery.task(bind=True)
def task1(self, filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results

@celery.task
def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    with app.app_context():
        mail.send(msg)

@app.route('/status/<task_id>/<filename>/<email>')
def taskstatus(task_id, filename, email):
    task = task1.AsyncResult(task_id)

    if task.state == 'PENDING':
        # job did not start yet
        response = {
            'state': task.state,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'status': task.info.get('status', '')
        }
        if 'results' in task.info:
            response['results'] = task.info['results']
            response['untranslated'] = task.info['untranslated']

        msg = Message('Task Complete for %s !' % filename,
                      recipients=[email])

        msg.body = 'blabla'
        with app.open_resource(response['results']) as fp:
            msg.attach(response['results'], "text/csv", fp.read())
        with app.open_resource(response['untranslated']) as fp:
            msg.attach(response['untranslated'], "text/csv", fp.read())

        # the big problem here is that it will send the email only if the user refreshes the page and get the 'SUCCESS' status.

        send_async_email.delay(msg)
        flash('task finished. sent an email.')
        return redirect(url_for('index'))
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)

这似乎完全不对:

def task1(self, filepath, proxies):
    task = ClassWithTheTask(filepath, proxies)
    return results

代码中前面的 my_task = task1.delay(file_path, proxies) 行表明您想要 return task 但您 return results 未在任何地方定义。 (ClassWithTheTask 也未定义)。此代码会崩溃,您的任务将永远不会执行。

我不明白你的状态检查方法的目标。无论如何,您所描述的可以通过这种方式完成。

if form.validate_on_submit():
        # ...
        # the task
        my_task = (
                    task1.s(file_path, proxies).set(link_error=send_error_email.s(filename, error))
                    | send_async_email.s()
                  ).delay()
        return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))

那么您的错误任务将如下所示。正常任务可以保持原样。

@celery.task
def send_error_email(task_id, filename, email):
    task = AsyncResult(task_id)
    .....

这里发生的是你使用的是链条。您告诉 Celery 运行 您的 task1,如果成功完成则 运行 send_async_email,如果失败 运行 send_error_email。这应该可行,但您可能需要调整参数,将其视为伪代码。