Flask Celery 任务不起作用
Flask celery tasks not working
我配置了我的项目,参考这个答案:
How to use Flask-SQLAlchemy in a Celery task
我的 extension.py
文件:
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from config import BaseConfig
from celery import Celery
from flask_mail import Mail
from celery.schedules import crontab
class FlaskCelery(Celery):
    """Celery subclass that runs every task inside a Flask app context.

    Usage mirrors other Flask extensions: create the instance at import
    time and bind the Flask app later with ``init_app(app)``, or pass
    ``app=...`` to the constructor.
    """

    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        # Install the context-aware Task base class before any task is
        # registered, so every @celery.task definition picks it up.
        self.patch_task()
        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        """Replace ``self.Task`` with a subclass that pushes an app context."""
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    # Already inside an app/request context (e.g. eager mode).
                    return TaskBase.__call__(self, *args, **kwargs)
                # Worker process: push the bound app's context. Fail loudly
                # if init_app() was never called -- otherwise the task dies
                # with an opaque AttributeError on ``_celery.app``.
                if not hasattr(_celery, 'app'):
                    raise RuntimeError(
                        'FlaskCelery.init_app() was not called before '
                        'running tasks')
                with _celery.app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        """Bind a Flask app: remember it and load Celery config from it."""
        self.app = app
        self.config_from_object(app.config)
# Module-level extension singletons; bound to the Flask app elsewhere
# via the standard init_app() pattern.
mail = Mail()
db = SQLAlchemy()
settings = BaseConfig()
celery = FlaskCelery()  # not yet bound to an app; init_app() is called later
然后在我的 app_settings.py
中,我创建了这个应用程序:
app = Flask('app', instance_relative_config=True)
并配置 Celery:
celery.init_app(app)
我用 python manage.py run 运行 Flask 项目:
# Dev-server entry point; falls back to 127.0.0.1:5000 with DEBUG off
# when the corresponding settings keys are absent.
app.run(
debug=settings.get('DEBUG', False),
host=settings.get('HOST', '127.0.0.1'),
port=settings.get('PORT', 5000)
)
并运行 Celery:
celery -A manage.celery worker --beat -l debug
Celery 日志看起来正常:
[tasks]
. app.api.tasks.spin_file
. app.main.tasks.send_async_email
. celery.backend_cleanup
. celery.chain
...
然后在views.py
,我调用这个任务:
send_async_email.delay(*args, **kwargs)
但所有任务都被 Celery 忽略了。没有任何反应,没有错误,没有警告。没有什么。我做错了什么?
编辑:当我用这个命令启动 Celery 时:celery -A manage.celery worker --beat -l debug
我收到以下警告:
[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'.
Please make sure you give each node a unique nodename using the `-n` option.
pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
我不确定这是否对您有帮助,但每当我需要 celery 时,我都会在我的许多项目中使用此代码:
from flask import Flask, request, jsonify as jsn
from celery import Celery
# Minimal Flask app used purely to carry configuration for Celery.
app = Flask(__name__)
app.config.update(dict(
SECRET_KEY='blabla'
)
)
# Celery configuration: Redis broker, SQLite result backend via SQLAlchemy.
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'database'
app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db'
app.config['CELERY_TRACK_STARTED'] = True
app.config['CELERY_SEND_EVENTS'] = True
# Initialize Celery with the Flask app's name, then mirror all Flask
# config keys into the Celery configuration.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def do_something(data):
"""Background task: run a shell script with *data* inside the app context."""
from celery import current_task
import os
import subprocess
# Push an app context so Flask extensions (db, mail, ...) work in the worker.
with app.app_context():
# Runs some bash script with some params in the author's case
# (the actual body was elided in the original post).
然后我通过 Supervisor 运行 Celery:
#!/bin/bash
# Activate the project's virtualenv, then start a Celery worker.
# --purge drops any tasks still queued from previous runs.
cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file
当然,在我的路由中我有类似这样的代码:
# Flask route that enqueues the task and returns immediately.
@app.route('/someroute', methods=["POST"])
def someroute():
# NOTE(review): 'data' is not defined in this snippet; presumably it
# comes from the request payload -- confirm against the full source.
result = do_something.delay(data)
print result.id  # Python 2 print statement; task id can be used for polling
我配置了我的项目,参考这个答案: How to use Flask-SQLAlchemy in a Celery task
我的 extension.py
文件:
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from config import BaseConfig
from celery import Celery
from flask_mail import Mail
from celery.schedules import crontab
# Celery subclass that runs each task inside a Flask application context.
# Bind the Flask app later with init_app(app), or pass app=... at construction.
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
# Swap in the context-aware Task base before any tasks are registered.
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
# Replace self.Task with a subclass that pushes an app context when needed.
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
# Already inside an app/request context (e.g. eager mode).
return TaskBase.__call__(self, *args, **kwargs)
else:
# NOTE(review): assumes init_app() was called; otherwise
# _celery.app does not exist and this raises AttributeError
# at task run time.
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
# Bind the Flask app and load Celery settings from its config mapping.
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
# Module-level extension singletons; bound to the Flask app elsewhere
# via the standard init_app() pattern.
mail = Mail()
db = SQLAlchemy()
settings = BaseConfig()
celery = FlaskCelery()  # not yet bound to an app; init_app() is called later
然后在我的 app_settings.py
中,我创建了这个应用程序:
app = Flask('app', instance_relative_config=True)
并配置 Celery:
celery.init_app(app)
我用 python manage.py run 运行 Flask 项目:
# Dev-server entry point; falls back to 127.0.0.1:5000 with DEBUG off
# when the corresponding settings keys are absent.
app.run(
debug=settings.get('DEBUG', False),
host=settings.get('HOST', '127.0.0.1'),
port=settings.get('PORT', 5000)
)
并运行 Celery:
celery -A manage.celery worker --beat -l debug
Celery 日志看起来正常:
[tasks]
. app.api.tasks.spin_file
. app.main.tasks.send_async_email
. celery.backend_cleanup
. celery.chain
...
然后在views.py
,我调用这个任务:
send_async_email.delay(*args, **kwargs)
但所有任务都被 Celery 忽略了。没有任何反应,没有错误,没有警告。没有什么。我做错了什么?
编辑:当我用这个命令启动 Celery 时:celery -A manage.celery worker --beat -l debug
我收到以下警告:
[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'.
Please make sure you give each node a unique nodename using the `-n` option.
pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
我不确定这是否对您有帮助,但每当我需要 celery 时,我都会在我的许多项目中使用此代码:
from flask import Flask, request, jsonify as jsn
from celery import Celery
# Minimal Flask app used purely to carry configuration for Celery.
app = Flask(__name__)
app.config.update(dict(
SECRET_KEY='blabla'
)
)
# Celery configuration: Redis broker, SQLite result backend via SQLAlchemy.
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'database'
app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db'
app.config['CELERY_TRACK_STARTED'] = True
app.config['CELERY_SEND_EVENTS'] = True
# Initialize Celery with the Flask app's name, then mirror all Flask
# config keys into the Celery configuration.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def do_something(data):
"""Background task: run a shell script with *data* inside the app context."""
from celery import current_task
import os
import subprocess
# Push an app context so Flask extensions (db, mail, ...) work in the worker.
with app.app_context():
# Runs some bash script with some params in the author's case
# (the actual body was elided in the original post).
然后我通过 Supervisor 运行 Celery:
#!/bin/bash
# Activate the project's virtualenv, then start a Celery worker.
# --purge drops any tasks still queued from previous runs.
cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file
当然,在我的路由中我有类似这样的代码:
@app.route('/someroute', methods=["POST"])
# Route handler that enqueues the task and returns immediately.
def someroute():
# NOTE(review): 'data' is not defined in this snippet; presumably it
# comes from the request payload -- confirm against the full source.
result = do_something.delay(data)
print result.id  # Python 2 print statement; task id can be used for polling