python 和 celery:覆盖用于 gevent 池的硬超时
python and celery: override hard timeouts for use with gevent pool
有没有办法覆盖 celery 中的硬超时?我知道我可以通过失败作业的任务继承来做到这一点。
class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
raise KeyError()
但是硬超时并不是失败的工作。我之所以要这样做是因为软超时不适用于 gevent 池,只能用于硬超时。
我花了一点时间才弄明白,但这就是你的做法。继承自Request,再继承自Task。从 MyTask 调用 Request(on_failure 方法所在的位置)。
class MyRequest(Request):
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
def run_time_job():
a = random.randrange(0, 20)
print('sleeping for', a)
time.sleep(a)
@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
results = None
try:
run_time_job()
results = x + y
except SoftTimeLimitExceeded:
print('time limit exceeded')
redis_db.sadd('failed_jobs', 'failed at {} + {}'.format(x, y))
except TimeLimitExceeded:
raise KeyError()
return results
有没有办法覆盖 celery 中的硬超时?我知道我可以通过失败作业的任务继承来做到这一点。
class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
raise KeyError()
但是硬超时并不是失败的工作。我之所以要这样做是因为软超时不适用于 gevent 池,只能用于硬超时。
我花了一点时间才弄明白,但这就是你的做法。继承自Request,再继承自Task。从 MyTask 调用 Request(on_failure 方法所在的位置)。
class MyRequest(Request):
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
def run_time_job():
a = random.randrange(0, 20)
print('sleeping for', a)
time.sleep(a)
@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
results = None
try:
run_time_job()
results = x + y
except SoftTimeLimitExceeded:
print('time limit exceeded')
redis_db.sadd('failed_jobs', 'failed at {} + {}'.format(x, y))
except TimeLimitExceeded:
raise KeyError()
return results