芹菜任务没有注册?
Celery task is not getting registered?
我正在使用 Django + Celery 来动态安排任务。调度任务的时间将从用户数据库查询或由用户动态提供。
我正在尝试将参数传递给 start_task 函数,但它破坏了代码,因为调度程序无法将其添加到其条目中。它在没有附加参数 (schedule_start_time) 的情况下工作正常。
我觉得跟信号有关!!
def clean_time(time):
time = time.get("created_at")
hh = time.strftime("%H")
mm = time.strftime("%M")
ss = time.strftime("%S")
dd = time.strftime("%d")
mnth = time.strftime("%m")
yy = time.strftime("%Y")
return hh, mm, ss, dd, mnth, yy
@app.on_after_configure.connect
def start_time(schedule_start_date, **kwargs):
hh, mm, ss, dd, mnth, yy = clean_time(schedule_start_date)
app.add_periodic_task(crontab((hour='{}'.format(hh), minute='{}'.format(mm), day_of_month='{}'.format(dd), month_of_year='{}'.format(mnth)), test.s("hello"))
@app.task
def test(arg):
print(arg)
H3ll0.
您应该使用 django-celery-beat 来管理 django 中的定期 celery 任务。
这是我创建的用于测试 django-celery-beat 的存储库。
记住你应该 运行 文档中描述了 worker 和节拍。
我正在使用 Django + Celery 来动态安排任务。调度任务的时间将从用户数据库查询或由用户动态提供。 我正在尝试将参数传递给 start_task 函数,但它破坏了代码,因为调度程序无法将其添加到其条目中。它在没有附加参数 (schedule_start_time) 的情况下工作正常。 我觉得跟信号有关!!
def clean_time(time):
time = time.get("created_at")
hh = time.strftime("%H")
mm = time.strftime("%M")
ss = time.strftime("%S")
dd = time.strftime("%d")
mnth = time.strftime("%m")
yy = time.strftime("%Y")
return hh, mm, ss, dd, mnth, yy
@app.on_after_configure.connect
def start_time(schedule_start_date, **kwargs):
hh, mm, ss, dd, mnth, yy = clean_time(schedule_start_date)
app.add_periodic_task(crontab((hour='{}'.format(hh), minute='{}'.format(mm), day_of_month='{}'.format(dd), month_of_year='{}'.format(mnth)), test.s("hello"))
@app.task
def test(arg):
print(arg)
H3ll0.
您应该使用 django-celery-beat 来管理 django 中的定期 celery 任务。
这是我创建的用于测试 django-celery-beat 的存储库。
记住你应该 运行 文档中描述了 worker 和节拍。