Handling ensure_future and its missing tasks
I have a streaming application that almost continuously takes incoming data as input, sends an HTTP request using that value, and does something with the returned value.
To speed things up I'm using asyncio and the aiohttp library on Python 3.7, but given how fast the data moves, debugging has become difficult.
This is what my code looks like:
'''
Gets the final requests
'''
async def apiRequest(info, url, session, reqType, post_data=''):
    if reqType:
        async with session.post(url, data=post_data) as response:
            info['response'] = await response.text()
    else:
        async with session.get(url + post_data) as response:
            info['response'] = await response.text()
    logger.debug(info)
    return info

'''
Loops through the batches and sends it for request
'''
async def main(data, listOfData):
    tasks = []
    async with ClientSession() as session:
        for reqData in listOfData:
            try:
                task = asyncio.ensure_future(apiRequest(**reqData))
                tasks.append(task)
            except Exception as e:
                print(e)
                exc_type, exc_obj, exc_tb = sys.exc_info()
                fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print(exc_type, fname, exc_tb.tb_lineno)
        responses = await asyncio.gather(*tasks)
        return responses  # list of APIResponses

'''
Streams data in and prepares batches to send for requests
'''
async def Kconsumer(data, loop, batchsize=100):
    consumer = AIOKafkaConsumer(**KafkaConfigs)
    await consumer.start()
    dataPoints = []
    async for msg in consumer:
        try:
            sys.stdout.flush()
            consumedMsg = loads(msg.value.decode('utf-8'))
            if consumedMsg['tid']:
                dataPoints.append(loads(msg.value.decode('utf-8')))
            if len(dataPoints) == batchsize or time.time() - startTime > 5:
                '''
                #1: The task below goes and sends HTTP GET requests in bulk using aiohttp
                '''
                task = asyncio.ensure_future(getRequests(data, dataPoints))
                res = await asyncio.gather(*[task])
                if task.done():
                    outputs = []
                    '''
                    #2: Does some ETL on the returned values
                    '''
                    ids = await asyncio.gather(*[doSomething(**{'tid': x['tid'],
                                                'cid': x['cid'], 'tn': x['tn'],
                                                'id': x['id'], 'ix': x['ix'],
                                                'ac': x['ac'], 'output': to_dict(xmltodict.parse(x['response'], encoding='utf-8')),
                                                'loop': loop, 'option': 1}) for x in res[0]])
                    simplySaveDataIntoDataBase(id)  # This is where I see some missing data in the database
                    dataPoints = []
        except Exception as e:
            logger.error(e)
            logger.error(traceback.format_exc())
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            logger.error(str(exc_type) + ' ' + str(fname) + ' ' + str(exc_tb.tb_lineno))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(Kconsumer(data, loop, batchsize=100))
    loop.run_forever()
Does the ensure_future need to be awaited?

How does aiohttp handle requests that come in a little later than the others? Shouldn't it hold the whole batch back instead of forgetting about them altogether?
Yes, and your code already does that: await asyncio.gather(*tasks) awaits the provided tasks and returns their results in the same order.

Note that await asyncio.gather(*[task]) doesn't make sense, because it is equivalent to await asyncio.gather(task), which is in turn equivalent to await task. In other words, when you need the result of getRequests(data, dataPoints), you can write res = await getRequests(data, dataPoints) without the ceremony of first calling ensure_future() and then gather().
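The equivalence can be demonstrated with a minimal sketch (get_requests here is a hypothetical stand-in for your getRequests, not the real function):

```python
import asyncio

async def get_requests(n):
    # Hypothetical stand-in for getRequests(data, dataPoints):
    # pretend to do async I/O, then return some results.
    await asyncio.sleep(0)
    return list(range(n))

async def main():
    # The roundabout version from the question:
    task = asyncio.ensure_future(get_requests(3))
    res = await asyncio.gather(*[task])  # res is a one-element list
    roundabout = res[0]

    # The equivalent direct version:
    direct = await get_requests(3)

    assert roundabout == direct  # same result, no ensure_future/gather needed
    return direct

result = asyncio.run(main())
```

Note that dropping the wrapper also removes the need for the res[0] indexing in the consumer loop.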
In fact, you should almost never need to call ensure_future yourself:

- If you need to await multiple tasks, you can pass the coroutine objects directly to
gather
, e.g. gather(coroutine1(), coroutine2())
.
- If you need to spawn a background task, you can call
asyncio.create_task(coroutine(...))
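A minimal sketch of the background-task case (background_logger and the intervals are illustrative, not from the question):

```python
import asyncio

async def background_logger(interval, log):
    # A hypothetical background task: record a heartbeat until cancelled.
    while True:
        await asyncio.sleep(interval)
        log.append('tick')

async def main():
    log = []
    # create_task schedules the coroutine to run concurrently
    # with the rest of main(), without being awaited here.
    task = asyncio.create_task(background_logger(0.01, log))
    await asyncio.sleep(0.1)  # the "real" work happens here
    task.cancel()             # stop the background task
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

ticks = asyncio.run(main())
```

This is what the `asyncio.ensure_future(Kconsumer(...))` call at the bottom of your script is doing; on Python 3.7+ `asyncio.create_task` is the preferred spelling (it must be called while the event loop is running).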
How does aiohttp handle requests that come in a little later than the others? Shouldn't it hold the whole batch back instead of forgetting about them altogether?
If you use gather, all requests must finish before any of their results is returned. (That is not an aiohttp policy, it's how gather works.) If you need to implement a timeout, you can use asyncio.wait_for or similar.
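A minimal sketch of the timeout pattern (slow_request and fetch_with_timeout are illustrative names; in your code the inner awaitable would be an aiohttp call):

```python
import asyncio

async def slow_request():
    # Hypothetical stand-in for an aiohttp request that takes too long.
    await asyncio.sleep(10)
    return 'data'

async def fetch_with_timeout(timeout):
    try:
        # wait_for cancels the inner coroutine if it exceeds the timeout,
        # so a straggler can't hold the whole batch back indefinitely.
        return await asyncio.wait_for(slow_request(), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # decide explicitly what a timed-out request yields

result = asyncio.run(fetch_with_timeout(0.01))
```

Wrapping each request this way lets gather still collect one result per request, with timed-out entries marked rather than silently missing.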