PHP popen process limit?
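I'm trying to move a time-consuming task into a separate process. Unfortunately, multithreading doesn't really seem to be an option in PHP, but you can create new PHP processes using popen.
The use case is this: a cronjob runs every minute and checks whether any email campaigns need to be sent. Several campaigns may need to go out at the same time, but as of now it picks up only one campaign per minute. I'd like to move the sending of a campaign into a separate process so that I can send multiple campaigns at once.
The code looks something like this (note that this is only a proof of concept):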
crontab
* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1
maintask.php
for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo "Spawn process {$processName}" . PHP_EOL;
    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r");
    stream_set_blocking($process, false);
}
subtask.php
$process = $_GET['process_name'];
echo "Started sleeping process {$process}" . PHP_EOL;
sleep(rand(10, 40));
echo "Stopped sleeping process {$process}" . PHP_EOL;
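Now, the problem I'm having is that popen only ever spawns 2 processes at a time, while I'm trying to spawn 4. I can't figure out why. There doesn't seem to be any documented limit. Perhaps it is limited by the number of cores I have available?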
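I modified subtask.php so you can see when each task starts, when it ends, and how long it intends to wait. Now that you can see when the processes start and stop, the sleep time can be reduced, and there is no need to use ps -aux to show when the processes are running.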
subtask.php
<?php
$process = $argv[1];
$sleepTime = rand(1, 10);
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
sleep($sleepTime);
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;
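Note that this version reads the process name from $argv[1] instead of $_GET. As far as I know, the php CLI binary does not parse a query string: `php subtask.php?process_name=Process_0` is looked up as a script file with that literal name, and $_GET stays empty. A minimal sketch of building the command with a real argument follows. buildSubtaskCommand is a hypothetical helper name of my own, and the sketch assumes a Unix-like shell.

```php
<?php
/**
 * Build a shell command that runs a PHP script with one CLI argument.
 * (Hypothetical helper for illustration; not part of the code in this thread.)
 */
function buildSubtaskCommand(string $script, string $processName): string
{
    // PHP_BINARY is the path of the interpreter running the current script.
    // escapeshellarg() quotes each part so spaces or shell metacharacters
    // in the name cannot break the command line.
    return escapeshellarg(PHP_BINARY)
        . ' ' . escapeshellarg($script)
        . ' ' . escapeshellarg($processName);
}

// The child script then receives the name as $argv[1].
echo buildSubtaskCommand('subtask.php', 'Process_0') . PHP_EOL;
```

The resulting string can be passed to popen() or proc_open() unchanged.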
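I've added a class to the maintask.php code so you can test it... The fun starts when you queue() more entries than the maximum number of processes you have set (try 32).
Note: results are returned in the order in which the processes finish.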
maintask.php
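<?php
class ParallelProcess
{
    private $maxProcesses = 16; // maximum processes
    private $arrProcessQueue = []; // processes currently running
    private $arrCommandQueue = []; // commands waiting for a free slot
    private $returnValue = null; // exit code of the most recently closed process
    private function __construct()
    {
    }
    private function __clone()
    {
    }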
    /**
     * create a new instance
     *
     * @return static
     */
    public static function create()
    {
        $result = new static();
        return $result;
    }
    /**
     * create a new instance with a process limit
     *
     * @param int $maxProcesses
     * @return static
     */
    public static function load($maxProcesses = 16)
    {
        $result = self::create();
        $result->setMaxProcesses($maxProcesses);
        return $result;
    }
    /**
     * get maximum processes
     *
     * @return int
     */
    public function getMaxProcesses()
    {
        return $this->maxProcesses;
    }
    /**
     * set maximum processes
     *
     * @param int $maxProcesses
     * @return $this
     */
    public function setMaxProcesses($maxProcesses)
    {
        $this->maxProcesses = $maxProcesses;
        return $this;
    }
    /**
     * number of entries in the process queue
     *
     * @return int
     */
    public function processQueueLength()
    {
        $result = count($this->arrProcessQueue);
        return $result;
    }
    /**
     * number of entries in the command queue
     *
     * @return int
     */
    public function commandQueueLength()
    {
        $result = count($this->arrCommandQueue);
        return $result;
    }
    /**
     * process open
     *
     * @staticvar array $arrDescriptorSpec
     * @param string $strCommand
     * @return $this
     * @throws \Exception
     */
    private function p_open($strCommand)
    {
        static $arrDescriptorSpec = array(
            0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will read from
            1 => array('pipe', 'w'),              // stdout is a pipe that the child will write to
            2 => array('file', '/dev/null', 'w')  // stderr is a file that the child will write to
        );
        $arrPipes = array();
        if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
            throw new \Exception("error: proc_open() failed!");
        }
        $resStream = $arrPipes[1];
        if (stream_set_blocking($resStream, true) === false) {
            throw new \Exception("error: stream_set_blocking() failed!");
        }
        $this->arrProcessQueue[] = array($strCommand, $resProcess, $resStream);
        return $this;
    }
    /**
     * execute any queued commands
     *
     * @return $this
     */
    private function executeCommand()
    {
        while ($this->processQueueLength() < $this->maxProcesses && $this->commandQueueLength() > 0) {
            $strCommand = array_shift($this->arrCommandQueue);
            $this->p_open($strCommand);
        }
        return $this;
    }
    /**
     * process close
     *
     * @param array $arrQueueEntry
     * @return $this
     */
    private function p_close(array $arrQueueEntry)
    {
        $resProcess = $arrQueueEntry[1];
        $resStream = $arrQueueEntry[2];
        fclose($resStream);
        $this->returnValue = proc_close($resProcess);
        $this->executeCommand();
        return $this;
    }
    /**
     * queue command
     *
     * @param string $strCommand
     * @return $this
     */
    public function queue($strCommand)
    {
        // put the command on the $arrCommandQueue
        $this->arrCommandQueue[] = $strCommand;
        $this->executeCommand();
        return $this;
    }
    /**
     * read from stream
     *
     * @param resource $resStream
     * @return string
     */
    private static function readStream($resStream)
    {
        $result = '';
        while (($line = fgets($resStream)) !== false) {
            $result .= $line;
        }
        return $result;
    }
    /**
     * read a result from the process queue
     *
     * @return string|false
     */
    private function readProcessQueue()
    {
        $result = false;
        // foreach replaces the original each() loop; each() was removed in PHP 8
        foreach ($this->arrProcessQueue as $key => $arrQueueEntry) {
            $arrStatus = proc_get_status($arrQueueEntry[1]);
            if ($arrStatus['running'] === false) {
                array_splice($this->arrProcessQueue, $key, 1);
                $resStream = $arrQueueEntry[2];
                $result = self::readStream($resStream);
                $this->p_close($arrQueueEntry);
                break; // return one result per call
            }
        }
        return $result;
    }
    /**
     * get result from process queue
     *
     * @return string|false
     */
    public function readNext()
    {
        $result = false;
        while ($result === false && $this->processQueueLength() > 0) {
            $result = $this->readProcessQueue();
            if ($result === false) {
                usleep(10000); // nothing has finished yet; wait 10 ms instead of spinning
            }
        }
        return $result;
    }
}
set_time_limit(0); // don't timeout
$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes
for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process
}
// loop through process queue
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
    // process response
    echo $strResponse;
}
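One caveat about the class: it reads a child's output only after the child has exited, and it finds finished children by polling. A possible variation, sketched here under my own assumptions, drains the pipes while the children are still running and lets stream_select() do the waiting. runParallel is a hypothetical standalone function, not part of ParallelProcess, and it assumes a Unix-like system (it relies on /dev/null and non-blocking pipes).

```php
<?php
/**
 * Run shell commands with at most $maxProcesses children alive at once.
 * Returns each child's stdout, in the order in which the children finished.
 * (Hypothetical helper for illustration only.)
 */
function runParallel(array $commands, int $maxProcesses): array
{
    $maxProcesses = max(1, $maxProcesses); // a limit below 1 would never start anything
    $spec = [
        0 => ['file', '/dev/null', 'r'], // child reads nothing
        1 => ['pipe', 'w'],              // child stdout goes to the parent
        2 => ['file', '/dev/null', 'w'], // child stderr is discarded
    ];
    $running = []; // id => ['proc' => resource, 'out' => stream, 'buf' => string]
    $results = [];
    $nextId = 0;

    while ($commands !== [] || $running !== []) {
        // Top up the pool from the list of pending commands.
        while ($commands !== [] && count($running) < $maxProcesses) {
            $proc = proc_open(array_shift($commands), $spec, $pipes);
            if ($proc === false) {
                throw new RuntimeException('proc_open() failed');
            }
            stream_set_blocking($pipes[1], false);
            $running[$nextId++] = ['proc' => $proc, 'out' => $pipes[1], 'buf' => ''];
        }

        // Sleep until some child has written output or closed its stdout
        // (at most one second), instead of spinning on proc_get_status().
        $read = array_column($running, 'out');
        $write = $except = null;
        stream_select($read, $write, $except, 1);

        foreach ($running as $id => $child) {
            // Non-blocking read: takes whatever is available right now.
            $running[$id]['buf'] .= (string) stream_get_contents($child['out']);
            if (feof($child['out'])) {
                // The child closed stdout, which normally means it has exited.
                fclose($child['out']);
                proc_close($child['proc']);
                $results[] = $running[$id]['buf'];
                unset($running[$id]);
            }
        }
    }
    return $results;
}
```

Called as runParallel($commands, 8), it would start up to 8 children and launch the next pending command as soon as a slot frees up, much like queue() and readNext() do together. Because output is collected while the children run, a child that prints more than the pipe can buffer should not stall waiting for the parent to read.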