在获取 Celery 任务的结果时遇到问题
Having issues getting back results for Celery tasks
我搭建了一个 Celery 系统:为每个任务动态创建一个云 VM 实例,任务完成后该 VM 实例会自行删除。为此,我会创建一个新队列,并将新建实例上的 worker 分配给该队列,这样就可以把任务发送到特定的实例。在只有 1 到 2 个并发任务时一切正常,但如果并发任务更多,Celery 的 result.get 方法就会无限期地等待。我使用的 Celery 版本是 4.2.1 (windowlicker)。
这是我的 Celery config.py 文件:
"""A module that configures Celery"""
from os import environ
from utils.loggerFactory import make_logger

LOGGER = make_logger(__name__)
LOGGER.info('Celery initalizing...')

# Resolve the Redis host from the RedisDNS environment variable, falling back
# to localhost when the variable is not set at all.
REDIS_BACKEND_HOST = environ.get('RedisDNS')
if REDIS_BACKEND_HOST is None:
    LOGGER.warning('Couldn\'t fetch RedisDNS, defaulting to localhost...')
    REDIS_BACKEND_HOST = 'localhost'
else:
    LOGGER.info('Set Redis instance hostname to {}'.format(REDIS_BACKEND_HOST))

# The same Redis host is used both as the message broker and as the result
# backend.
# NOTE(review): BROKER_URL and CELERY_TRACK_STARTED look like Celery's
# old-style uppercase keys, while CELERY_TASK_CREATE_MISSING_QUEUES and
# CELERY_TASK_IGNORE_RESULT look like new-style keys under a CELERY namespace.
# Depending on how this module is loaded, some of them may be ignored --
# confirm against the Celery configuration documentation.
BROKER_URL = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_RESULT_BACKEND = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_TRACK_STARTED = True
CELERY_TASK_CREATE_MISSING_QUEUES = True
CELERY_TASK_IGNORE_RESULT = False

LOGGER.info('Init complete')
执行任务的主要代码如下:
if ENV != 'development':
# Create a new compute instance
try:
created_instance_name = create_worker_compute_instance(
task_info['computeInstanceType'])
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t create compute instance: {}'.format(request_id, str(exc)))
try:
LOGGER.info(
'[{}] Saving exception into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response(
'Error: Couldn\'t create compute instance: {}'.format(str(exc)), None, 500)
result['code'] = 500
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
report_exception(ENV, exc)
return
celery_queue_name = 'queue-{}'.format(created_instance_name)
LOGGER.info('[{}] Adding new Celery queue {}'.format(
request_id, celery_queue_name))
try:
APP.control.add_consumer(celery_queue_name, reply=False, destination=[
'worker1@{}'.format(created_instance_name)])
except Exception as exc:
LOGGER.error('[{}] Couldn\'t add queue {}: {}'.format(
request_id, celery_queue_name, str(exc)))
try:
LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response(
'Error: Couldn\'t add queue {}: {}'.format(celery_queue_name, str(exc)), None, 500)
result['code'] = 500
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
report_exception(ENV, exc)
return
LOGGER.info('[{}] Queue added'.format(request_id))
else:
celery_queue_name = 'celery'
# Execute the task
LOGGER.info('[{}] Executing task...'.format(request_id))
async_result = run_task.apply_async(
args=(data, task_info, SERVICE_ACCOUNT_FILE_DATA), queue=celery_queue_name)
LOGGER.info('[{}] Waiting for task to complete...'.format(request_id))
task_result = None
try:
task_result = async_result.get()
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t execute task {}: {}'.format(request_id, task, str(exc)))
try:
LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response('Error: Couldn\'t execute task {}: {}'.format(
task, str(exc)), None, 500)
result['code'] = 500
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
report_exception(ENV, exc)
return
LOGGER.info('[{}] Task executed successfully'.format(request_id))
task_result['message'] = 'Ok, task {} executed successfully'.format(
task)
try:
LOGGER.info('[{}] Saving result into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response(
None, task_result, 0)
result['code'] = 200
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save result into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
return
编辑:
下面是系统概览的小图:
好的,问题似乎出在 APP.control.add_consumer(celery_queue_name, reply=False, destination=['worker1@{}'.format(created_instance_name)]) 上。即使该调用成功返回,worker 也没有被添加到队列中。
我最终通过在 worker 的启动命令中使用 -Q 参数指定队列名称解决了这个问题。
我搭建了一个 Celery 系统:为每个任务动态创建一个云 VM 实例,任务完成后该 VM 实例会自行删除。为此,我会创建一个新队列,并将新建实例上的 worker 分配给该队列,这样就可以把任务发送到特定的实例。在只有 1 到 2 个并发任务时一切正常,但如果并发任务更多,Celery 的 result.get 方法就会无限期地等待。我使用的 Celery 版本是 4.2.1 (windowlicker)。
这是我的 Celery config.py 文件:
"""A module that configures Celery"""
from os import environ
from utils.loggerFactory import make_logger

LOGGER = make_logger(__name__)
LOGGER.info('Celery initalizing...')

# Resolve the Redis host from the RedisDNS environment variable, falling back
# to localhost when the variable is not set at all.
REDIS_BACKEND_HOST = environ.get('RedisDNS')
if REDIS_BACKEND_HOST is None:
    LOGGER.warning('Couldn\'t fetch RedisDNS, defaulting to localhost...')
    REDIS_BACKEND_HOST = 'localhost'
else:
    LOGGER.info('Set Redis instance hostname to {}'.format(REDIS_BACKEND_HOST))

# The same Redis host is used both as the message broker and as the result
# backend.
# NOTE(review): BROKER_URL and CELERY_TRACK_STARTED look like Celery's
# old-style uppercase keys, while CELERY_TASK_CREATE_MISSING_QUEUES and
# CELERY_TASK_IGNORE_RESULT look like new-style keys under a CELERY namespace.
# Depending on how this module is loaded, some of them may be ignored --
# confirm against the Celery configuration documentation.
BROKER_URL = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_RESULT_BACKEND = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_TRACK_STARTED = True
CELERY_TASK_CREATE_MISSING_QUEUES = True
CELERY_TASK_IGNORE_RESULT = False

LOGGER.info('Init complete')
执行任务的主要代码如下:
if ENV != 'development':
# Create a new compute instance
try:
created_instance_name = create_worker_compute_instance(
task_info['computeInstanceType'])
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t create compute instance: {}'.format(request_id, str(exc)))
try:
LOGGER.info(
'[{}] Saving exception into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response(
'Error: Couldn\'t create compute instance: {}'.format(str(exc)), None, 500)
result['code'] = 500
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
report_exception(ENV, exc)
return
celery_queue_name = 'queue-{}'.format(created_instance_name)
LOGGER.info('[{}] Adding new Celery queue {}'.format(
request_id, celery_queue_name))
try:
APP.control.add_consumer(celery_queue_name, reply=False, destination=[
'worker1@{}'.format(created_instance_name)])
except Exception as exc:
LOGGER.error('[{}] Couldn\'t add queue {}: {}'.format(
request_id, celery_queue_name, str(exc)))
try:
LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response(
'Error: Couldn\'t add queue {}: {}'.format(celery_queue_name, str(exc)), None, 500)
result['code'] = 500
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
report_exception(ENV, exc)
return
LOGGER.info('[{}] Queue added'.format(request_id))
else:
celery_queue_name = 'celery'
# Execute the task
LOGGER.info('[{}] Executing task...'.format(request_id))
async_result = run_task.apply_async(
args=(data, task_info, SERVICE_ACCOUNT_FILE_DATA), queue=celery_queue_name)
LOGGER.info('[{}] Waiting for task to complete...'.format(request_id))
task_result = None
try:
task_result = async_result.get()
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t execute task {}: {}'.format(request_id, task, str(exc)))
try:
LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response('Error: Couldn\'t execute task {}: {}'.format(
task, str(exc)), None, 500)
result['code'] = 500
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
report_exception(ENV, exc)
return
LOGGER.info('[{}] Task executed successfully'.format(request_id))
task_result['message'] = 'Ok, task {} executed successfully'.format(
task)
try:
LOGGER.info('[{}] Saving result into redis...'.format(request_id))
result = json.loads(REDIS_CLIENT.get(request_id))
result['response'] = generate_response(
None, task_result, 0)
result['code'] = 200
result['canDel'] = True
REDIS_CLIENT.set(request_id, json.dumps(result))
except Exception as exc:
LOGGER.error(
'[{}] Couldn\'t save result into redis: {}'.format(request_id, str(exc)))
report_exception(ENV, exc)
return
编辑:
下面是系统概览的小图:
好的,问题似乎出在 APP.control.add_consumer(celery_queue_name, reply=False, destination=['worker1@{}'.format(created_instance_name)]) 上。即使该调用成功返回,worker 也没有被添加到队列中。
我最终通过在 worker 的启动命令中使用 -Q 参数指定队列名称解决了这个问题。