python 和 celery:覆盖用于 gevent 池的硬超时

python and celery: override hard timeouts for use with gevent pool

有没有办法覆盖 celery 中的硬超时?我知道我可以通过失败作业的任务继承来做到这一点。

class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
    print('{0!r} failed: {1!r}'.format(task_id, exc))

@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
    raise KeyError()

但是硬超时并不是失败的工作。我之所以要这样做是因为软超时不适用于 gevent 池,只能用于硬超时。

我花了一点时间才弄明白,但这就是你的做法。继承自Request,再继承自Task。从 MyTask 调用 Request(on_failure 方法所在的位置)。

class MyRequest(Request):
    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )


class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))


def run_time_job():
    a = random.randrange(0, 20)
    print('sleeping for', a)
    time.sleep(a)


@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
    results = None
    try:
        run_time_job()
        results = x + y
    except SoftTimeLimitExceeded:
        print('time limit exceeded')
        redis_db.sadd('failed_jobs', 'failed at {} + {}'.format(x, y))
    except TimeLimitExceeded:
        raise KeyError()
    return results