How to use asyncio to stream-process data between 3 subprocesses (using pipes) and consume the resulting data
I have 3 scripts that need to be combined to process data in a pipeline. The scripts run forever, until execution is interrupted by the user. This is how they are executed in a terminal:
script1_producer.sh | script2_processor.sh | script3_processor.sh
script1_producer.sh generates the data to be processed (e.g. it just prints incrementing numbers):
i=1
while true; do
  echo $i
  i=$(($i+1))
  sleep 1
done
script2_processor.sh consumes the data from Script1 and computes a new stream of data (multiplying each number by 2):
while read -r line
do
  echo "$(($line*2))"
done < "${1:-/dev/stdin}"
script3_processor.sh consumes the data from Script2 and computes a new stream of data (prepending a letter to each number):
while read -r line
do
  echo "A$(($line))"
done < "${1:-/dev/stdin}"
Resulting output when running script1_producer.sh | script2_processor.sh | script3_processor.sh:
A2
A4
A6
...
Now I'd like these scripts to be controlled by Python subprocesses using pipes.
In the end I need to process the output of script3_processor.sh and perform an action on each line.
I'm trying to implement this with asyncio, although a solution without asyncio would also be fine if possible.
This is my - very naive - attempt, process_pipes.py:
import asyncio
import subprocess
import os

async def async_receive():
    p1 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=subprocess.PIPE,
    )
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
    )
    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=p2.stdout,
        stdout=subprocess.PIPE,
    )
    # Read just one line to test
    data = await p3.stdout.readline()
    print(data)

asyncio.run(async_receive())
Unfortunately, I get the following exception when executing this script:
Traceback (most recent call last):
  File "process_pipes.py", line 28, in <module>
    asyncio.run(async_receive())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "process_pipes.py", line 12, in async_receive
    p2 = await asyncio.create_subprocess_exec(
  File "/usr/lib/python3.8/asyncio/subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1630, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 197, in _make_subprocess_transport
    transp = _UnixSubprocessTransport(self, protocol, args, shell,
  File "/usr/lib/python3.8/asyncio/base_subprocess.py", line 36, in __init__
    self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 789, in _start
    self._proc = subprocess.Popen(
  File "/usr/lib/python3.8/subprocess.py", line 808, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "/usr/lib/python3.8/subprocess.py", line 1477, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'
I read some examples on Stack Overflow and elsewhere that handle the pipes differently, but could not make those work in my scenario.
How can I mimic running script1_producer.sh | script2_processor.sh | script3_processor.sh and process the output of script3 in Python?
Here is how to solve the problem without asyncio - just use Popen with shell=True and put the pipes into the command:
import subprocess

def receive():
    p = subprocess.Popen(
        "./script1_producer.sh "
        "| ./script2_processor.sh "
        "| ./script3_processor.sh",
        stdout=subprocess.PIPE, shell=True)
    while True:
        line = p.stdout.readline()
        if line:
            print(line.decode().strip())

if __name__ == '__main__':
    receive()
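If asyncio is still wanted, the same shell-pipeline idea can be combined with it: `asyncio.create_subprocess_shell` launches the whole pipeline, while the final stdout is exposed as an asyncio stream. The sketch below is an illustration, not the answer's code; the `consume` helper is made up for it, and a portable stand-in pipeline (`printf`/`awk`/`sed`) replaces the real scripts.

```python
import asyncio
from typing import List, Optional

async def consume(cmd: str, max_lines: Optional[int] = None) -> List[str]:
    """Run a shell pipeline and collect decoded lines from its last stage."""
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE
    )
    lines: List[str] = []
    while True:
        raw = await proc.stdout.readline()
        if not raw:  # b'' means EOF: the pipeline has finished
            break
        lines.append(raw.decode().rstrip())
        if max_lines is not None and len(lines) >= max_lines:
            proc.kill()  # stop an endless pipeline once we have enough
            break
    await proc.wait()
    return lines

if __name__ == "__main__":
    # Stand-in for the real scripts (assumed to behave as in the
    # question): double each number, then prefix "A".
    out = asyncio.run(consume(
        "printf '1\\n2\\n3\\n' | awk '{print $1*2}' | sed 's/^/A/'"
    ))
    print(out)  # ['A2', 'A4', 'A6']
```

The `max_lines` parameter is only there so that a never-ending pipeline like the one in the question can be sampled and then killed cleanly.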
I found another solution, guided by this question:
Before that, one thing to note is that the scripts have a syntax issue: in lines like echo "$(($line*2))" there should be more spaces, like echo "$(( $line * 2 ))"; bash is a bit silly about spaces. Apart from that, everything is fine.
One thing to keep in mind here is that a pipe has two ends, one end for reading and one end for writing. So for the first process it looks like this sketch:
- Write end (WE)
- Read end (RE)
p0 ---> | pipe 1 | ---> p1
        WE       RE
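The two ends can be seen in isolation with a minimal sketch: bytes written to the write end come back out of the read end, and closing the write end is what produces EOF for readers.

```python
import os

r, w = os.pipe()                 # r = read end (RE), w = write end (WE)
os.write(w, b"hello\n")
os.close(w)                      # no more writers -> readers will see EOF
data = os.read(r, 1024)
os.close(r)
print(data)  # b'hello\n'
```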
You should use pipes from os, as described in the question above. That part would look like this:
read1, write1 = os.pipe()
p0 = await asyncio.create_subprocess_exec(
    "./script1_producer.sh",
    stdout=write1
)
stdout will be the WE of the pipe, while for p1 we have:
| pipe 1 | ---> p1 -------> | pipe 2 |
          WE   RE=stdin     stdout=WE
stdin is the RE of the first pipe and stdout the WE of the second pipe, like this:
read2, write2 = os.pipe()
p2 = await asyncio.create_subprocess_exec(
    "./script2_processor.sh",
    stdin=read1,
    stdout=write2,
)
And for the third process:
| pipe 2 | ---> p3 -------> | asyncio PIPE |
          WE   RE=stdin     stdout=WE
Putting it all together:
import asyncio
import os

async def async_receive():
    read1, write1 = os.pipe()
    p0 = await asyncio.create_subprocess_exec(
        "./script1_producer.sh",
        stdout=write1,
    )
    read2, write2 = os.pipe()
    p2 = await asyncio.create_subprocess_exec(
        "./script2_processor.sh",
        stdin=read1,
        stdout=write2,
    )
    p3 = await asyncio.create_subprocess_exec(
        "./script3_processor.sh",
        stdin=read2,
        stdout=asyncio.subprocess.PIPE,
    )
    # Close the parent's copies of the pipe ends: each child holds its own
    # copy, and keeping ours open would prevent EOF from propagating if a
    # producer exits.
    os.close(write1)
    os.close(read1)
    os.close(write2)
    os.close(read2)
    # Read the lines as they arrive
    while True:
        data = await p3.stdout.readline()
        data = data.decode('ascii').rstrip()
        print(data)
        print("Sleeping 1 sec...")
        await asyncio.sleep(1)

asyncio.run(async_receive())
This way you can still use asyncio.
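The same wiring can be generalized into a small helper. The sketch below is illustrative only: the `chain` helper is made up, and portable `sh -c` commands stand in for the real scripts. It spawns N commands, connects each stdout to the next stdin via `os.pipe()`, and closes the parent's copies of each end as it goes.

```python
import asyncio
import os

async def chain(*commands):
    """Spawn each command, wiring stdout -> stdin via os.pipe().

    Returns the list of processes; the last one exposes its stdout as an
    asyncio stream (asyncio.subprocess.PIPE)."""
    procs = []
    prev_read = None
    for i, cmd in enumerate(commands):
        last = i == len(commands) - 1
        if not last:
            r, w = os.pipe()
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=prev_read,
            stdout=asyncio.subprocess.PIPE if last else w,
        )
        # The children now own their copies of the fds; close ours so
        # that EOF propagates when an upstream process exits.
        if prev_read is not None:
            os.close(prev_read)
        if not last:
            os.close(w)
            prev_read = r
        procs.append(proc)
    return procs

async def main():
    procs = await chain(
        ["sh", "-c", "printf '1\\n2\\n3\\n'"],
        ["sh", "-c", "while read -r n; do echo $((n*2)); done"],
    )
    while True:
        line = await procs[-1].stdout.readline()
        if not line:  # EOF: the whole chain has finished
            break
        print(line.decode().rstrip())
    for p in procs:
        await p.wait()

if __name__ == "__main__":
    asyncio.run(main())  # prints 2, 4, 6
```

Because the parent closes its write ends eagerly, a finite producer (unlike the endless scripts in the question) lets the readline loop terminate naturally on EOF.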