路由芹菜任务
Routing celery tasks
我在尝试创建两个独立的专职工作人员时无法将任务发送到 celery。我已经阅读了文档和 this question,但它并没有改善我的情况。
我的配置如下:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
'books.tasks.resize_book_photo': {
'queue': 'media',
'routing_key': 'media',
},
}
任务在 tasks.py
文件中按以下方式定义:
import logging
import time
from celery import shared_task
from books.models import Author, Book
from books.commands import resize_book_photo as resize_book_photo_command
logger = logging.getLogger(__name__)
@shared_task
def list_test_books_per_author():
time.sleep(5)
queryset = Author.objects.all()
for author in queryset:
for book in author.testing_books:
logger.info(book.title)
@shared_task
def resize_book_photo(book_id: int):
resize_book_photo_command(Book.objects.get(id=book_id))
他们被称为 apply_async
:
list_test_books_per_author.apply_async()
resize_book_photo.apply_async((book.id,))
当我 运行 芹菜花时,我看到没有任务出现在队列中。
工作人员开始使用:
celery -A blacksheep worker -l info --autoscale=10,1 -Q media --host=media@%h
celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=default@%h
我能做的是使用 redis-cli
和 127.0.0.1:6379> LRANGE celery 1 100
命令确认它们最终位于 celery
键下(这是芹菜的默认键)。好像没有工人消费。
编辑 仔细查看 this part of documentation 后,我发现我的命名有误。将设置更改为:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
'books.tasks.resize_book_photo': {
'queue': 'media',
'routing_key': 'media',
},
}
情况改善:任务从default
队列消费,但是我想去media
队列的任务也去了default
.
EDIT2 我试图通过将其调用更改为 resize_book_photo.apply_async((book.id,), queue='media')
来明确告诉某些任务转到其他队列。任务已正确分派到适当的队列并被使用。但是,我更希望自动处理这个问题,这样我就不必在调用 apply_async
时定义队列
尝试 CELERY_TASK_ROUTES
而不是 CELERY_ROUTES
。这最近对我有用 django 集成。
解释隐藏在这条评论中:
我在尝试创建两个独立的专职工作人员时无法将任务发送到 celery。我已经阅读了文档和 this question,但它并没有改善我的情况。
我的配置如下:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
'books.tasks.resize_book_photo': {
'queue': 'media',
'routing_key': 'media',
},
}
任务在 tasks.py
文件中按以下方式定义:
import logging
import time
from celery import shared_task
from books.models import Author, Book
from books.commands import resize_book_photo as resize_book_photo_command
logger = logging.getLogger(__name__)
@shared_task
def list_test_books_per_author():
time.sleep(5)
queryset = Author.objects.all()
for author in queryset:
for book in author.testing_books:
logger.info(book.title)
@shared_task
def resize_book_photo(book_id: int):
resize_book_photo_command(Book.objects.get(id=book_id))
他们被称为 apply_async
:
list_test_books_per_author.apply_async()
resize_book_photo.apply_async((book.id,))
当我 运行 芹菜花时,我看到没有任务出现在队列中。
工作人员开始使用:
celery -A blacksheep worker -l info --autoscale=10,1 -Q media --host=media@%h
celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=default@%h
我能做的是使用 redis-cli
和 127.0.0.1:6379> LRANGE celery 1 100
命令确认它们最终位于 celery
键下(这是芹菜的默认键)。好像没有工人消费。
编辑 仔细查看 this part of documentation 后,我发现我的命名有误。将设置更改为:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
'books.tasks.resize_book_photo': {
'queue': 'media',
'routing_key': 'media',
},
}
情况改善:任务从default
队列消费,但是我想去media
队列的任务也去了default
.
EDIT2 我试图通过将其调用更改为 resize_book_photo.apply_async((book.id,), queue='media')
来明确告诉某些任务转到其他队列。任务已正确分派到适当的队列并被使用。但是,我更希望自动处理这个问题,这样我就不必在调用 apply_async
尝试 CELERY_TASK_ROUTES
而不是 CELERY_ROUTES
。这最近对我有用 django 集成。
解释隐藏在这条评论中: