Simplest way to create a Unix-like continuous pipeline in PowerShell
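I have two programs:
emitter.exe - runs an infinite loop and keeps printing a message to stdout.
receiver.exe - also runs an infinite loop, waiting for messages on stdin and processing them.
On a Unix-like system I can run something like this and everything works: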
$ emitter | receiver
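Of course, this does not work in PowerShell. PowerShell first tries to buffer the entire output of emitter and passes it to receiver only once emitter has finished. In our case, because both programs loop forever, the output is never passed on.
What is the simplest workaround to emulate the Unix-like behavior?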
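I think a simple way is to wrap emitter.exe and receiver.exe in two PowerShell functions written as pipeline processors (functions with Begin/Process/End blocks).
Here is a simple sample script in pure PowerShell: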
function emitter() {
    [CmdletBinding(SupportsShouldProcess=$True)]
    [OutputType([String])] Param ()
    Begin {}
    Process {
        Start-Sleep -Milliseconds 1000
        Write-Output "FIRST"
        Start-Sleep -Milliseconds 5000
        Write-Output "TWO"
        Start-Sleep -Milliseconds 4000
        Write-Output "THREE"
    }
    End {}
}
function receiver() {
    [CmdletBinding(SupportsShouldProcess=$True)]
    [OutputType([String])]
    Param ([Parameter(Mandatory=$true, ValueFromPipeline=$true)][String[]] $msg)
    Begin {
        Write-Host "Start"
    }
    Process {
        Write-Host "Received $msg"
    }
    End {
        Write-Host "quit"
    }
}
emitter | receiver
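Replace the Write-Output ... lines in the emitter function with a call to emitter.exe, and the Write-Host "Received $msg" line in the receiver function with a call to receiver.exe.
Or, better still, pass them to the two functions as parameters. For the receiver you have to use System.Diagnostics.Process, because the emitter's output must be forwarded on the fly. The built-in PowerShell Start-Process cannot do the job, because it does not allow writing to the process's standard input (I have been looking for a workaround, without success so far).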
Edit: a sample implementation in which the output of WHERE.EXE is piped to FINDSTR /C:x:
function emitter() {
    [CmdletBinding(SupportsShouldProcess=$True)]
    [OutputType([String])] Param ([String] $cmd)
    Begin {}
    Process {
        # Run the external command; each line of its stdout becomes pipeline output
        & $cmd
    }
    End {}
}
function receiver() {
    [CmdletBinding(SupportsShouldProcess=$True)]
    [OutputType([String])]
    Param ([String] $cmd, [String] $cmdpar,
           [Parameter(Mandatory=$false, ValueFromPipeline=$true)][String[]] $msg)
    Begin {
        # Resolve the command name to a full path
        $cmdinfo = Get-Command $cmd -ErrorAction SilentlyContinue
        if (!$cmdinfo) {
            # throw instead of exit: exit would end the calling script or session
            throw "Failed to find $cmd.."
        }
        $fullpath = $cmdinfo.Path
        # Start the process with only its standard input redirected
        $ProcessInfo = New-Object System.Diagnostics.ProcessStartInfo
        $ProcessInfo.FileName = $fullpath
        $ProcessInfo.RedirectStandardError = $false
        $ProcessInfo.RedirectStandardOutput = $false
        $ProcessInfo.RedirectStandardInput = $true
        $ProcessInfo.UseShellExecute = $false
        $ProcessInfo.CreateNoWindow = $false
        $ProcessInfo.Arguments = $cmdpar
        $Process = New-Object System.Diagnostics.Process
        $Process.StartInfo = $ProcessInfo
        if ($Process.Start()) {
            $StreamWriter = $Process.StandardInput
            Write-Host "Start"
        } else {
            throw "Failed to open $cmd.."
        }
    }
    Process {
        # Forward each pipeline item to the process's stdin as soon as it arrives
        if ($StreamWriter) {
            $StreamWriter.WriteLine([String]$msg)
            $StreamWriter.Flush()
        }
    }
    End {
        # Closing stdin signals end of input; then wait for the process to finish
        if ($StreamWriter) {
            $StreamWriter.Close()
            $Process.WaitForExit()
            Write-Host "quit"
        }
    }
}
emitter "where.exe" | receiver "findstr.exe" "/C:x"
Depending on your locale, this prints something like:
Start
The syntax of this command :
Examples :
WHERE /R c:\windows *.exe *.dll *.bat
quit
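Update
Here is a considerably more complex PowerShell cmdlet that can be used to daisy-chain external commands and/or PowerShell cmdlets. It can also be used to debug your pipeline. It allows continuous pipelining through the PowerShell pipeline processor (for example ping -t 127.0.0.1 | emitter_receiver "more.com"). It comes with some performance cost compared with the equivalent in cmd.exe.
Indeed, don't try a continuous pipeline with sort.exe (presumably because sort has to read all of its input before it can print anything).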
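One likely reason for the sort.exe warning is that a sorting stage cannot emit anything until its input has ended, so it never produces output on an endless stream. The sketch below illustrates the difference using only built-in cmdlets. Sort-Object stands in for sort.exe here, which is an assumption made for illustration, and the names $streamLog and $sortLog are hypothetical:

```powershell
# Record the order in which the pipeline stages see each item.
$streamLog = [System.Collections.Generic.List[string]]::new()
$sortLog   = [System.Collections.Generic.List[string]]::new()

# Streaming stages: each item reaches the receiver before the next one is emitted.
3, 1, 2 |
    ForEach-Object { $streamLog.Add("emit $_"); $_ } |
    ForEach-Object { $streamLog.Add("recv $_") }

# With a sort in between, the receiver sees nothing until the emitter is done.
3, 1, 2 |
    ForEach-Object { $sortLog.Add("emit $_"); $_ } |
    Sort-Object |
    ForEach-Object { $sortLog.Add("recv $_") }

$streamLog -join ', '   # emit 3, recv 3, emit 1, recv 1, emit 2, recv 2
$sortLog -join ', '     # emit 3, emit 1, emit 2, recv 1, recv 2, recv 3
```

With an emitter that never finishes, the second pipeline would never reach its receiver at all.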
The code, with explanatory comments:
# Unix-like daisy chaining of console commands with PowerShell pipeline processor.
#
#
# The wrapper function for external console command.
#
function emitter_receiver() {
    [CmdletBinding(SupportsShouldProcess=$False)]
    [OutputType([String])]
    Param ([Parameter(Mandatory=$True,
                      HelpMessage="Command to execute, without parameters")][String] $cmd,
           [Parameter(Mandatory=$False,
                      HelpMessage="Command parameters")][String] $cmdpar=$null,
           [Parameter(Mandatory=$False,
                      HelpMessage="Pad buffered pipeline size")][Int32] $bufferedPipe=0,
           [Parameter(Mandatory=$False,
                      HelpMessage="Debug pipeline flag")][Boolean] $debugPipe=$False,
           [Parameter(Mandatory=$False, ValueFromPipeline=$True)][String[]] $msg)
    <#
    .SYNOPSIS
        Allows Unix-like daisy chaining of pipelined commands with PowerShell pipeline.
    .DESCRIPTION
        Allows pipelining "on the fly" of external console commands with the
        PowerShell pipe processing. For external console commands, the PowerShell
        pipe processing waits for the termination of the command before piping its
        output to the next channel, thus preventing any "tail"/poll type of usage.
        This function wraps the external console command to inject its input
        and output into the PowerShell pipeline processor in a Unix-like way:
        each member of the pipeline is executed in parallel and consumes the
        output of the previous member "on the fly", without waiting for its termination.
        Requires PowerShell 7.1 at least; not tested with earlier releases.
        Indeed, don't try with sort.exe.
    .PARAMETER cmd
        Specify the external console command to execute, without any parameters.
        The function will search the command with the Get-Command cmdlet, so a
        relative path or only the command filename can be provided. The complete
        name including extension must be provided.
    .PARAMETER cmdpar
        Specify a parameters string for the external console command to execute.
        Optional, none by default.
    .PARAMETER bufferedPipe
        Size of the padding to add to the STDIN of commands like findstr.exe.
        Some commands have buffered input processing and do not process anything
        until their input buffer is full. For findstr.exe, with a continuous
        input pipe, set it to 8000 for "fluent" output, otherwise findstr.exe
        will wait to have 8kb of input before processing/outputting anything.
        Optional, 0 by default. Capped at a maximum of 8000.
    .PARAMETER debugPipe
        Boolean flag to get a debug output on console for the pipe processing.
        Optional, $False by default.
    .EXAMPLE
        A simple daisy chaining of common console commands:
        ps> emitter_receiver "where.exe" | emitter_receiver "findstr.exe" "/C:x" | emitter_receiver "more.com"
        > WHERE /R c:\windows *.exe *.dll *.bat
        > examples :
        > The syntax of this command is :
        The real interest of this function is for continuous pipelines of console command output:
        ps> emitter_receiver "ping" "-t 127.0.0.1" | emitter_receiver "more.com"
        > pinging [127.0.0.1] with 32 bytes of data:
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > ...
        ps> emitter_receiver "ping.exe" "-t 127.0.0.1" | emitter_receiver "findstr.exe" "/C:TTL" 8000
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > ...
        ps> emitter_receiver "ping.exe" "-t 127.0.0.1" | emitter_receiver "findstr.exe" "/C:TTL" 8000 | emitter_receiver "more.com"
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > ...
        A direct "ping -t 127.0.0.1 | findstr /C:TTL" or "ping -t 127.0.0.1 | more" in PowerShell will hang.
        The function can also be used for only part of the pipeline, for the commands that can't handle continuous input:
        ps> ping -t 127.0.0.1 | emitter_receiver "more.com"
        > pinging [127.0.0.1] with 32 bytes of data:
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > ...
        ps> ping.exe -t 127.0.0.1 | emitter_receiver "findstr.exe" "/C:TTL" 8000
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > Reply from 127.0.0.1: bytes=32 time<1ms TTL=124
        > ...
    .INPUTS
        Pipelined input from a PowerShell pipeline processor.
    .OUTPUTS
        Pipelined output from the provided external console command.
    .NOTES
        Exceptions thrown:
        [System.IO.FileNotFoundException] "Failed to find .." : The provided command can't be found with Get-Command.
        [System.Exception] "Failed to open .." : The provided command can't be launched or failed
        [System.Exception] "Can't get a stack..." : Can't get a stack, memory and/or .Net related trouble
        [System.Exception] "Input write to $cmd failed ..." : Can't write anymore in command STDIN
        [System.Exception] "Output read from $cmd failed ..." : Can't read anymore from command STDOUT
        [System.Exception] "Unexpected leave, pipe broken ?..." : Abnormal termination of the pipe processing
    #>
    Begin {
        # If we fail, stop the pipe...
        $ErrorActionPreference = "Stop"
        # Look for the command in PATH
        $cmdinfo = $(Get-Command $cmd)
        if (!$cmdinfo -or !$cmdinfo.Path) {
            throw [System.IO.FileNotFoundException] "Failed to find $cmd..";
        }
        $fullpath = $cmdinfo.Path
        # Use System.Diagnostics.Process to launch the command and redirect its STDIN/STDOUT
        $ProcessInfo = New-Object System.Diagnostics.ProcessStartInfo
        $ProcessInfo.FileName = $fullpath
        $ProcessInfo.RedirectStandardError = $False
        $ProcessInfo.RedirectStandardOutput = $True
        $ProcessInfo.RedirectStandardInput = $True
        $ProcessInfo.UseShellExecute = $False
        $ProcessInfo.CreateNoWindow = $False;
        if ($cmdpar) {
            $ProcessInfo.Arguments = $cmdpar
        }
        $Process = New-Object System.Diagnostics.Process
        $Process.StartInfo = $ProcessInfo
        # References to the stream writer and reader for the command's STDIN / STDOUT
        $StdInStreamWriter = $null;
        $StdOutStreamReader = $null;
        # Base sleep time for polling, in milliseconds. 1 ms keeps the pipeline
        # responsive while avoiding a busy loop that would stress the CPU.
        $SleepTime=1;
        # Clamp the padding size to the 0..8000 range
        if ($bufferedPipe -gt 8000) { $bufferedPipe = 8000; }
        if ($bufferedPipe -lt 0) { $bufferedPipe = 0; }
        # Launch the command
        $ProcessStarted = $Process.Start()
        if ($ProcessStarted) {
            $StdInStreamWriter = $Process.StandardInput;
            $StdOutStreamReader = $Process.StandardOutput;
            $StdInStreamWriter.AutoFlush = $True;
        } else {
            throw [System.Exception] "Failed to open $cmd..";
        }
        # We use two FIFO queues to exchange STDIN and STDOUT stream data between
        # the PowerShell pipeline processing in the 'premise-style' context and the
        # process we have launched for the command. They are fed by the two
        # polling PowerShell instances below, one for STDIN and one for STDOUT,
        # as no async operations are allowed in a 'premise-style' context.
        $StdOutStack = new-object System.Collections.Queue;
        if (!$StdOutStack) {
            throw [System.Exception] "Can't get a stack..."
        }
        $StdInStack = new-object System.Collections.Queue;
        if (!$StdInStack) {
            throw [System.Exception] "Can't get a stack..."
        }
        # We create two background PowerShell instances that poll: one reads from
        # STDOUT and one writes into STDIN of the process we have launched for the
        # command. As they are started from the 'begin' part of the function, they
        # can read STDOUT into the STDOUT queue and write STDIN from the STDIN
        # queue. These operations are not directly doable in the 'process' part
        # of the function, as it is a 'premise-style' context.
        #
        # STDOUT polling instance
        #
        $OutProc = [powershell]::Create().AddScript({
            Param ([parameter(Mandatory=$True)]$args)
            $Process = $args[0]
            $StdOutStreamReader = $args[1]
            $SleepTime = $args[2]
            $cmd = $args[3]
            $StdOutStack = $args[4]
            $debugPipe = $args[5]
            while (!$StdOutStreamReader.EndOfStream) {
                $msgproc = $StdOutStreamReader.ReadLine()
                if ($debugPipe) { Write-Output ($cmd+": OUT_S: "+[String]$msgproc) }
                try {
                    $syncStdOutStack = [System.Collections.Queue]::Synchronized($StdOutStack);
                    $syncStdOutStack.Enqueue($msgproc)
                } finally {
                    $syncStdOutStack = $null
                }
            }
            if ($debugPipe) { Write-Output ($cmd+": OUT_S terminated.") }
        })
        $tmp = $OutProc.AddParameter("args", @($Process, $StdOutStreamReader, $SleepTime, $cmd, $StdOutStack, $debugPipe))
        $OutJob = $OutProc.BeginInvoke()
        #
        # STDIN polling instance
        #
        $InProc = [powershell]::Create().AddScript({
            Param ([parameter(Mandatory=$True)]$args)
            $Process = $args[0]
            $StdInStreamWriter = $args[1]
            $SleepTime = $args[2]
            $cmd = $args[3]
            $StdInStack = $args[4]
            $debugPipe = $args[5]
            $bufferedPipe = $args[6]
            # 8000 filler characters (ESC followed by "2", repeated) used as padding
            if ($bufferedPipe -gt 0) { $dumb_findstr = ("12"*4000).Replace("1", [char]27) }
            while (!$Process.hasExited -and $StdInStreamWriter) {
                try {
                    $syncStdInStack = [System.Collections.Queue]::Synchronized($StdInStack);
                    # Drain everything currently queued before sleeping again
                    while ($syncStdInStack.Count -gt 0) {
                        $stack_msg = [String]$syncStdInStack.Dequeue();
                        if ($debugPipe) { Write-Output ($cmd+": IN_S: "+[String]$stack_msg) }
                        $check = $StdInStreamWriter.WriteLine([String]$stack_msg)
                        if ($bufferedPipe -gt 0) {
                            # As some commands (findstr...) buffer their input in every case,
                            # we pad each line with filler characters just to overcome that...
                            $padsize = $bufferedPipe-$stack_msg.Length-($stack_msg.Length%2)
                            # Skip the padding when the line already fills the buffer
                            if ($padsize -gt 0) {
                                $check = $StdInStreamWriter.Write($dumb_findstr.Substring(0,$padsize))
                            }
                        }
                    };
                } finally {
                    $syncStdInStack = $null;
                }
                # To avoid straining the CPU needlessly, we wait longer between
                # two flushes of the queue. This does not really affect performance,
                # as incoming messages keep being queued by the 'Process' part of
                # the function while we wait.
                $tmp = [System.Threading.Thread]::Sleep($SleepTime * 5);
            }
            if ($debugPipe) { Write-Output ($cmd+": IN_S terminated.") }
        })
        $tmp = $InProc.AddParameter("args", @($Process, $StdInStreamWriter, $SleepTime, $cmd, $StdInStack, $debugPipe, $bufferedPipe))
        $InJob = $InProc.BeginInvoke()
    }
    Process {
        # If we are in the process part, that means there is input from the PowerShell
        # pipeline to process. We send this input to the STDIN of the command through
        # a FIFO queue and a polling PowerShell instance. Each input from the
        # PowerShell pipeline processor is injected in the command STDIN.
        if ($StdInStreamWriter -and $StdInStack) {
            try {
                $syncStdInStack = [System.Collections.Queue]::Synchronized($StdInStack);
                $syncStdInStack.Enqueue($msg)
                if ($debugPipe) { Write-Host ($cmd+": IN_P "+$msg) }
            } finally {
                $syncStdInStack = $null
            }
        }
        if ($debugPipe) { Write-Host ($cmd+": INST "+$InProc.InvocationStateInfo.State) }
        if ($debugPipe) { Write-Host ($cmd+": OUST "+$OutProc.InvocationStateInfo.State) }
        # While processing input from the pipe, we send on the fly command
        # output to the pipeline processor.
        if ($StdOutStack) {
            try {
                $syncStdOutStack = [System.Collections.Queue]::Synchronized($StdOutStack);
                if ($debugPipe) { Write-Host ($cmd+": OUSTS "+$syncStdOutStack.Count) }
                while ($syncStdOutStack.Count -gt 0) {
                    $stack_msg = [String]$syncStdOutStack.Dequeue();
                    if ($debugPipe) { Write-Host ($cmd+": OUT_P "+[String]$stack_msg) }
                    Write-Output ([String]$stack_msg)
                };
            } finally {
                $syncStdOutStack = $null;
            }
        }
    }
    End {
        if ($debugPipe) { Write-Host ($cmd+": No more input") }
        # If there is still input for the command to process, we wait.
        # We do that until the input queue is empty or the process terminates,
        # so the pipelining is maintained.
        try {
            $inputDone = $False
            while(!$inputDone -and !$Process.hasExited -and $StdInStack) {
                $syncStack = [System.Collections.Queue]::Synchronized($StdInStack);
                if ($syncStack.Count -gt 0) {
                    $syncStack = $null
                    $tmp = [System.Threading.Thread]::Sleep($SleepTime);
                } else {
                    $inputDone = $True
                }
            }
        } finally {
            $syncStack = $null
        }
        # At end, we are sure we have no more input to process,
        # so we immediately close the command STDIN. That way the command
        # will "know" there is no more input to process. That also allows
        # the async read of its output to complete.
        if ($StdInStreamWriter) {
            $StdInStreamWriter.Close();
        }
        # The command has no more input, but it still can output things.
        # We wait until the command has terminated and send to the pipe
        # everything the command writes to STDOUT.
        while (!$Process.hasExited) {
            try {
                # Forward whatever the output reader has queued so far.
                if ($StdOutStack) {
                    $syncStack = [System.Collections.Queue]::Synchronized($StdOutStack);
                    while ($syncStack.Count -gt 0) {
                        $stack_msg = [String]$syncStack.Dequeue();
                        if ($debugPipe) { Write-Host ($cmd+": OUT_E "+[String]$stack_msg) }
                        Write-Output ([String]$stack_msg)
                    };
                }
            } finally {
                $syncStack = $null;
            }
            # To avoid straining the CPU needlessly, we wait a little longer between
            # two checks of STDOUT, as the command can be in a "long run"; the
            # queue is then drained at a slower rhythm.
            $tmp = [System.Threading.Thread]::Sleep($SleepTime * 2);
        }
        # We are finally at complete termination of the command.
        if ($Process.hasExited) {
            if ($debugPipe) { Write-Host "$cmd terminated." }
            # Wait for both background instances to finish
            while(!$InJob.IsCompleted -or !$OutJob.IsCompleted) {
                [System.Threading.Thread]::Sleep($SleepTime);
            }
            if ($InJob.IsCompleted) {
                if ($InProc.InvocationStateInfo.State -eq "Failed") {
                    Write-Host ($cmd+": JOB IN Fail:"+$InProc.InvocationStateInfo.State)
                    Write-Host ($cmd+": JOB IN Fail:"+$InProc.InvocationStateInfo.Reason)
                    throw [System.Exception] "Input write to $cmd failed ..."
                }
                $res = $InProc.EndInvoke($InJob)
                if ($debugPipe) { Write-Host ("JOB IN terminated : "+$res) }
            }
            if ($OutJob.IsCompleted) {
                if ($OutProc.InvocationStateInfo.State -eq "Failed") {
                    Write-Host ($cmd+":JOB OUT Fail:"+$OutProc.InvocationStateInfo.State)
                    Write-Host ($cmd+":JOB OUT Fail:"+$OutProc.InvocationStateInfo.Reason)
                    throw [System.Exception] "Output read from $cmd failed ..."
                }
                $res = $OutProc.EndInvoke($OutJob)
                if ($debugPipe) { Write-Host ("JOB OUT terminated : "+$res) }
            }
            # The async read of the command output is now complete, so we send
            # whatever is left in the queue to the pipeline processor.
            if ($StdOutStack) {
                try {
                    $syncStack = [System.Collections.Queue]::Synchronized($StdOutStack);
                    while ($syncStack.Count -gt 0) {
                        $stack_msg = [String]$syncStack.Dequeue();
                        if ($debugPipe) { Write-Host ($cmd+": OUT_E "+[String]$stack_msg) }
                        Write-Output ([String]$stack_msg)
                    };
                } finally {
                    $syncStack = $null;
                }
            }
            # If we are here, command output was entirely processed, so we close command STDOUT.
            if ($StdOutStreamReader) {
                $StdOutStreamReader.Close();
            }
        } else {
            throw [System.Exception] "Unexpected leave, pipe broken ?..."
        }
    }
}
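The function above hands lines between the pipeline thread and its background PowerShell instances through queues. Stripped of all process handling, that pattern can be sketched roughly as follows. This is a simplified illustration rather than the code above, and the names $queue, $producer and $received are made up for the example:

```powershell
# A FIFO queue wrapped for thread-safe access, shared by both sides.
$queue = [System.Collections.Queue]::Synchronized((New-Object System.Collections.Queue))

# Background PowerShell instance acting as the producer.
$producer = [powershell]::Create().AddScript({
    param($q)
    foreach ($i in 1..3) {
        $q.Enqueue("message $i")
        Start-Sleep -Milliseconds 100
    }
}).AddArgument($queue)
$handle = $producer.BeginInvoke()

# The calling thread polls the queue while the producer is still running,
# then keeps draining until nothing is left.
$received = @()
while (-not $handle.IsCompleted -or $queue.Count -gt 0) {
    while ($queue.Count -gt 0) {
        $received += [string]$queue.Dequeue()
    }
    Start-Sleep -Milliseconds 10
}

# Collect the (empty) result and release the background instance.
$null = $producer.EndInvoke($handle)
$producer.Dispose()

$received   # message 1, message 2, message 3 (one per line)
```

Checking Count before Dequeue is only safe here because there is a single consumer; with several consumers the check and the dequeue would have to happen under one lock.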