Python Asyncio run_forever() and Tasks
I adapted this code to use Google Cloud PubSub for Python with asyncio: https://github.com/cloudfind/google-pubsub-asyncio
import asyncio
import datetime
import functools
import os

from google.cloud import pubsub
from google.gax.errors import RetryError
from grpc import StatusCode


async def message_producer():
    """ Publish messages which consist of the current datetime """
    while True:
        await asyncio.sleep(0.1)


async def proc_message(message):
    await asyncio.sleep(0.1)
    print(message)
    message.ack()


def main():
    """ Main program """
    loop = asyncio.get_event_loop()

    topic = "projects/{project_id}/topics/{topic}".format(
        project_id=PROJECT, topic=TOPIC)
    subscription_name = "projects/{project_id}/subscriptions/{subscription}".format(
        project_id=PROJECT, subscription=SUBSCRIPTION)

    subscription = make_subscription(
        topic, subscription_name)

    def create_proc_message_task(message):
        """ Callback handler for the subscription; schedule a task on the event loop """
        print("Task created!")
        task = loop.create_task(proc_message(message))

    subscription.open(create_proc_message_task)

    # Produce some messages to consume
    loop.create_task(message_producer())

    print("Subscribed, let's do this!")
    loop.run_forever()


def make_subscription(topic, subscription_name):
    """ Make a publisher and subscriber client, and create the necessary resources """
    subscriber = pubsub.SubscriberClient()
    try:
        subscriber.create_subscription(subscription_name, topic)
    except:
        pass
    subscription = subscriber.subscribe(subscription_name)

    return subscription


if __name__ == "__main__":
    main()
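I basically removed the publishing code and only use the subscription code.
However, initially I did not include the loop.create_task(message_producer()) line. I assumed the tasks were being created as expected, but in fact they never ran. Only after I added that line did the code execute properly and all the created tasks run. What causes this behavior?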
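PubSub is calling the create_proc_message_task callback from a different thread. Since create_task is not thread-safe, it must only be called from the thread that runs the event loop (typically the main thread). To correct the issue, replace loop.create_task(proc_message(message)) with asyncio.run_coroutine_threadsafe(proc_message(message), loop), after which message_producer is no longer needed.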
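The fix can be tried without a PubSub account. The following is a minimal sketch, not the library's own pattern: fake_pubsub_thread and run_demo are hypothetical names, and an ordinary threading.Thread stands in for the thread PubSub uses to invoke the callback. proc_message is simplified to return the message instead of acknowledging it.

```python
import asyncio
import threading


async def proc_message(message):
    """Stand-in for the question's coroutine: pretend to work, then return the message."""
    await asyncio.sleep(0.01)
    return message


def fake_pubsub_thread(loop, messages, results):
    """Hypothetical replacement for the thread that PubSub runs callbacks on."""
    try:
        # Thread-safe: schedules each coroutine on the loop and wakes the loop up.
        futures = [
            asyncio.run_coroutine_threadsafe(proc_message(m), loop)
            for m in messages
        ]
        for future in futures:
            # A concurrent.futures.Future: waiting blocks this thread only, not the loop.
            results.append(future.result(timeout=5))
    finally:
        # Stopping the loop from a foreign thread also has to use the thread-safe API.
        loop.call_soon_threadsafe(loop.stop)


def run_demo(messages):
    loop = asyncio.new_event_loop()
    results = []
    worker = threading.Thread(
        target=fake_pubsub_thread, args=(loop, messages, results))
    worker.start()
    try:
        loop.run_forever()  # no message_producer is needed to keep the loop awake
    finally:
        worker.join()
        loop.close()
    return results


if __name__ == "__main__":
    print(run_demo(["a", "b", "c"]))
```

run_coroutine_threadsafe returns a concurrent.futures.Future, so the calling thread can wait on the result or ignore it. In the original callback the return value can simply be dropped.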
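As for why message_producer appeared to fix the code, consider that run_coroutine_threadsafe does two additional things compared to create_task:
- It operates in a thread-safe fashion, so the event loop's data structures are not corrupted when it is called concurrently.
- It ensures that the event loop wakes up at the earliest opportunity, so that it can process the new task.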
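In your case create_task added the task to the loop's runnable queue (without any locking), but failed to ensure the wakeup, because a wakeup is not needed when the call is made from the event loop thread. message_producer then served to force the loop to wake up at regular intervals, at which point it also checked for and executed the runnable tasks.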
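Because the failure is silent in normal operation, it can help to let asyncio report it. In debug mode, non-thread-safe loop methods such as loop.call_soon raise an exception when called from the wrong thread. The sketch below shows that check in isolation; call_from_wrong_thread and run_debug_demo are hypothetical names, and call_soon is used directly rather than create_task so that no coroutine is left un-awaited.

```python
import asyncio
import threading


def call_from_wrong_thread(loop, outcome):
    """Runs in a worker thread and uses the non-thread-safe API on purpose."""
    try:
        loop.call_soon(lambda: None)  # wrong: this is not the loop's own thread
        outcome.append("no error raised")
    except RuntimeError as exc:
        outcome.append(str(exc))
    finally:
        # The thread-safe variant is the correct way to reach the loop from here.
        loop.call_soon_threadsafe(loop.stop)


def run_debug_demo():
    loop = asyncio.new_event_loop()
    loop.set_debug(True)  # enables the wrong-thread check
    outcome = []
    worker = threading.Thread(
        target=call_from_wrong_thread, args=(loop, outcome))
    # Start the worker only once the loop is running, so the check can tell
    # which thread owns the loop.
    loop.call_soon(worker.start)
    try:
        loop.run_forever()
    finally:
        worker.join()
        loop.close()
    return outcome[0]


if __name__ == "__main__":
    print(run_debug_demo())
```

Debug mode can also be switched on without code changes by setting the PYTHONASYNCIODEBUG environment variable, which makes it a cheap first step when tasks scheduled from a library callback never seem to run.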