Celery:检查任务是否完成以发送电子邮件至
Celery: check if a task is completed to send an email to
我是 celery 的新手,总体来说 python 菜鸟。我一定是在研究过程中偶然发现了正确的解决方案,但我似乎不明白我需要为看似简单的案例场景做些什么。
我跟着following guide学习了flask+celery
我的理解:
关于如何在第一个任务完成后触发任务,我似乎明显遗漏了一些东西。我尝试使用回调,使用循环,甚至尝试使用 Celery Flower 和 Celery beat 来意识到这与我正在做的事情无关...
目标:
填写表格后,我想发送一封带附件的电子邮件(任务结果)或发送一封失败的电子邮件。不必想知道我的用户在应用程序上做什么(无 HTTP 请求)
我的代码:
class ClassWithTheTask:
def __init__(self, filename, proxies):
# do stuff until a variable results is created
self.results = 'this contains my result'
@app.route('/', methods=['GET', 'POST'])
@app.route('/index', methods=['GET', 'POST'])
def index():
form = MyForm()
if form.validate_on_submit():
# ...
# the task
my_task = task1.delay(file_path, proxies)
return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))
return render_template('index.html',
form=form)
@celery.task(bind=True)
def task1(self, filepath, proxies):
task = ClassWithTheTask(filepath, proxies)
return results
@celery.task
def send_async_email(msg):
"""Background task to send an email with Flask-Mail."""
with app.app_context():
mail.send(msg)
@app.route('/status/<task_id>/<filename>/<email>')
def taskstatus(task_id, filename, email):
task = task1.AsyncResult(task_id)
if task.state == 'PENDING':
# job did not start yet
response = {
'state': task.state,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'status': task.info.get('status', '')
}
if 'results' in task.info:
response['results'] = task.info['results']
response['untranslated'] = task.info['untranslated']
msg = Message('Task Complete for %s !' % filename,
recipients=[email])
msg.body = 'blabla'
with app.open_resource(response['results']) as fp:
msg.attach(response['results'], "text/csv", fp.read())
with app.open_resource(response['untranslated']) as fp:
msg.attach(response['untranslated'], "text/csv", fp.read())
# the big problem here is that it will send the email only if the user refreshes the page and get the 'SUCCESS' status.
send_async_email.delay(msg)
flash('task finished. sent an email.')
return redirect(url_for('index'))
else:
# something went wrong in the background job
response = {
'state': task.state,
'status': str(task.info), # this is the exception raised
}
return jsonify(response)
这似乎完全不对:
def task1(self, filepath, proxies):
task = ClassWithTheTask(filepath, proxies)
return results
代码中前面的 my_task = task1.delay(file_path, proxies)
行表明您想要 return task
但您 return results
未在任何地方定义。 (ClassWithTheTask
也未定义)。此代码会崩溃,您的任务将永远不会执行。
我不明白你的状态检查方法的目标。无论如何,您所描述的可以通过这种方式完成。
if form.validate_on_submit():
# ...
# the task
my_task = (
task1.s(file_path, proxies).set(link_error=send_error_email.s(filename, error))
| send_async_email.s()
).delay()
return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))
那么您的错误任务将如下所示。正常任务可以保持原样。
@celery.task
def send_error_email(task_id, filename, email):
task = AsyncResult(task_id)
.....
这里发生的是你使用的是链条。您告诉 Celery 运行 您的 task1
,如果成功完成则 运行 send_async_email
,如果失败 运行 send_error_email
。这应该可行,但您可能需要调整参数,将其视为伪代码。
我是 celery 的新手,总体来说 python 菜鸟。我一定是在研究过程中偶然发现了正确的解决方案,但我似乎不明白我需要为看似简单的案例场景做些什么。 我跟着following guide学习了flask+celery
我的理解:
关于如何在第一个任务完成后触发任务,我似乎明显遗漏了一些东西。我尝试使用回调,使用循环,甚至尝试使用 Celery Flower 和 Celery beat 来意识到这与我正在做的事情无关...
目标:
填写表格后,我想发送一封带附件的电子邮件(任务结果)或发送一封失败的电子邮件。不必想知道我的用户在应用程序上做什么(无 HTTP 请求)
我的代码:
class ClassWithTheTask:
def __init__(self, filename, proxies):
# do stuff until a variable results is created
self.results = 'this contains my result'
@app.route('/', methods=['GET', 'POST'])
@app.route('/index', methods=['GET', 'POST'])
def index():
form = MyForm()
if form.validate_on_submit():
# ...
# the task
my_task = task1.delay(file_path, proxies)
return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))
return render_template('index.html',
form=form)
@celery.task(bind=True)
def task1(self, filepath, proxies):
task = ClassWithTheTask(filepath, proxies)
return results
@celery.task
def send_async_email(msg):
"""Background task to send an email with Flask-Mail."""
with app.app_context():
mail.send(msg)
@app.route('/status/<task_id>/<filename>/<email>')
def taskstatus(task_id, filename, email):
task = task1.AsyncResult(task_id)
if task.state == 'PENDING':
# job did not start yet
response = {
'state': task.state,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'status': task.info.get('status', '')
}
if 'results' in task.info:
response['results'] = task.info['results']
response['untranslated'] = task.info['untranslated']
msg = Message('Task Complete for %s !' % filename,
recipients=[email])
msg.body = 'blabla'
with app.open_resource(response['results']) as fp:
msg.attach(response['results'], "text/csv", fp.read())
with app.open_resource(response['untranslated']) as fp:
msg.attach(response['untranslated'], "text/csv", fp.read())
# the big problem here is that it will send the email only if the user refreshes the page and get the 'SUCCESS' status.
send_async_email.delay(msg)
flash('task finished. sent an email.')
return redirect(url_for('index'))
else:
# something went wrong in the background job
response = {
'state': task.state,
'status': str(task.info), # this is the exception raised
}
return jsonify(response)
这似乎完全不对:
def task1(self, filepath, proxies):
task = ClassWithTheTask(filepath, proxies)
return results
代码中前面的 my_task = task1.delay(file_path, proxies)
行表明您想要 return task
但您 return results
未在任何地方定义。 (ClassWithTheTask
也未定义)。此代码会崩溃,您的任务将永远不会执行。
我不明白你的状态检查方法的目标。无论如何,您所描述的可以通过这种方式完成。
if form.validate_on_submit():
# ...
# the task
my_task = (
task1.s(file_path, proxies).set(link_error=send_error_email.s(filename, error))
| send_async_email.s()
).delay()
return redirect(url_for('taskstatus', task_id=my_task.id, filename=filename, email=form.email.data))
那么您的错误任务将如下所示。正常任务可以保持原样。
@celery.task
def send_error_email(task_id, filename, email):
task = AsyncResult(task_id)
.....
这里发生的是你使用的是链条。您告诉 Celery 运行 您的 task1
,如果成功完成则 运行 send_async_email
,如果失败 运行 send_error_email
。这应该可行,但您可能需要调整参数,将其视为伪代码。