python 3.6 asyncio 错误 not iterable while not iterated through async object
python 3.6 asyncio error not iterable while not iterating through async object
我有一个 class 创建一个 url 和一些 json 以在看起来像那样的 post 方法中执行,我正在关注 this guide
import vk_api
from vk_api.execute import VkFunction
import time
from datetime import datetime
import numpy as np
import asyncio
from ratelimit import limits
import requests
import aiohttp
class Execute:
def __init__(self, access_token):
self.access_token = access_token
def posts_to_push(self, posts, limit):
arr = []
data = list(self.posts_chunks_limit(posts, limit))
for i in range(len(data)):
code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); "
arr.append(code)
return arr
def posts_execute_command(self, posts): # TODO make async
limit = 100
code = self.posts_to_push(posts, limit)
execute_limit = 25
for i in range(len(code)):
data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit])
var = f'var data = []; {data} return data ;'
yield var
async def fetch(url, json_data, session):
async with session.post(url, json=json_data) as response:
return await response.read()
async def result_posts(self, posts):
result = []
command = self.posts_execute_command(posts)
async with aiohttp.ClientSession() as session:
for i in command:
execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute",
json_data={
"code": i,
"access_token": self.access_token,
"v": 5.101,
}), session)
result.append(execute)
responses = await asyncio.gather(*result)
print(responses)
async def posts_chunks_limit(self, data, limit):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(data), limit):
await asyncio.sleep(0.1)
yield data[i:i + limit]
def run_async(self, posts):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.result_posts(posts))
然后我运行就这样
df = pd.read_csv('/some_path')
arr = []
for i in df['ids']:
arr.append(i)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(vk.result_posts(arr))
loop.run_until_complete(future)
错误消息如下所示
Traceback (most recent call last):
File "../test_python.py", line 83, in <module>
loop.run_until_complete(future)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "../test_python.py", line 45, in result_posts
for i in command:
File "../test_python.py", line 29, in posts_execute_command
code = self.posts_to_push(posts, limit)
File "../test_python.py", line 21, in posts_to_push
data = list(self.posts_chunks_limit(posts, limit))
TypeError: 'async_generator' object is not iterable
这是我第一次使用 aiohttp/asyncio,我觉得它很复杂,很容易迷路,也许我可以得到一些指导或解决方案?
这一行:
data = list(self.posts_chunks_limit(posts, limit))
由于 post_chunks_limit
是一个异步迭代器,list
不知道如何处理它。您需要使用 async for
或异步列表理解对其进行迭代:
data = [x async for x in self.posts_chunks_limit(posts, limit)]
这需要用async def
定义posts_to_push
和posts_execute_command
。另外 posts_execute_command
必须 await
调用 posts_to_push
并且 result_posts
需要等待调用 posts_execute_command
.
在@user4815162342 和一堆 SO 帖子的帮助下,我能够解决我的问题,我的代码如下所示。
问题是我是 calling/awaiting 一个在我的 result_posts
方法中不可迭代的生成器。
import vk_api
from vk_api.execute import VkFunction
import time
from datetime import datetime
import numpy as np
import asyncio
from ratelimit import limits
import requests
import aiohttp
import socket
from concurrent.futures import ThreadPoolExecutor
class Execute: # TODO auth, parsers, limits, timeouts
def __init__(self, access_token):
self.access_token = access_token
async def posts_to_push(self, posts, limit):
arr = []
data = [x async for x in self.posts_chunks_limit(posts, limit)]
for i in range(len(data)):
code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); "
arr.append(code)
return arr # < len() = 1000, 1k lists with 100 post IDs inside for 100k total ids
async def posts_execute_command(self, posts): # TODO make async
limit = 100
code = await self.posts_to_push(posts, limit)
execute_limit = 25
for i in range(len(code)):
data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit])
var = f'var data = []; {data} return data ;'
print(var, '---var---')
yield var
async def fetch(self, url, json_data, session):
async with session.post(url, data=json_data) as response:
return await response.read()
@limits(calls=1, period=1)
async def result_posts(self, posts):
result = []
command = [i async for i in self.posts_execute_command(posts) ] #<note this iteration
conn = aiohttp.TCPConnector(
family=socket.AF_INET,
verify_ssl=False,)
async with aiohttp.ClientSession(connector=conn) as session:
for i in command:
print('---code---', len(command)) #TODO fix command range that's the bug
execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute",
json_data={
"code": i,
"access_token": self.access_token,
"v": 5.101,
}, session = session))
await asyncio.sleep(1)
result.append(execute)
responses = await asyncio.gather(*result)
print(responses, 'responses')
return 'Done'
async def posts_chunks_limit(self, data, limit):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(data), limit):
yield data[i:i + limit]
我有一个 class 创建一个 url 和一些 json 以在看起来像那样的 post 方法中执行,我正在关注 this guide
import vk_api
from vk_api.execute import VkFunction
import time
from datetime import datetime
import numpy as np
import asyncio
from ratelimit import limits
import requests
import aiohttp
class Execute:
def __init__(self, access_token):
self.access_token = access_token
def posts_to_push(self, posts, limit):
arr = []
data = list(self.posts_chunks_limit(posts, limit))
for i in range(len(data)):
code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); "
arr.append(code)
return arr
def posts_execute_command(self, posts): # TODO make async
limit = 100
code = self.posts_to_push(posts, limit)
execute_limit = 25
for i in range(len(code)):
data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit])
var = f'var data = []; {data} return data ;'
yield var
async def fetch(url, json_data, session):
async with session.post(url, json=json_data) as response:
return await response.read()
async def result_posts(self, posts):
result = []
command = self.posts_execute_command(posts)
async with aiohttp.ClientSession() as session:
for i in command:
execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute",
json_data={
"code": i,
"access_token": self.access_token,
"v": 5.101,
}), session)
result.append(execute)
responses = await asyncio.gather(*result)
print(responses)
async def posts_chunks_limit(self, data, limit):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(data), limit):
await asyncio.sleep(0.1)
yield data[i:i + limit]
def run_async(self, posts):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.result_posts(posts))
然后我运行就这样
df = pd.read_csv('/some_path')
arr = []
for i in df['ids']:
arr.append(i)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(vk.result_posts(arr))
loop.run_until_complete(future)
错误消息如下所示
Traceback (most recent call last):
File "../test_python.py", line 83, in <module>
loop.run_until_complete(future)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "../test_python.py", line 45, in result_posts
for i in command:
File "../test_python.py", line 29, in posts_execute_command
code = self.posts_to_push(posts, limit)
File "../test_python.py", line 21, in posts_to_push
data = list(self.posts_chunks_limit(posts, limit))
TypeError: 'async_generator' object is not iterable
这是我第一次使用 aiohttp/asyncio,我觉得它很复杂,很容易迷路,也许我可以得到一些指导或解决方案?
这一行:
data = list(self.posts_chunks_limit(posts, limit))
由于 post_chunks_limit
是一个异步迭代器,list
不知道如何处理它。您需要使用 async for
或异步列表理解对其进行迭代:
data = [x async for x in self.posts_chunks_limit(posts, limit)]
这需要用async def
定义posts_to_push
和posts_execute_command
。另外 posts_execute_command
必须 await
调用 posts_to_push
并且 result_posts
需要等待调用 posts_execute_command
.
在@user4815162342 和一堆 SO 帖子的帮助下,我能够解决我的问题,我的代码如下所示。
问题是我是 calling/awaiting 一个在我的 result_posts
方法中不可迭代的生成器。
import vk_api
from vk_api.execute import VkFunction
import time
from datetime import datetime
import numpy as np
import asyncio
from ratelimit import limits
import requests
import aiohttp
import socket
from concurrent.futures import ThreadPoolExecutor
class Execute: # TODO auth, parsers, limits, timeouts
def __init__(self, access_token):
self.access_token = access_token
async def posts_to_push(self, posts, limit):
arr = []
data = [x async for x in self.posts_chunks_limit(posts, limit)]
for i in range(len(data)):
code = f"data.push(API.wall.getById( {{'posts': {data[i]} }} )); "
arr.append(code)
return arr # < len() = 1000, 1k lists with 100 post IDs inside for 100k total ids
async def posts_execute_command(self, posts): # TODO make async
limit = 100
code = await self.posts_to_push(posts, limit)
execute_limit = 25
for i in range(len(code)):
data = ''.join(code[i * execute_limit: (i * execute_limit) + execute_limit])
var = f'var data = []; {data} return data ;'
print(var, '---var---')
yield var
async def fetch(self, url, json_data, session):
async with session.post(url, data=json_data) as response:
return await response.read()
@limits(calls=1, period=1)
async def result_posts(self, posts):
result = []
command = [i async for i in self.posts_execute_command(posts) ] #<note this iteration
conn = aiohttp.TCPConnector(
family=socket.AF_INET,
verify_ssl=False,)
async with aiohttp.ClientSession(connector=conn) as session:
for i in command:
print('---code---', len(command)) #TODO fix command range that's the bug
execute = asyncio.ensure_future(self.fetch(url="https://api.vk.com/method/execute",
json_data={
"code": i,
"access_token": self.access_token,
"v": 5.101,
}, session = session))
await asyncio.sleep(1)
result.append(execute)
responses = await asyncio.gather(*result)
print(responses, 'responses')
return 'Done'
async def posts_chunks_limit(self, data, limit):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(data), limit):
yield data[i:i + limit]