InvalidStateError with asyncio futures and RuntimeError with aiohttp when using Futures with callback

I'm new to asyncio and aiohttp. I'm currently getting the error below, and I'm not sure why my asyncio future raises InvalidStateError and my session raises RuntimeError.

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py", line 184, in _run_module_as_main
    "__main__", mod_spec)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/tests/tracer.py", line 100, in <module>
    sys.exit(main(sys.argv))
  File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/tests/tracer.py", line 92, in main
    poster.post()
  File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 87, in post
    results = event_loop.run_until_complete(self.async_post_events(events))
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 79, in async_post_events
    task.add_done_callback(self.send_oracle, task.result(), session)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 268, in result
    raise InvalidStateError('Result is not ready.')
asyncio.futures.InvalidStateError: Result is not ready.
Task exception was never retrieved
future: <Task finished coro=<Poster.async_post_event() done, defined at /Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py:62> exception=RuntimeError('Session is closed',)>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 64, in async_post_event
    async with session.post(self.endpoint, data=event) as resp:
  File "/Users/bli1/Development/QE/idea/trinity-tracer/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__
    self._resp = yield from self._coro
  File "/Users/bli1/Development/QE/idea/trinity-tracer/lib/python3.5/site-packages/aiohttp/client.py", line 161, in _request
    raise RuntimeError('Session is closed')
RuntimeError: Session is closed

What I want to do is POST an event to one endpoint, then POST that same event to a second endpoint. The second POST would run as another async method, used as a callback.

Here is my code:

    async def async_post_event(self, event, session):
        async with session.post(self.endpoint, data=event) as resp:
            event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
            event["tracer"]["post"]["statusCode"] = await resp.status
            return event

    async def send_oracle(self, event, session):
        async with session.post(self.oracle, data=event) as resp:
            return event["event"]["event_header"]["event_id"], await resp.status

    async def async_post_events(self, events):
        tasks = []
        conn = aiohttp.TCPConnector(verify_ssl=self.secure)
        async with aiohttp.ClientSession(connector=conn) as session:
            for event in events:
                task = asyncio.ensure_future(self.async_post_event(event, session))
                task.add_done_callback(self.send_oracle, task.result(), session)
                tasks.append(task)
        return await asyncio.gather(*tasks)

    def post(self):
        event_loop = asyncio.get_event_loop()
        try:
            events = [self.gen_random_event() for i in range(self.num_post)]
            results = event_loop.run_until_complete(self.async_post_events(events))
            print(results)
        finally:
            event_loop.close()

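add_done_callback accepts a plain callback, not a coroutine, and it takes no extra positional arguments for that callback. The InvalidStateError comes from task.result(), which is evaluated immediately as an argument, before the task has had a chance to run. Besides, add_done_callback is part of a very low-level API that casual developers should avoid.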
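To make the callback point concrete, here is a minimal sketch using only asyncio. The coroutines first_step and second_step are hypothetical stand-ins for the two POSTs, not part of the original code. It shows that result() fails on a task that has not finished, that a done callback is a plain function receiving the finished future, and that simply awaiting is usually the easier way to chain two steps.

```python
import asyncio


async def first_step(value):
    # Hypothetical stand-in for the first POST; yields to the loop once.
    await asyncio.sleep(0)
    return value * 2


async def second_step(value):
    # Hypothetical stand-in for the follow-up POST.
    await asyncio.sleep(0)
    return value + 1


async def demo():
    task = asyncio.ensure_future(first_step(10))

    # The task is only scheduled at this point, so its result is not ready.
    try:
        task.result()
        early = "no error"
    except asyncio.InvalidStateError:
        early = "InvalidStateError"

    # A done callback is a regular function called with the finished future.
    seen = []
    task.add_done_callback(lambda fut: seen.append(fut.result()))

    # Chaining with await instead of a callback.
    chained = await second_step(await task)
    return early, seen, chained


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The await-based chain keeps the second step inside a coroutine, so it can itself await network calls, which a done callback cannot do directly.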

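But your main mistake is that session.post() ends up running outside the ClientSession async context manager, and the stack trace points right at it. The tasks are only scheduled inside the async with block; they actually run after the block has exited (through the exception, or at the gather call, which sits outside the block), and by then the session is closed.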
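The timing issue can be reproduced without a network. The sketch below uses FakeSession, a hypothetical stand-in that only tracks whether it has been closed; it is not aiohttp's API (in aiohttp, session.post() returns an object used with async with). It compares gathering the tasks after the block has exited with gathering them inside it.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a client session; only tracks open/closed."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True

    async def post(self, url):
        if self.closed:
            raise RuntimeError("Session is closed")
        await asyncio.sleep(0)
        return 200


async def gather_outside():
    async with FakeSession() as session:
        tasks = [asyncio.ensure_future(session.post("/a")) for _ in range(2)]
    # The block has exited, so the session is closed before any task runs.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def gather_inside():
    async with FakeSession() as session:
        tasks = [asyncio.ensure_future(session.post("/a")) for _ in range(2)]
        # The tasks run to completion while the session is still open.
        return await asyncio.gather(*tasks)
```

Under these assumptions, gather_outside collects a RuntimeError for every task, while gather_inside returns the status codes.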

I've modified your snippet to get something that looks like working code:

    import asyncio
    import time

    import aiohttp

    # The functions below are methods of the Poster class from the question.

    async def async_post_event(self, event, session):
        # First POST: record the timestamp and status code on the event.
        async with session.post(self.endpoint, data=event) as resp:
            event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
            # resp.status is a plain int attribute, so it must not be awaited.
            event["tracer"]["post"]["statusCode"] = resp.status
        # Second POST: forward the same event to the oracle endpoint.
        async with session.post(self.oracle, data=event) as resp:
            return event["event"]["event_header"]["event_id"], resp.status

    async def async_post_events(self, events):
        coros = []
        conn = aiohttp.TCPConnector(verify_ssl=self.secure)
        async with aiohttp.ClientSession(connector=conn) as session:
            for event in events:
                coros.append(self.async_post_event(event, session))
            # gather is awaited inside the block, while the session is open.
            return await asyncio.gather(*coros)

    def post(self):
        event_loop = asyncio.get_event_loop()
        try:
            events = [self.gen_random_event() for i in range(self.num_post)]
            results = event_loop.run_until_complete(self.async_post_events(events))
            print(results)
        finally:
            event_loop.close()

You could extract the two POSTs from async_post_event into separate coroutines, but the main idea stays the same.
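One possible shape for that split is sketched below. The names post_event, post_oracle and post_both are hypothetical, and StubSession and StubResponse are stand-ins that exist only so the sketch runs without a network. With aiohttp you would pass a real ClientSession instead, on the assumption that its post() is used with async with in the same way as in the code above.

```python
import asyncio
import time


class StubResponse:
    """Hypothetical response exposing only .status."""

    def __init__(self, status):
        self.status = status

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False


class StubSession:
    """Hypothetical session that records the URLs it was asked to POST to."""

    def __init__(self):
        self.calls = []

    def post(self, url, data=None):
        self.calls.append(url)
        return StubResponse(200)


async def post_event(session, endpoint, event):
    # First POST: record the timestamp and status code on the event.
    async with session.post(endpoint, data=event) as resp:
        event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
        event["tracer"]["post"]["statusCode"] = resp.status
    return event


async def post_oracle(session, oracle, event):
    # Second POST: forward the same event and report its id and status.
    async with session.post(oracle, data=event) as resp:
        return event["event"]["event_header"]["event_id"], resp.status


async def post_both(session, endpoint, oracle, event):
    # Awaiting in sequence guarantees the oracle sees the updated event.
    event = await post_event(session, endpoint, event)
    return await post_oracle(session, oracle, event)
```

Each event still goes through its two POSTs in order, and several post_both coroutines can be passed to asyncio.gather inside the session block to run concurrently.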