如何将 tasks.py 之外定义的函数转换为 Celery 任务,并让 Celery worker 在 Django 中处理它

How to convert a function defined outside the tasks.py to a Celery task and make the Celery worker to process it in Django

我正在尝试在我的 Django 项目中使用 Celery 说“MySite”。我的目录结构如下

mysite
  - applications
    - api
      -tasks.py
      -urls.py
      -apps.py
      -__init__.py
      -v0
        -__init__.py
        -urls.py
        -views.py
        -utils.py 
      -v1
        -__init__.py
        -urls.py
        -views.py
        -utils.py
    - backbone
      -tasks.py
      -urls.py
      -apps.py
      -views.py
      -models.py
      -__init__.py
  -mysite
    - settings.py

我有两个应用程序 apibackbone。我在 api 目录中创建了 tasks.py ,其中定义了各种 Celery 任务。我在 utils.pyv0v1 目录中定义了各种函数。上述结构仅代表实际项目。该项目更大、更复杂。

我在 v0/utils.py

中说 func1
def func1:
    #code

如何在不移动 位置的情况下将 utils.py 中的 func1 转换为 Celery 任务func1 或将其导入 tasks.py ?我只想修改 func1 使其表现得像 celery 任务,我应该能够应用 retry 选项等

在您的 tasks.py 模块 (my_celery_task_function()) 中创建一个新任务,然后从那里调用您的实用程序函数,(util_function_1()util_function_2()

尝试这样的事情

#v0/utils.py
def util_function_1():
    # your code
    pass

# v1/utils.py
def util_function_2():
    # your code
    pass


#api/tasks.py
@app.task
def my_celery_task_function():
    from v1.utils import util_function_1
    from v2.utils import util_function_2
    util_function_1()
    util_function_2()

UPDATE-1
这是例子。您可以将 任何有效参数 添加到 @app.tasks() 装饰器。

# v0/utils.py
def func1():
    # your code
    pass


# api/tasks.py
@app.task(bind=True, default_retry_delay=30 * 60)
def my_celery_task_function_1(self, *args, **kwargs):
    from v1.utils import func1
    func1()