Django 周期性任务芹菜
Django periodic task celery
我是 Django 和 Celery 的新手。
请帮助我,我不明白它是如何工作的。我希望每 1 分钟在我的控制台中看到 "Hello world"。
tasks.py
from celery import Celery
from celery.schedules import crontab
from celery.task import periodic_task
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@periodic_task(run_every=(crontab(hour="*", minute=1)), ignore_result=True)
def hello_world():
return "Hello World"
celery.py
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "test.settings.local")
app = Celery('test')
app.config_from_object('celeryconfig')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
init.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
celeryconfig.py
broker_url = 'redis://localhost:6379'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
这是一个简单的芹菜设置和代码,但不起作用=\
celery -A tasks worker -B
什么也没发生。告诉我我做错了什么?谢谢!
您需要在 celeryconfig.py
中设置 beat_schedule
from celery.schedules import crontab
beat_schedule = {
'send_each_minute': {
'task': 'your.module.path.function',
'schedule': crontab(),
'args': (),
},
}
我是 Django 和 Celery 的新手。 请帮助我,我不明白它是如何工作的。我希望每 1 分钟在我的控制台中看到 "Hello world"。
tasks.py
from celery import Celery
from celery.schedules import crontab
from celery.task import periodic_task
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@periodic_task(run_every=(crontab(hour="*", minute=1)), ignore_result=True)
def hello_world():
return "Hello World"
celery.py
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "test.settings.local")
app = Celery('test')
app.config_from_object('celeryconfig')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
init.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
celeryconfig.py
broker_url = 'redis://localhost:6379'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
这是一个简单的芹菜设置和代码,但不起作用=\
celery -A tasks worker -B
什么也没发生。告诉我我做错了什么?谢谢!
您需要在 celeryconfig.py
中设置 beat_schedulefrom celery.schedules import crontab
beat_schedule = {
'send_each_minute': {
'task': 'your.module.path.function',
'schedule': crontab(),
'args': (),
},
}