芹菜中的异常处理?
Exception handling in Celery?
我有一组任务,每个任务都向标准 oauth API 端点发出请求,并且依赖于 bearer_token。如果 bearer_token 已过期,任务将在响应处理期间引发异常。还有一个 refresh_bearer_token
任务,处理令牌过期后的更新。
这是一个伪代码:
from proj.celery import app
bearer_token = '1234'
class OauthError(Exception):
pass
@app.task
def api_request():
response = request(bearer_token, ...)
if response.bearer_token_expired:
raise OauthError('oauth')
@app.task
def refresh_bearer_token():
...
如何安排 refresh_bearer_token
任务在引发 OauthError
时执行?
我能找到的唯一解决方案是像这样使用 link_error
kwarg:
@app.task
def error_callback(uuid):
exception_msg = AsyncResult(uuid).get(propagate=False, disable_sync_subtasks=False)
if exception_msg = 'oauth':
refresh_bearer_token.delay()
else:
raise
api_request.apply_async(link_error=error_callback.s())
但这似乎不是最佳选择,原因有几个,最明显的是因为它在另一个同步子任务中生成了一个同步子任务,strongly discourged 在文档中。
Is there a more pythonic way of exception catching in celery?
例如:
def catch(func_that_requires_oauth):
try:
func_that_requires_oauth.delay()
except OauthError:
refresh_bearer_token.delay() | func_that_requires_oauth.delay()
只是抛出一些想法。您可以创建一个基础任务,如果 refresh_bearer_token 任务在调用时已获取锁,它会等待或重试。当它失败时,它会启动 refresh_bearer_token 任务并自行退出。
重试会将 运行 任务的副本放到队列的后面
现在您必须实施某种锁定,如果已获得锁定,refresh_bearer_token 什么都不做,因为另一个任务应该更新它。您还需要为此 "lock" 添加 TTL,以防止 refresh_bearer_token
任务失败时出现某些情况
@app.task
def refresh_bearer_token():
try:
with aquire_lock(timeout=0):
refresh_token()
except TimeoutError:
pass
class RequiresOauthTask(app.Task):
abstract = True
def __call__(self, *args, **kwargs):
if lock_is_present():
self.retry() # or wait?
return super().__call__(*args, **kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, OauthError):
refresh_bearer_token.delay()
self.retry()
super().on_failure(exc, task_id, args, kwargs, einfo)
@app.task(base=RequiresOauthTask)
def my_task():
pass
我有一组任务,每个任务都向标准 oauth API 端点发出请求,并且依赖于 bearer_token。如果 bearer_token 已过期,任务将在响应处理期间引发异常。还有一个 refresh_bearer_token
任务,处理令牌过期后的更新。
这是一个伪代码:
from proj.celery import app
bearer_token = '1234'
class OauthError(Exception):
pass
@app.task
def api_request():
response = request(bearer_token, ...)
if response.bearer_token_expired:
raise OauthError('oauth')
@app.task
def refresh_bearer_token():
...
如何安排 refresh_bearer_token
任务在引发 OauthError
时执行?
我能找到的唯一解决方案是像这样使用 link_error
kwarg:
@app.task
def error_callback(uuid):
exception_msg = AsyncResult(uuid).get(propagate=False, disable_sync_subtasks=False)
if exception_msg = 'oauth':
refresh_bearer_token.delay()
else:
raise
api_request.apply_async(link_error=error_callback.s())
但这似乎不是最佳选择,原因有几个,最明显的是因为它在另一个同步子任务中生成了一个同步子任务,strongly discourged 在文档中。
Is there a more pythonic way of exception catching in celery?
例如:
def catch(func_that_requires_oauth):
try:
func_that_requires_oauth.delay()
except OauthError:
refresh_bearer_token.delay() | func_that_requires_oauth.delay()
只是抛出一些想法。您可以创建一个基础任务,如果 refresh_bearer_token 任务在调用时已获取锁,它会等待或重试。当它失败时,它会启动 refresh_bearer_token 任务并自行退出。
重试会将 运行 任务的副本放到队列的后面
现在您必须实施某种锁定,如果已获得锁定,refresh_bearer_token 什么都不做,因为另一个任务应该更新它。您还需要为此 "lock" 添加 TTL,以防止 refresh_bearer_token
任务失败时出现某些情况
@app.task
def refresh_bearer_token():
try:
with aquire_lock(timeout=0):
refresh_token()
except TimeoutError:
pass
class RequiresOauthTask(app.Task):
abstract = True
def __call__(self, *args, **kwargs):
if lock_is_present():
self.retry() # or wait?
return super().__call__(*args, **kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, OauthError):
refresh_bearer_token.delay()
self.retry()
super().on_failure(exc, task_id, args, kwargs, einfo)
@app.task(base=RequiresOauthTask)
def my_task():
pass