Powershell:管道生产者和消费者通信

Powershell: Pipeline Producer and Consumer communication

环境:Win10 + Powershell 5.1

我有一个生产者函数和一个消费者函数。我用管道连接它们。 生产者函数每 5 秒创建一个新对象,而消费者函数可能需要更长的时间才能在 5 秒内消费一个对象。消费者函数有能力处理 3 个对象。 有没有办法让消费者让生产者知道生产者需要等到消费者有额外的容量?

这是我的代码。 Populate-NextIcon = Producer 和 Launch-Icon = consumer

Populate-NextIcon -List $list| foreach-object { $_ | Launch-Icon }

function Populate-NextIcon
{
param ([System.Collections.ArrayList]$list)

foreach($i in $list)
{
  New-Object PSObject -Property @{ Name = $i.name; x = $i.x; y= $i.y }
  sleep
}

}

function Launch-Icon
{
 [CmdletBinding()]
    Param(
            [Parameter(ValueFromPipelineByPropertyName )]
                [String]$name,
            [Parameter(ValueFromPipelineByPropertyName )]
                [int]$x,
            [Parameter(ValueFromPipelineByPropertyName )]
                [int]$y              
        )

....do something to launch the icon
}

您无需执行任何操作来限制管道中的上游命令(假设它是 PowerShell 命令)- PowerShell 将自动阻止生产者(上游 cmdlet)直到消费者(下游 cmdlet)已准备好接受更多输入。

考虑以下示例:

& { while ($true) { (++$i) } } | ForEach-Object { "[$_]"; Start-Sleep 1 }

这会产生无限的数字序列,每秒输出一个新数字。

详情见底部。


请注意,PowerShell 的通用 -OutBuffer 参数 仅控制 生产者 端的缓冲 (因此在这里适用);也就是说,它允许 cmdlet 在 之前 生成一批对象输出 PowerShell 通过管道发送它们 - 而默认情况下,每个对象在输出时立即发送(有关详细信息,请参阅底部部分) ).
请注意,也许令人惊讶的是,-OutBuffer <n> 意味着仅在创建第 <n>n 个 + 1 对象后才开始输出。换句话说:默认行为等于 -OutBuffer 0,而 -OutBuffer 1 表示一旦创建了 2nd 对象,就发送对象。

虽然这改变了对象通过管道发送的时间,但基本机制保持不变:一旦一批对象准备就绪,对象就被发送一个一个.

此参数很少使用,但在缓慢的消费者(下游 cmdlet)会过度限制生产者并且您希望生产者提前生产对象的情况下很有用;总体时间不会改变,但生产者可以更快地生产其批次。


至于你试过的:

作为连接两个管道处理 cmdlet 的管道中的 Mathias R. Jessen points out, there is no reason to use the ForEach-Object cmdlet - 只需使用
Populate-NextIcon -List $list| Launch-Icon

但是,此 假设 Launch-Icon 使用 process { ... } 块进行每个输入对象处理 ,缺少它很可能是问题所在在你的情况下。

没有这样的块,你的函数体隐含地表现得像一个 end { ... } 块,这意味着它不会被调用直到 all 输入已收到 - 假设输入实际结束,您的管道绑定参数变量将在此时仅反映 last 输入对象 - 参见 about_Functions_Advanced.
实际上 - 如果曾经调用过(隐含的)end 块,使用 infinite 生产者将 not 发生 - 这种遗漏限制您的函数最多处理从管道接收到的 单个 对象。

绕过你的ForEach-Object调用缺少process块的问题,通过有效地创建一个嵌套,单- input-object 管道,其唯一的输入对象是单个 ForEach-Object 输入对象 手边 .

虽然这不会妨碍 功能 本身,但它会不必要地显着降低您的命令速度。


数据如何流经 PowerShell 管道:

传统shell管道,例如在POSIX-like shells Unix,基于:

  • 进程间通信(程序运行宁在子进程交流)
  • 发送原始字节数据数据以字节块(固定大小的缓冲区,通常为 64KB)。

相比之下,PowerShell's pipeline 基于:

  • 进程中 方法调用 specialized .NET 类 调用了 cmdlets(同样适用于通常编译的 binary cmdlet来自 C# 和写入 PowerShell cmdlets a.k.a. advanced functions / scripts)
  • 发送数据作为对象(.NET类型的实例),它们总是通过一个一个:
    • 默认情况下,一旦它们被发射
    • with -OutputBuffer 可选地控制首先生成多少个对象

注意:外部程序参与 PowerShell 管道时:

  • 它们 运行 在 子进程 中,必然

  • 仅支持基于文本的通信(字符串),其中行文本构成objects 一个接一个地流过管道,这既适用于从外部程序 接收 的数据,也适用于 PowerShell 命令 发送到的数据 一个外部程序。

  • 警告:从 PowerShell 7.1 开始:

    • 支持通过管道发送原始字节数据 - 参见 GitHub issue #1908

    • 将单个字符串 发送到 外部程序总是会导致 尾随换行符 附加 - 见 GitHub issue #5974.


管道中 cmdlet 之间的通信:

二进制 cmdlet 接收 管道输入 - 一个对象一个对象 - 通过 Cmdlet.ProcessRecord() 方法(注意术语 record[=268 的意外使用=] 在名称中,但是不能再更改)。

PowerShell代码中,等价于:

  • 在高级函数/脚本中:函数/脚本主体中的process { ... }块。

  • in(很少用)filter函数:整个函数体

发送管道输入通常发生在相同方法/块中(使用.WriteObject() 二进制 cmdlet 中的方法,以及隐式输出或(很少需要)Write-Output 调用),尽管必须 聚合 所有输入才能产生输出的 cmdlet ,例如 Sort-Object,必须延迟输出,直到它们的 .EndProcessing() 方法/end 块被调用。

当两个 PowerShell 命令 在管道中连接时 - 我们称它们为 生产者消费者(从给定命令的角度来看,指代管道中较早/较晚的命令的一般术语是上游 / downstream commands),它们的具体交互如下:

  • 当生产者输出(发送)一个对象(如果涉及-OutBuffer可能不会立即发生),消费者的.ProcessRecord方法/process块被调用。

  • 同步发生,即生产者的执行阻塞直到接收者的方法/块调用完成。

这意味着 调整消费者和接收者的节奏以避免 运行消费者中接收者无法跟上的输出生产被内置到 PowerShell 的基础设施中(假设 intra-command 同步执行的代码,这是典型的)。

这是一个最小示例,它使用简单函数作为生产者,使用filter函数作为接收者;但是,相同的原则适用于 cmdlet:

# A simple produces function that produces a number sequence indefinitely.
# Write-Output is equivalent to calling .WriteObject() from a binary cmdlet.
# (It is used here explicitly to highlight when output happens, 
#  though note that explicit Write-Output use  is rarely necessary.)
function Invoke-Producer { 
  while ($true) { Write-Output ++$i; Write-Verbose -vb 'back in producer' } 
}

# A simply receiver filter function that echoes each input object and
# indefinitely waits for user input before continuing.
# The filter's body acts like a ProcessRecord() / process block implementation.
filter Invoke-Receiver { "`nReceived: $_"; Read-Host 'Press Enter to continue' }

Invoke-Producer | Invoke-Receiver

按下 Enter 后,您将看到以下内容,这表明在接收方完成对每个对象的处理之前,控制权不会返回给生产方:

Received: 1
Press Enter to continue:

VERBOSE: back in producer

Received: 2
Press Enter to continue:

警告:当外部程序 充当生产者 与充当接收者的 PowerShell 命令结合使用(从 PowerShell 7.1 开始):

PowerShell 缓冲区 如果接收器无法跟上,则来自外部程序的输出行,这意味着至少假设你可能 运行 内存不足。

您可以在 Unix(macOS 或 Linux)上按如下方式验证此行为;确保当前没有 find 进程 运行ning:

$notified = $false

# Using `find`, list the paths of all file-system items in PowerShell's 
# home directory and echo one by one, waiting fora half-second after each.
find $PSHOME | ForEach-Object { 
  "[$_]"
  if (-not $notified -and -not (Get-Process -ea Ignore find)) { 
    Write-Verbose -vb "FIND HAS EXITED."
    $notified = $true
  }
  Start-Sleep -MilliSeconds 500
}

你会看到类似下面的内容,这意味着 PowerShell 在后台缓冲了所有 find 的输出(因此 find 然后退出),然后ForEach-Object 命令已全部处理:

[/Users/jdoe/.powershell]
[/Users/jdoe/.powershell/Microsoft.CSharp.dll]
VERBOSE: FIND HAS EXITED.
[/Users/jdoe/.powershell/Microsoft.PowerShell.Commands.Management.dll]
...

请注意,在某些情况下,即使是 PowerShell 接收器也可能 运行 内存不足 无限生产者,即:

  • 如果 PowerShell 命令需要先聚合所有输入。

  • 如果 PowerShell 命令是一个 简单的 函数/脚本,它通过 automatic $input variable 访问管道输入,其使用意味着 PowerShell 缓冲所有输入前面。

例如,以下命令将 运行 内存不足,因为简单 foo 函数体的处理直到 所有输入都已收集,根据定义,这里永远不会发生:

# Simple function that, due to use of $input, relies on PowerShell
# to collect all its pipeline input *up front*.
# Note: The body of a simple function / script is implicitly the
#       equivalent of the `.EndProcessing()` method / `end` block of a
#       cmdlet.
function foo { $input | % { "[$_]" } }

# !! Never produces output, and eventually runs out of memory.
& { while ($true) { (++$i) } } | foo

所以:

  1. 一旦我在我的函数中使用了 process{} 块,就不需要 ForEach-Object 了。 我以为默认情况下该函数在 process{} 块中,但后来我发现它在 end 块中
  2. 这个理解很关键:

once a batch of objects is ready, the objects are sent one by one

最初是由于误用了 process{} 块,我误解了程序只会在一切就绪后才发送批处理对象