异步子进程 Popen python 3.5
asynchronous subprocess Popen python 3.5
我正在尝试以异步方式运行 subprocess 的 Popen 命令,这样我就可以在后台运行其他任务。
import subprocess
import requests
import asyncio
import asyncio.subprocess
async def x(message):
    """Run the shell command carried by ``message`` without blocking the loop.

    The first three characters of ``message.content`` are skipped
    (presumably a command prefix such as ``"!x "`` -- confirm against the
    caller) and the remainder is executed through the shell.

    Args:
        message: Object exposing a ``content`` string attribute.

    Returns:
        The command's combined stdout/stderr decoded as UTF-8, or ``None``
        when the message consists of a single word (no command to run).
    """
    if len(message.content.split()) <= 1:
        return None

    # NOTE(security): the text is handed to a shell verbatim. Only accept it
    # from trusted senders, or validate it before it reaches this point.
    process = await asyncio.create_subprocess_shell(
        message.content[3:],
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    # Awaiting communicate() yields to the event loop while the child runs,
    # unlike subprocess.Popen(...).communicate(), which blocks the thread.
    output, _ = await process.communicate()
    return output.decode('utf-8')
我试图理解 https://docs.python.org/3/library/asyncio-subprocess.html 但我不确定什么是协议工厂。
使用 python 3.5 测试。有问题就问。
import threading
import time
import subprocess
import shlex
from sys import stdout
# Every thread of a process sees the same module-level objects, so a plain
# class is enough to act as a mailbox between the main thread and the worker.
# Access is not synchronised: this is adequate for a single worker thread
# only; with several threads use a threading.Lock or a queue.Queue instead.
class Communicator(object):
    """Shared state exchanged between the main thread and the worker."""

    counter = 0     # incremented once per second by the main loop
    stop = False    # set to True by the main thread when it is interrupted
    arg = None      # command line (a string) the worker should execute
    result = None   # filled in by the worker once the command has finished
# Subclassing Thread and overriding run() is one way to define the work to
# be done; passing ``target=`` to threading.Thread is another.
class ThreadedFunction(threading.Thread):
    """Worker thread that executes the command stored in ``c.arg``.

    The thread reads the module-level ``c`` object: it splits ``c.arg``
    into an argument list, runs it as a subprocess and stores the
    ``(stdout, stderr)`` tuple returned by ``communicate()`` in
    ``c.result``.
    """

    def run(self, *args, **kwargs):
        """Run the command, wait for it to finish and publish its output."""
        super().run()
        command = shlex.split(c.arg)
        # The two timestamps only show that the command really ran for the
        # expected duration.
        print(time.time())
        output = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        ).communicate()
        print('\n', time.time())
        c.result = output
        # c.stop is not consulted here because there is no loop to leave
        # early; a looping worker would check it on every iteration.
# Create the shared state and tell the worker what to run.
c = Communicator()
c.arg = 'time sleep 5'  # 'time' is used only to have some output

# Create the thread and start it; the main thread is free from here on.
t = ThreadedFunction()
t.start()

# ...for example, count the seconds in the meantime.
try:
    for j in range(100):
        c.counter += 1
        stdout.write('\r{:}'.format(c.counter))
        stdout.flush()
        time.sleep(1)
        if c.result is not None:
            print(c.result)
            break
except BaseException:
    # Signal the worker, then let the error (e.g. Ctrl-C) propagate instead
    # of silently swallowing it.
    c.stop = True
    raise
这个方法简单多了,我是在写完另一个回答之后才发现的;不过那个回答也还算有趣……所以我把它保留了下来。
import time
import subprocess
import shlex
from sys import stdout
command = 'time sleep 5'  # 'time' is used only to have some output


def x(command):
    """Start ``command`` in the background and return its ``Popen`` handle.

    Args:
        command: Command line as a single string; it is split with
            ``shlex.split`` and executed without a shell.

    Returns:
        The running ``subprocess.Popen`` object, with stdout and stderr
        captured through pipes. The call does not wait for the process.
    """
    cmd = shlex.split(command)
    return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Start the subprocess and do something else...
p = x(command)

# ...for example, count the seconds in the meantime.
try:
    for j in range(100):
        stdout.write('\r{:}'.format(j))
        stdout.flush()
        time.sleep(1)
        # poll() returns None while the child is still running.
        if p.poll() is not None:
            print(p.communicate())
            break
except BaseException:
    # Do not leave the child running if anything goes wrong (including
    # Ctrl-C), then let the error propagate.
    p.terminate()  # or p.kill()
    raise
异步性可以从这一点看出:sleep 命令在后台进程中运行的同时,Python 脚本仍在向 stdout 打印计数器的值。脚本在约 5 秒后打印出 bash time 命令的输出并退出,而在此期间计数器一直在打印,这证明该脚本是有效的。
我最终找到了我的问题的答案,它利用了异步。
http://pastebin.com/Zj8SK1CG
当我来到这个问题时,我希望答案真正使用 asyncio 进行进程间通信。
我发现以下资源很有用:
https://github.com/python/asyncio/blob/master/examples/child_process.py
以下是我的简化示例(使用 3.5+ async/await 语法),它读取行并将它们排序输出:
import asyncio
from subprocess import Popen, PIPE
async def connect_write_pipe(file):
    """Return a write-only transport wrapping a writable pipe.

    Args:
        file: File-like object for the writable end of a pipe
            (for example ``Popen.stdin``).

    Returns:
        The transport created by ``loop.connect_write_pipe``; the bare
        ``asyncio.Protocol`` instance paired with it is discarded.
    """
    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_write_pipe(asyncio.Protocol, file)
    return transport
async def connect_read_pipe(file):
    """Wrap a readable pipe in a stream.

    Args:
        file: File-like object for the readable end of a pipe
            (for example ``Popen.stdout``).

    Returns:
        A ``(stream_reader, transport)`` tuple: the ``asyncio.StreamReader``
        fed by the pipe and the transport that must be closed when done.
    """
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)

    def factory():
        # Protocol factory: the protocol forwards incoming data to the reader.
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(factory, file)
    return stream_reader, transport
async def main(loop):
    """Feed three lines to ``/usr/bin/sort`` and print what it sends back.

    Args:
        loop: Unused; kept so existing callers that pass the event loop
            keep working.

    Output read from the child's stdout is printed with an ``OUT`` prefix
    and output from its stderr with an ``ERR`` prefix.
    """
    # Start the subprocess and wrap stdin, stdout and stderr.
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    try:
        # Map every pending read to the stream it belongs to.
        name = {stdout: 'OUT', stderr: 'ERR'}
        registered = {
            asyncio.ensure_future(stderr.read()): stderr,
            asyncio.ensure_future(stdout.read()): stdout,
        }

        to_sort = b"one\ntwo\nthree\n"
        stdin.write(to_sort)
        stdin.close()  # tells the child there is no more input

        # read() only completes at EOF, so keep waiting until both streams
        # have reported EOF (b''). Waiting without a timeout guarantees the
        # output of the slower stream is not dropped.
        while registered:
            done, _ = await asyncio.wait(
                list(registered), return_when=asyncio.FIRST_COMPLETED)
            for f in done:
                stream = registered.pop(f)
                res = f.result()
                if res != b'':
                    print(name[stream], res.decode('ascii').rstrip())
                    registered[asyncio.ensure_future(stream.read())] = stream
    finally:
        stdout_transport.close()
        stderr_transport.close()

    # Both pipes are at EOF, so the child is finished; reap it so it does
    # not linger as a zombie.
    p.wait()
if __name__ == '__main__':
    # Create the loop explicitly rather than relying on get_event_loop()
    # implicitly creating one outside of a coroutine.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
注意:如果不采取特殊措施,写入管道的数据量是有限的。在我的系统中,可以在用完管道缓冲区之前写入超过 700000 个字节。
那里还有其他使用 create_subprocess_shell 的示例。
我还没有在实际项目中使用过asyncio,欢迎评论区提出改进建议。
这是正确的方法……!使用 async/await,已在 Python 3.X(Windows、macOS)上测试。
import asyncio
from asyncio.subprocess import PIPE, STDOUT
import subprocess
import signal
import sys


def signal_handler(signal, frame):
    """SIGINT handler: stop the event loop, release resources and exit.

    Args:
        signal: Signal number passed in by the interpreter.
        frame: Current stack frame passed in by the interpreter (unused).
    """
    loop.stop()
    # ``client`` is not defined anywhere in this snippet; close it only if
    # the surrounding program actually provides one.
    client = globals().get('client')
    if client is not None:
        client.close()
    sys.exit(0)
async def run_async(loop='', cmd='sudo long_running_cmd --opt1=AAAA --opt2=BBBB'):
    """Run ``cmd`` through the shell and wait for it without blocking the loop.

    Args:
        loop: Unused; kept for backward compatibility with existing callers.
        cmd: Shell command line to execute.
    """
    print("[INFO] Starting script...")
    process = await asyncio.create_subprocess_shell(
        cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    # communicate() closes stdin and drains stdout while waiting, so the
    # child cannot stall on a full pipe buffer the way a bare wait() can.
    await process.communicate()
    print("[INFO] Script is complete.")
# Driver: run the coroutine to completion; Ctrl-C is routed to signal_handler.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
signal.signal(signal.SIGINT, signal_handler)
tasks = [loop.create_task(run_async())]
wait_tasks = asyncio.wait(tasks)
try:
    loop.run_until_complete(wait_tasks)
finally:
    loop.close()
核心逻辑:
process = await asyncio.create_subprocess_shell(cmd1, stdin = PIPE, stdout PIPE, stderr = STDOUT)
await process.wait()
我正在尝试以异步方式运行 subprocess 的 Popen 命令,这样我就可以在后台运行其他任务。
import subprocess
import requests
import asyncio
import asyncio.subprocess
async def x(message):
    """Run the shell command carried by ``message`` without blocking the loop.

    The first three characters of ``message.content`` are skipped
    (presumably a command prefix such as ``"!x "`` -- confirm against the
    caller) and the remainder is executed through the shell.

    Args:
        message: Object exposing a ``content`` string attribute.

    Returns:
        The command's combined stdout/stderr decoded as UTF-8, or ``None``
        when the message consists of a single word (no command to run).
    """
    if len(message.content.split()) <= 1:
        return None

    # NOTE(security): the text is handed to a shell verbatim. Only accept it
    # from trusted senders, or validate it before it reaches this point.
    process = await asyncio.create_subprocess_shell(
        message.content[3:],
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    # Awaiting communicate() yields to the event loop while the child runs,
    # unlike subprocess.Popen(...).communicate(), which blocks the thread.
    output, _ = await process.communicate()
    return output.decode('utf-8')
我试图理解 https://docs.python.org/3/library/asyncio-subprocess.html 但我不确定什么是协议工厂。
使用 python 3.5 测试。有问题就问。
import threading
import time
import subprocess
import shlex
from sys import stdout
# Every thread of a process sees the same module-level objects, so a plain
# class is enough to act as a mailbox between the main thread and the worker.
# Access is not synchronised: this is adequate for a single worker thread
# only; with several threads use a threading.Lock or a queue.Queue instead.
class Communicator(object):
    """Shared state exchanged between the main thread and the worker."""

    counter = 0     # incremented once per second by the main loop
    stop = False    # set to True by the main thread when it is interrupted
    arg = None      # command line (a string) the worker should execute
    result = None   # filled in by the worker once the command has finished
# Subclassing Thread and overriding run() is one way to define the work to
# be done; passing ``target=`` to threading.Thread is another.
class ThreadedFunction(threading.Thread):
    """Worker thread that executes the command stored in ``c.arg``.

    The thread reads the module-level ``c`` object: it splits ``c.arg``
    into an argument list, runs it as a subprocess and stores the
    ``(stdout, stderr)`` tuple returned by ``communicate()`` in
    ``c.result``.
    """

    def run(self, *args, **kwargs):
        """Run the command, wait for it to finish and publish its output."""
        super().run()
        command = shlex.split(c.arg)
        # The two timestamps only show that the command really ran for the
        # expected duration.
        print(time.time())
        output = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        ).communicate()
        print('\n', time.time())
        c.result = output
        # c.stop is not consulted here because there is no loop to leave
        # early; a looping worker would check it on every iteration.
# Create the shared state and tell the worker what to run.
c = Communicator()
c.arg = 'time sleep 5'  # 'time' is used only to have some output

# Create the thread and start it; the main thread is free from here on.
t = ThreadedFunction()
t.start()

# ...for example, count the seconds in the meantime.
try:
    for j in range(100):
        c.counter += 1
        stdout.write('\r{:}'.format(c.counter))
        stdout.flush()
        time.sleep(1)
        if c.result is not None:
            print(c.result)
            break
except BaseException:
    # Signal the worker, then let the error (e.g. Ctrl-C) propagate instead
    # of silently swallowing it.
    c.stop = True
    raise
这个方法简单多了,我是在写完另一个回答之后才发现的;不过那个回答也还算有趣……所以我把它保留了下来。
import time
import subprocess
import shlex
from sys import stdout
command = 'time sleep 5'  # 'time' is used only to have some output


def x(command):
    """Start ``command`` in the background and return its ``Popen`` handle.

    Args:
        command: Command line as a single string; it is split with
            ``shlex.split`` and executed without a shell.

    Returns:
        The running ``subprocess.Popen`` object, with stdout and stderr
        captured through pipes. The call does not wait for the process.
    """
    cmd = shlex.split(command)
    return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Start the subprocess and do something else...
p = x(command)

# ...for example, count the seconds in the meantime.
try:
    for j in range(100):
        stdout.write('\r{:}'.format(j))
        stdout.flush()
        time.sleep(1)
        # poll() returns None while the child is still running.
        if p.poll() is not None:
            print(p.communicate())
            break
except BaseException:
    # Do not leave the child running if anything goes wrong (including
    # Ctrl-C), then let the error propagate.
    p.terminate()  # or p.kill()
    raise
异步性可以从这一点看出:sleep 命令在后台进程中运行的同时,Python 脚本仍在向 stdout 打印计数器的值。脚本在约 5 秒后打印出 bash time 命令的输出并退出,而在此期间计数器一直在打印,这证明该脚本是有效的。
我最终找到了我的问题的答案,它利用了异步。 http://pastebin.com/Zj8SK1CG
当我来到这个问题时,我希望答案真正使用 asyncio 进行进程间通信。
我发现以下资源很有用: https://github.com/python/asyncio/blob/master/examples/child_process.py
以下是我的简化示例(使用 3.5+ async/await 语法),它读取行并将它们排序输出:
import asyncio
from subprocess import Popen, PIPE
async def connect_write_pipe(file):
    """Return a write-only transport wrapping a writable pipe.

    Args:
        file: File-like object for the writable end of a pipe
            (for example ``Popen.stdin``).

    Returns:
        The transport created by ``loop.connect_write_pipe``; the bare
        ``asyncio.Protocol`` instance paired with it is discarded.
    """
    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_write_pipe(asyncio.Protocol, file)
    return transport
async def connect_read_pipe(file):
    """Wrap a readable pipe in a stream.

    Args:
        file: File-like object for the readable end of a pipe
            (for example ``Popen.stdout``).

    Returns:
        A ``(stream_reader, transport)`` tuple: the ``asyncio.StreamReader``
        fed by the pipe and the transport that must be closed when done.
    """
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)

    def factory():
        # Protocol factory: the protocol forwards incoming data to the reader.
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(factory, file)
    return stream_reader, transport
async def main(loop):
    """Feed three lines to ``/usr/bin/sort`` and print what it sends back.

    Args:
        loop: Unused; kept so existing callers that pass the event loop
            keep working.

    Output read from the child's stdout is printed with an ``OUT`` prefix
    and output from its stderr with an ``ERR`` prefix.
    """
    # Start the subprocess and wrap stdin, stdout and stderr.
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    try:
        # Map every pending read to the stream it belongs to.
        name = {stdout: 'OUT', stderr: 'ERR'}
        registered = {
            asyncio.ensure_future(stderr.read()): stderr,
            asyncio.ensure_future(stdout.read()): stdout,
        }

        to_sort = b"one\ntwo\nthree\n"
        stdin.write(to_sort)
        stdin.close()  # tells the child there is no more input

        # read() only completes at EOF, so keep waiting until both streams
        # have reported EOF (b''). Waiting without a timeout guarantees the
        # output of the slower stream is not dropped.
        while registered:
            done, _ = await asyncio.wait(
                list(registered), return_when=asyncio.FIRST_COMPLETED)
            for f in done:
                stream = registered.pop(f)
                res = f.result()
                if res != b'':
                    print(name[stream], res.decode('ascii').rstrip())
                    registered[asyncio.ensure_future(stream.read())] = stream
    finally:
        stdout_transport.close()
        stderr_transport.close()

    # Both pipes are at EOF, so the child is finished; reap it so it does
    # not linger as a zombie.
    p.wait()
if __name__ == '__main__':
    # Create the loop explicitly rather than relying on get_event_loop()
    # implicitly creating one outside of a coroutine.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
注意:如果不采取特殊措施,写入管道的数据量是有限的。在我的系统中,可以在用完管道缓冲区之前写入超过 700000 个字节。
那里还有其他使用 create_subprocess_shell 的示例。
我还没有在实际项目中使用过asyncio,欢迎评论区提出改进建议。
这是正确的方法……!使用 async/await,已在 Python 3.X(Windows、macOS)上测试。
import asyncio
from asyncio.subprocess import PIPE, STDOUT
import subprocess
import signal
def signal_handler(signal, frame):
    """SIGINT handler: stop the event loop, release resources and exit.

    Args:
        signal: Signal number passed in by the interpreter.
        frame: Current stack frame passed in by the interpreter (unused).
    """
    # ``sys`` is not imported by the surrounding snippet, so import it here.
    import sys

    loop.stop()
    # ``client`` is not defined anywhere in this snippet; close it only if
    # the surrounding program actually provides one.
    client = globals().get('client')
    if client is not None:
        client.close()
    sys.exit(0)
async def run_async(loop='', cmd='sudo long_running_cmd --opt1=AAAA --opt2=BBBB'):
    """Run ``cmd`` through the shell and wait for it without blocking the loop.

    Args:
        loop: Unused; kept for backward compatibility with existing callers.
        cmd: Shell command line to execute.
    """
    print("[INFO] Starting script...")
    process = await asyncio.create_subprocess_shell(
        cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    # communicate() closes stdin and drains stdout while waiting, so the
    # child cannot stall on a full pipe buffer the way a bare wait() can.
    await process.communicate()
    print("[INFO] Script is complete.")
# Driver: run the coroutine to completion; Ctrl-C is routed to signal_handler.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
signal.signal(signal.SIGINT, signal_handler)
tasks = [loop.create_task(run_async())]
wait_tasks = asyncio.wait(tasks)
try:
    loop.run_until_complete(wait_tasks)
finally:
    loop.close()
核心逻辑:
process = await asyncio.create_subprocess_shell(cmd1, stdin = PIPE, stdout PIPE, stderr = STDOUT)
await process.wait()