如果任务失败,我如何召回 "task"?
How can I recall "task" if task is failed?
有函数 make_request
向 API 发出 http 请求。而且我每秒不能发出超过 3 个请求。
我做了类似的事情
coroutines = [make_request(...) for ... in ...]
tasks = []
for coroutine in coroutines:
tasks.append(asyncio.create_task(coroutine))
await asyncio.sleep(1 / 3)
responses = asyncio.gather(*tasks)
而且我每小时不能发出超过 1000 个请求。 (可能,我可以延迟 3600 / 1000
。)如果互联网连接丢失怎么办?我应该尝试再次提出请求。
我可以这样包装 make_request
:
async def try_make_request(...):
while True:
try:
return await make_request(...)
exception APIError as err:
logging.exception(...)
在这种情况下,每秒可能会发出超过 3 个请求。
我找到了 解决方案,但我不确定这是不是最好的解决方案
pending = []
coroutines = [...]
for coroutine in coroutines:
pending.append(asyncio.create_task(coroutine))
await asyncio.sleep(1 / 3)
result = []
while True:
finished, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_EXCEPTION
)
for task in finished:
exc = task.exception()
if isinstance(exc, APIError) and exc.code == 29:
pending.add(task.get_coro()) # since python 3.8
if exc:
logging.exception(...)
else:
result.append(task.result())
if not pending:
break
如果我对要求的理解正确,您发起 连接的间隔不得超过 3.6 秒。实现这一目标的一种方法是设置一个计时器,每次启动连接时该计时器都会重置,并在 3.6 秒后到期,从而允许启动下一个连接。例如:
class Limiter:
def __init__(self, delay):
self.delay = delay
self._ready = asyncio.Event()
self._ready.set()
async def wait(self):
# wait in a loop because if there are multiple waiters,
# the wakeup can be spurious
while not self._ready.is_set():
await self._ready.wait()
# We got the slot and can proceed with the download.
# Before doing so, clear the ready flag to prevent other waiters
# from commencing downloads until the delay elapses again.
self._ready.clear()
asyncio.get_event_loop().call_later(self.delay, self._ready.set)
那么 try_make_request
可能是这样的:
async def try_make_request(limiter, ...):
while True:
await limiter.wait()
try:
return await make_request(...)
exception APIError as err:
logging.exception(...)
...并且主协程可以并行等待 try_make_request
s:
limiter = Limiter(3600/1000)
responses = await asyncio.gather(*[try_make_request(limiter, ...) for ... in ...])
有函数 make_request
向 API 发出 http 请求。而且我每秒不能发出超过 3 个请求。
我做了类似的事情
coroutines = [make_request(...) for ... in ...]
tasks = []
for coroutine in coroutines:
tasks.append(asyncio.create_task(coroutine))
await asyncio.sleep(1 / 3)
responses = asyncio.gather(*tasks)
而且我每小时不能发出超过 1000 个请求。 (可能,我可以延迟 3600 / 1000
。)如果互联网连接丢失怎么办?我应该尝试再次提出请求。
我可以这样包装 make_request
:
async def try_make_request(...):
while True:
try:
return await make_request(...)
exception APIError as err:
logging.exception(...)
在这种情况下,每秒可能会发出超过 3 个请求。
我找到了
pending = []
coroutines = [...]
for coroutine in coroutines:
pending.append(asyncio.create_task(coroutine))
await asyncio.sleep(1 / 3)
result = []
while True:
finished, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_EXCEPTION
)
for task in finished:
exc = task.exception()
if isinstance(exc, APIError) and exc.code == 29:
pending.add(task.get_coro()) # since python 3.8
if exc:
logging.exception(...)
else:
result.append(task.result())
if not pending:
break
如果我对要求的理解正确,您发起 连接的间隔不得超过 3.6 秒。实现这一目标的一种方法是设置一个计时器,每次启动连接时该计时器都会重置,并在 3.6 秒后到期,从而允许启动下一个连接。例如:
class Limiter:
def __init__(self, delay):
self.delay = delay
self._ready = asyncio.Event()
self._ready.set()
async def wait(self):
# wait in a loop because if there are multiple waiters,
# the wakeup can be spurious
while not self._ready.is_set():
await self._ready.wait()
# We got the slot and can proceed with the download.
# Before doing so, clear the ready flag to prevent other waiters
# from commencing downloads until the delay elapses again.
self._ready.clear()
asyncio.get_event_loop().call_later(self.delay, self._ready.set)
那么 try_make_request
可能是这样的:
async def try_make_request(limiter, ...):
while True:
await limiter.wait()
try:
return await make_request(...)
exception APIError as err:
logging.exception(...)
...并且主协程可以并行等待 try_make_request
s:
limiter = Limiter(3600/1000)
responses = await asyncio.gather(*[try_make_request(limiter, ...) for ... in ...])