Having issues getting back results for Celery tasks

So, I have set up a Celery system where I dynamically create a cloud VM instance for each task; once the task completes, the VM instance deletes itself. To make this work, I create a new queue and assign the worker on the newly created instance to that queue, so that tasks can be sent to that specific instance. This works fine for 1 or 2 concurrent tasks, but if I try any more than that, Celery's result.get method just waits indefinitely. I am using Celery version 4.2.1 (windowlicker).
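
While get() blocks, the result backend can be queried directly to see whether the task was ever picked up; a minimal diagnostic sketch (task_id is whatever apply_async returned, APP is the Celery app used below):

from celery.result import AsyncResult

res = AsyncResult(task_id, app=APP)  # task_id comes from apply_async(...).id
print(res.state)  # stays PENDING if no worker ever consumed the task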

Here is my Celery config.py file:

"""A module that configures Celery"""
from os import environ

from utils.loggerFactory import make_logger

LOGGER = make_logger(__name__)

LOGGER.info('Celery initializing...')
REDIS_BACKEND_HOST = None
if 'RedisDNS' in environ:
    REDIS_BACKEND_HOST = environ['RedisDNS']
    LOGGER.info('Set Redis instance hostname to {}'.format(REDIS_BACKEND_HOST))
else:
    LOGGER.warning('Couldn\'t fetch RedisDNS, defaulting to localhost...')
    REDIS_BACKEND_HOST = 'localhost'

BROKER_URL = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_RESULT_BACKEND = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_TRACK_STARTED = True
CELERY_TASK_CREATE_MISSING_QUEUES = True
CELERY_TASK_IGNORE_RESULT = False
LOGGER.info('Init complete')
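
For reference, the Celery app picks this module up via config_from_object; a sketch (the app name 'tasks' is a placeholder for my actual module):

from celery import Celery

APP = Celery('tasks')             # app/module name is a placeholder
APP.config_from_object('config')  # loads the config.py shown above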

The main code that executes a task is as follows:

    if ENV != 'development':
        # Create a new compute instance
        try:
            created_instance_name = create_worker_compute_instance(
                task_info['computeInstanceType'])
        except Exception as exc:
            LOGGER.error(
                '[{}] Couldn\'t create compute instance: {}'.format(request_id, str(exc)))
            try:
                LOGGER.info(
                    '[{}] Saving exception into redis...'.format(request_id))
                result = json.loads(REDIS_CLIENT.get(request_id))
                result['response'] = generate_response(
                    'Error: Couldn\'t create compute instance: {}'.format(str(exc)), None, 500)
                result['code'] = 500
                result['canDel'] = True
                REDIS_CLIENT.set(request_id, json.dumps(result))
            except Exception as exc:
                LOGGER.error(
                    '[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
                report_exception(ENV, exc)
            report_exception(ENV, exc)
            return

        celery_queue_name = 'queue-{}'.format(created_instance_name)

        LOGGER.info('[{}] Adding new Celery queue {}'.format(
            request_id, celery_queue_name))
        try:
            APP.control.add_consumer(celery_queue_name, reply=False, destination=[
                'worker1@{}'.format(created_instance_name)])
        except Exception as exc:
            LOGGER.error('[{}] Couldn\'t add queue {}: {}'.format(
                request_id, celery_queue_name, str(exc)))
            try:
                LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
                result = json.loads(REDIS_CLIENT.get(request_id))
                result['response'] = generate_response(
                    'Error: Couldn\'t add queue {}: {}'.format(celery_queue_name, str(exc)), None, 500)
                result['code'] = 500
                result['canDel'] = True
                REDIS_CLIENT.set(request_id, json.dumps(result))
            except Exception as exc:
                LOGGER.error(
                    '[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
                report_exception(ENV, exc)
            report_exception(ENV, exc)
            return
        LOGGER.info('[{}] Queue added'.format(request_id))
    else:
        celery_queue_name = 'celery'

    # Execute the task
    LOGGER.info('[{}] Executing task...'.format(request_id))
    async_result = run_task.apply_async(
        args=(data, task_info, SERVICE_ACCOUNT_FILE_DATA), queue=celery_queue_name)

    LOGGER.info('[{}] Waiting for task to complete...'.format(request_id))
    task_result = None
    try:
        task_result = async_result.get()
    except Exception as exc:
        LOGGER.error(
            '[{}] Couldn\'t execute task {}: {}'.format(request_id, task, str(exc)))
        try:
            LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
            result = json.loads(REDIS_CLIENT.get(request_id))
            result['response'] = generate_response('Error: Couldn\'t execute task {}: {}'.format(
                task, str(exc)), None, 500)
            result['code'] = 500
            result['canDel'] = True
            REDIS_CLIENT.set(request_id, json.dumps(result))
        except Exception as exc:
            LOGGER.error(
                '[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
            report_exception(ENV, exc)
        report_exception(ENV, exc)
        return

    LOGGER.info('[{}] Task executed successfully'.format(request_id))
    task_result['message'] = 'Ok, task {} executed successfully'.format(
        task)
    try:
        LOGGER.info('[{}] Saving result into redis...'.format(request_id))
        result = json.loads(REDIS_CLIENT.get(request_id))
        result['response'] = generate_response(
            None, task_result, 0)
        result['code'] = 200
        result['canDel'] = True
        REDIS_CLIENT.set(request_id, json.dumps(result))
    except Exception as exc:
        LOGGER.error(
            '[{}] Couldn\'t save result into redis: {}'.format(request_id, str(exc)))
        report_exception(ENV, exc)
        return
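
For what it's worth, passing a timeout to get() at least makes the hang fail fast instead of blocking forever; a minimal sketch (the 600-second limit is arbitrary):

try:
    task_result = async_result.get(timeout=600)  # raises celery.exceptions.TimeoutError instead of waiting forever
except Exception as exc:
    LOGGER.error('[{}] Task failed or timed out: {}'.format(request_id, str(exc)))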

Edit:

Here is a small diagram of the system overview:

[system overview diagram]

OK, the problem seems to be with APP.control.add_consumer(celery_queue_name, reply=False, destination=['worker1@{}'.format(created_instance_name)]). Even though the command returns successfully, the worker still doesn't get added to the queue.
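
With reply=False there is no way to tell whether any worker actually handled the broadcast; requesting replies shows whether the target worker acknowledged it. A sketch using the same names as above:

replies = APP.control.add_consumer(
    celery_queue_name,
    reply=True,  # wait for acknowledgements from the targeted workers
    destination=['worker1@{}'.format(created_instance_name)])
LOGGER.info('add_consumer replies: {}'.format(replies))  # an empty list means no worker received the message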

I managed to fix this by including the queue name in the worker start command via the -Q argument.
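
Concretely, the worker start command on each new instance now looks roughly like this (the app module 'tasks' and the instance name are placeholders for my actual values):

celery -A tasks worker -Q queue-<instance-name> -n worker1@<instance-name>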