使用 Asyncio 创建新的 Python 进程
Using Asyncio to create new Python Processes
我正在设置一个函数以异步启动一个新进程到 运行 一个非常 cpu 繁重的函数。大多数文档都没有彻底涵盖这一点,而且我拼凑起来的内容似乎无法异步工作。
我有一个函数 procManager
,它接受一个函数、要传递给函数的参数以及用于基本日志记录的对象名称。
async def procManager(f,a,o):
print(f"{o} started at {time.strftime('%X')}")
p = Process(target=f, args=(a,))
p_parent = os.getppid() # parent process
p_curr = os.getpid() # current process
print("parent process:", p_parent)
print("current process:", p_curr)
p.start()
p.join()
print(f"{o} finished at {time.strftime('%X')}")
print("=========")
我有这个 cpu 重要功能,运行s Louvain 在我传递给 def procManager
的 networkX 图上进行社区检测以生成新进程。
def community(cg):
start = timer()
partition = c.best_partition(cg) #default louvain community detection
v = {} #create dict to group nodes by community
for key, value in sorted(partition.items()):
v.setdefault(value, []).append(key)
stop = timer()
print(stop-start)
主要功能看起来是这样的。我正在分别初始化 3000 和 1000 个节点的 2 个图 A 和 B,平均度数为 5。我正在使用 Jupyter notebook 来 运行 这个,所以我使用 await main()
而不是 asyncio.run
.
A = nx.barabasi_albert_graph(3000,5)
B = nx.barabasi_albert_graph(1000,5)
async def main():
task1 = asyncio.create_task(
procManager(community, A, "A"))
task2 = asyncio.create_task(
procManager(community, B, "B"))
print("async start")
await main()
我想要做的是让 A 和 B 异步处理(即同时开始)但在不同的进程上。当前输出看起来像这样,其中 A 和 B 在新进程上处理但处于阻塞状态。我需要以异步方式计算 A 和 B 社区,因为它们将由 rabbitMQ 流触发并且响应需要是非阻塞的。
async done
A started at 06:03:48
parent process: 5783
current process: 12121
11.424800566000158
A finished at 06:03:59
=========
B started at 06:03:59
parent process: 5783
current process: 12121
0.037437027999885686
B finished at 06:03:59
=========
希望大家帮帮忙!
在你的情况下,问题是 join()
方法。它阻塞直到进程完成。此外,您甚至不需要 asyncio
。看看这个简单的例子:
import time
from multiprocessing import Process
def procManager(f,a,o):
print(f"{o} started at {time.strftime('%X')}")
p = Process(target=f, args=(a,))
p.start()
# p.join()
print(f"{o} finished at {time.strftime('%X')}") # This will occur immediately
print("=========")
def community(cg):
for i in range(10):
print("%s - %s" %(cg, i))
time.sleep(1)
procManager(community, "This is A", "A")
procManager(community, "This is B", "B")
这应该能让您了解如何解决您的问题。希望对您有所帮助!
关于Asyncio,需要用到asyncio.create_task
方法。这种方法的诀窍是你应该只指定你声明为异步的函数。为了 运行 他们,你应该使用 await asyncio.gather
.
示例为:
import asyncio
async def print_hello(name):
print("Hello! {}".format(name))
name_list = ["billy", "bob", "buffalo bob"]
for item in name_list:
await asyncio.gather(print_hello(item))
使用 asyncio 创建和 运行 宁子进程的最简单形式是 create_task 方法,如下所述:Asyncio Docs
希望对您有所帮助!
我正在设置一个函数以异步启动一个新进程到 运行 一个非常 cpu 繁重的函数。大多数文档都没有彻底涵盖这一点,而且我拼凑起来的内容似乎无法异步工作。
我有一个函数 procManager
,它接受一个函数、要传递给函数的参数以及用于基本日志记录的对象名称。
async def procManager(f,a,o):
print(f"{o} started at {time.strftime('%X')}")
p = Process(target=f, args=(a,))
p_parent = os.getppid() # parent process
p_curr = os.getpid() # current process
print("parent process:", p_parent)
print("current process:", p_curr)
p.start()
p.join()
print(f"{o} finished at {time.strftime('%X')}")
print("=========")
我有这个 cpu 重要功能,运行s Louvain 在我传递给 def procManager
的 networkX 图上进行社区检测以生成新进程。
def community(cg):
start = timer()
partition = c.best_partition(cg) #default louvain community detection
v = {} #create dict to group nodes by community
for key, value in sorted(partition.items()):
v.setdefault(value, []).append(key)
stop = timer()
print(stop-start)
主要功能看起来是这样的。我正在分别初始化 3000 和 1000 个节点的 2 个图 A 和 B,平均度数为 5。我正在使用 Jupyter notebook 来 运行 这个,所以我使用 await main()
而不是 asyncio.run
.
A = nx.barabasi_albert_graph(3000,5)
B = nx.barabasi_albert_graph(1000,5)
async def main():
task1 = asyncio.create_task(
procManager(community, A, "A"))
task2 = asyncio.create_task(
procManager(community, B, "B"))
print("async start")
await main()
我想要做的是让 A 和 B 异步处理(即同时开始)但在不同的进程上。当前输出看起来像这样,其中 A 和 B 在新进程上处理但处于阻塞状态。我需要以异步方式计算 A 和 B 社区,因为它们将由 rabbitMQ 流触发并且响应需要是非阻塞的。
async done
A started at 06:03:48
parent process: 5783
current process: 12121
11.424800566000158
A finished at 06:03:59
=========
B started at 06:03:59
parent process: 5783
current process: 12121
0.037437027999885686
B finished at 06:03:59
=========
希望大家帮帮忙!
在你的情况下,问题是 join()
方法。它阻塞直到进程完成。此外,您甚至不需要 asyncio
。看看这个简单的例子:
import time
from multiprocessing import Process
def procManager(f,a,o):
print(f"{o} started at {time.strftime('%X')}")
p = Process(target=f, args=(a,))
p.start()
# p.join()
print(f"{o} finished at {time.strftime('%X')}") # This will occur immediately
print("=========")
def community(cg):
for i in range(10):
print("%s - %s" %(cg, i))
time.sleep(1)
procManager(community, "This is A", "A")
procManager(community, "This is B", "B")
这应该能让您了解如何解决您的问题。希望对您有所帮助!
关于Asyncio,需要用到asyncio.create_task
方法。这种方法的诀窍是你应该只指定你声明为异步的函数。为了 运行 他们,你应该使用 await asyncio.gather
.
示例为:
import asyncio
async def print_hello(name):
print("Hello! {}".format(name))
name_list = ["billy", "bob", "buffalo bob"]
for item in name_list:
await asyncio.gather(print_hello(item))
使用 asyncio 创建和 运行 宁子进程的最简单形式是 create_task 方法,如下所述:Asyncio Docs
希望对您有所帮助!