Tornado 协程递增地从另一个协程产生

Tornado coroutine yield from another coroutine incrementally

我想重构我的 Tornado 应用程序的一部分,因此我为 returning phone 数字创建了一个特殊函数:

@gen.coroutine
def get_phones(self, listname):
    phones = []
    logging.info("fetching phones")
    cursor = self._mongo.contacts.aggregate(self.get_query(
        subscription_filter={
            "$ne": [{"$ifNull": ["$$subscription.events.{listname}", None]}, None]
        },
        handler_filter={
            "handler.meta.is_active": True,
            "handler.meta.type": "phone"
        }
    ))
    try:
        while (yield cursor.fetch_next):
            contact = cursor.next_object()
            logging.info(contact)
            try:
                phones += [handler['subject'] for handler in contact['handlers']]
                if len(phones) > 50:
                    yield phones
                    phones = []
            except Exception:
                self._logger.warning("Could not get phone no")
    except Exception:
        phones = []
        logging.warning("Could not fetch contacts")

    if len(phones) > 0:
        yield phones

我想用它实现的是从我的数据库中异步获取最多 50 个联系人的批次,并将它们 return 到调用协程。

这是我的调用协程:

@gen.coroutine
def on_heartbeat_status_update(self, status):
    phonegen = self.get_phones("ews:admin")
    logging.info(phonegen)
    while True:
        phones = yield phonegen
        logging.info(phones)
        if phones is None:
            break
        logging.info(len(phones))

它不起作用。 "phones" 总是 None。有人可以建议实现这一目标的正确方法吗?谢谢!

您必须使用 Python 3.6 本机协同程序才能工作。这是 Python 的第一个版本,它在同一函数中同时支持 yieldawait。没有这个,系统就不可能区分使用 yield 作为协程和生成结果作为生成器。

@gen.coroutine def替换为async def并在get_phones中使用await cursor.fetch_nextyield phones。然后就可以在on_heartbeat_status_update中使用async for phones in self.get_phones(...)了。