asynchronous subprocess Popen python 3.5

I'm trying to run a Popen command from subprocess asynchronously, so that I can run other things in the background.

import subprocess
import requests
import asyncio
import asyncio.subprocess

async def x(message):
    if len(message.content.split()) > 1:
        #output = asyncio.create_subprocess_shell(message.content[3:], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output = subprocess.Popen(message.content[3:], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        return output.communicate()[0].decode('utf-8')

I'm trying to understand https://docs.python.org/3/library/asyncio-subprocess.html but I'm not sure what a protocol factory is.
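
For reference: a protocol factory is just a zero-argument callable that returns a new protocol instance (the StreamReaderProtocol factory in one of the answers below is an example). The high-level asyncio.create_subprocess_shell API hides it entirely. Here is a minimal sketch, assuming Python 3.5+ and the same message object as in the question, of what the commented-out line could look like once it is properly awaited:

import asyncio
import asyncio.subprocess

async def x(message):
    if len(message.content.split()) > 1:
        # create_subprocess_shell already runs the command through the shell,
        # so there is no shell= argument; the call must be awaited and it
        # returns a Process object
        proc = await asyncio.create_subprocess_shell(
            message.content[3:],
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT)
        stdout, _ = await proc.communicate()  # waits without blocking the event loop
        return stdout.decode('utf-8')

The coroutine x itself then has to be awaited (or scheduled on the event loop) by its caller.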

Tested with Python 3.5. Ask if you have questions.

import threading
import time
import subprocess
import shlex
from sys import stdout


# Only data within a class is actually shared by the threads.
# Let's use a class as a communicator (there could be problems if you have more
# than a single thread).
class Communicator(object):
    counter = 0
    stop = False
    arg = None
    result = None

# Here we can define what you want to do. There are other methods to do that
# but this is the one I prefer.
class ThreadedFunction(threading.Thread):

    def run(self, *args, **kwargs):
        super().run()
        command = c.arg

        # Here is what you want to do...
        command = shlex.split(command)
        print(time.time()) # this is just to check that the command (sleep 5) is executed
        output = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
        print('\n',time.time())
        c.result = output
        if c.stop: return None # This is useful only within loops within threads

# Create a class instance
c = Communicator()
c.arg = 'time sleep 5' # Here I used the 'time' only to have some output

# Create the thread and start it
t = ThreadedFunction()
t.start() # Start the thread and do something else...

# ...for example, count the seconds in the meantime...
try:
    for j in range(100):
        c.counter += 1
        stdout.write('\r{:}'.format(c.counter))
        stdout.flush()
        time.sleep(1)
        if c.result is not None:
            print(c.result)
            break
except:
    c.stop = True
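
As a side note (not part of the answer above): the standard-library way to hand a result from a worker thread back to the main thread is a queue.Queue. A minimal sketch of the same idea, with illustrative names and the same 'time sleep 5' assumption as above:

import queue
import shlex
import subprocess
import threading

result_q = queue.Queue()  # thread-safe channel for the result

def worker(command):
    cmd = shlex.split(command)
    out, err = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE).communicate()
    result_q.put((out, err))  # hand the result back to the main thread

t = threading.Thread(target=worker, args=('time sleep 5',))
t.start()
# ... do something else here, then collect the result when it is ready
print(result_q.get())  # blocks until the worker has put the result
t.join()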

This is much simpler. I found it after writing the other reply, which was interesting to figure out anyway, so I left that one as well.

import time
import subprocess
import shlex
from sys import stdout


command = 'time sleep 5' # Here I used the 'time' only to have some output

def x(command):
    cmd = shlex.split(command)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return p

# Start the subprocess and do something else...
p = x(command)
# ...for example, count the seconds in the meantime...

try: # This takes care of killing the subprocess if problems occur
    for j in range(100):
        stdout.write('\r{:}'.format(j))
        stdout.flush()
        time.sleep(1)
        if p.poll() is not None:
            print(p.communicate())
            break
except:
    p.terminate() # or p.kill()

The asynchronicity shows in the fact that the python script prints the counter value on stdout while the sleep command runs in a background process. The fact that the script prints the counter and then exits after ~5 seconds, once it has printed the output of the bash time command, proves that it works.

I eventually found the answer to my question, which makes use of asyncio. http://pastebin.com/Zj8SK1CG

When I came to this question, I was hoping the answers would actually use asyncio for inter-process communication.

I found the following resource useful: https://github.com/python/asyncio/blob/master/examples/child_process.py

Below is my simplified example (using the 3.5+ async/await syntax), which reads lines and outputs them sorted:

import asyncio

from subprocess import Popen, PIPE


async def connect_write_pipe(file):
    """Return a write-only transport wrapping a writable pipe"""
    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_write_pipe(asyncio.Protocol, file)
    return transport


async def connect_read_pipe(file):
    """Wrap a readable pipe in a stream"""
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)

    def factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(factory, file)
    return stream_reader, transport


async def main(loop):
    # start subprocess and wrap stdin, stdout, stderr
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)

    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    # interact with subprocess
    name = {stdout: 'OUT', stderr: 'ERR'}
    registered = {
        asyncio.Task(stderr.read()): stderr,
        asyncio.Task(stdout.read()): stdout
    }

    to_sort = b"one\ntwo\nthree\n"
    stdin.write(to_sort)
    stdin.close()  # this way we tell we do not have anything else

    # get and print lines from stdout, stderr
    timeout = None
    while registered:
        done, pending = await asyncio.wait(
            registered, timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED)
        if not done:
            break
        for f in done:
            stream = registered.pop(f)
            res = f.result()
            if res != b'':
                print(name[stream], res.decode('ascii').rstrip())
                registered[asyncio.Task(stream.read())] = stream
        timeout = 0.0

    stdout_transport.close()
    stderr_transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

Note: without special measures, only a limited amount of data can be written to the pipe. On my system it was possible to write a bit more than 700000 bytes before running out of pipe buffer space.

There are other examples there as well, using create_subprocess_shell.
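
For comparison, here is a minimal sketch (mine, not from the linked examples) of the same sort pipeline using the high-level create_subprocess_exec API, which wraps the pipes in streams so no explicit protocol factory is needed; it assumes /usr/bin/sort exists, as in the example above:

import asyncio
from asyncio.subprocess import PIPE


async def sort_with_streams():
    # the high-level API awaits to a Process whose stdin/stdout/stderr are
    # StreamWriter/StreamReader objects
    proc = await asyncio.create_subprocess_exec(
        '/usr/bin/sort', stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdout, stderr = await proc.communicate(b"one\ntwo\nthree\n")
    print('OUT', stdout.decode('ascii').rstrip())
    if stderr:
        print('ERR', stderr.decode('ascii').rstrip())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(sort_with_streams())
    loop.close()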

I haven't used asyncio in a real project yet, so suggestions for improvements in the comments are welcome.

This is the right way...! Use async/await.

Tested on Python 3.x [Windows, MacOS].

import asyncio
from asyncio.subprocess import PIPE, STDOUT
import subprocess
import signal
import sys


def signal_handler(signal, frame):
    loop.stop()
    # client.close()  # close any open client/connection here ('client' is not defined in this snippet)
    sys.exit(0)

async def run_async(loop=''):
    cmd = 'sudo long_running_cmd --opt1=AAAA --opt2=BBBB'

    print("[INFO] Starting script...")
    process = await asyncio.create_subprocess_shell(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    await process.wait()
    print("[INFO] Script is complete.")


loop = asyncio.get_event_loop()
signal.signal(signal.SIGINT, signal_handler)
tasks = [loop.create_task(run_async())]
wait_tasks = asyncio.wait(tasks)
loop.run_until_complete(wait_tasks)

loop.close()

Core logic:

process = await asyncio.create_subprocess_shell(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
await process.wait()
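
A related caveat: because stdout=PIPE is used, waiting only with process.wait() can deadlock once the command produces more output than the pipe buffer holds; the asyncio docs recommend communicate() in that case, which also captures the output. A minimal variant of the core logic:

process = await asyncio.create_subprocess_shell(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
stdout, _ = await process.communicate()  # reads all output, then waits for the process to exit
print(stdout.decode())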