芹菜任务没有注册?

Celery task is not getting registered?

我正在使用 Django + Celery 来动态安排任务。调度任务的时间将从用户数据库查询或由用户动态提供。 我正在尝试将参数传递给 start_task 函数,但它破坏了代码,因为调度程序无法将其添加到其条目中。它在没有附加参数 (schedule_start_time) 的情况下工作正常。 我觉得跟信号有关!!

def clean_time(time):
    time = time.get("created_at")
    hh = time.strftime("%H")
    mm = time.strftime("%M")
    ss = time.strftime("%S")
    dd = time.strftime("%d")
    mnth = time.strftime("%m")
    yy = time.strftime("%Y")
    return hh, mm, ss, dd, mnth, yy


@app.on_after_configure.connect
def start_time(schedule_start_date, **kwargs):
    hh, mm, ss, dd, mnth, yy = clean_time(schedule_start_date)
    app.add_periodic_task(crontab((hour='{}'.format(hh), minute='{}'.format(mm), day_of_month='{}'.format(dd), month_of_year='{}'.format(mnth)), test.s("hello"))


@app.task
def test(arg):
    print(arg)

H3ll0.

您应该使用 django-celery-beat 来管理 django 中的定期 celery 任务。

这是我创建的用于测试 django-celery-beat 的存储库。

Periodicall Repo

记住你应该 运行 文档中描述了 worker 和节拍。

django-celery-beat-documentation