PHP popen process limit?

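I'm trying to move a time-consuming task into a separate process. Unfortunately, multithreading doesn't really seem to be an option in PHP, but you can create new PHP processes using popen.

The use case is this: a cronjob runs every minute and checks whether any email campaigns need to be sent. Several campaigns may need to go out at the same time, but as of now it only picks up one campaign per minute. I would like to move the sending of a campaign into a separate process so that I can send multiple campaigns at once.

The code looks something like this (note that this is just a proof of concept):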

crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1

maintask.php

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo "Spawn process {$processName}" . PHP_EOL;

    // The CLI does not parse query strings, so the name is passed as an argument
    $process = popen("php subtask.php {$processName} 2>&1", "r");
    stream_set_blocking($process, false);
}

subtask.php

// CLI arguments arrive in $argv; $_GET is not populated on the command line
$process = $argv[1];

echo "Started sleeping process {$process}" . PHP_EOL;
sleep(rand(10, 40));
echo "Stopped sleeping process {$process}" . PHP_EOL;

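Now, the problem I'm having is that popen only ever spawns 2 processes at any one time, while I'm trying to spawn 4. I can't figure out why. There doesn't seem to be any documented limit. Perhaps it is limited by the number of cores I have available?

I modified subtask.php so that you can see when each task starts, when it ends, and how long it intends to wait. Now that you can see when each process starts and stops, the sleep time can be reduced; there is no need to use ps -aux to show which processes are running.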

subtask.php

<?php
$process = $argv[1];

$sleepTime = rand(1, 10);
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
sleep($sleepTime);
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;

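I have added a class to the maintask.php code so that you can test it... the fun starts when you queue() more entries than the maximum number of processes you have set (try 32).
Note: results are returned in the order in which the processes finish.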

maintask.php

<?php
class ParallelProcess
{
    private $maxProcesses = 16; // maximum processes
    private $arrProcessQueue = [];
    private $arrCommandQueue = [];
    private $returnValue = null; // value returned by the most recent proc_close()

    private function __construct()
    {
    }

    private function __clone()
    {
    }

    /**
     *
     * @return \static
     */
    public static function create()
    {
        $result = new static();
        return $result;
    }

    /**
     *
     * @param int $maxProcesses
     * @return \static
     */
    public static function load($maxProcesses = 16)
    {
        $result = self::create();
        $result->setMaxProcesses($maxProcesses);
        return $result;
    }

    /**
     * get maximum processes
     *
     * @return int
     */
    public function getMaxProcesses()
    {
        return $this->maxProcesses;
    }

    /**
     * set maximum processes
     *
     * @param int $maxProcesses
     * @return $this
     */
    public function setMaxProcesses($maxProcesses)
    {
        $this->maxProcesses = $maxProcesses;
        return $this;
    }

    /**
     * number of entries in the process queue
     *
     * @return int
     */
    public function processQueueLength()
    {
        $result = count($this->arrProcessQueue);
        return $result;
    }

    /**
     * number of entries in the command queue
     *
     * @return int
     */
    public function commandQueueLength()
    {
        $result = count($this->arrCommandQueue);
        return $result;
    }


    /**
     * process open
     *
     * @staticvar array $arrDescriptorspec
     * @param string $strCommand
     * @return $this
     * @throws \Exception
     */
    private function p_open($strCommand)
    {
        static $arrDescriptorSpec = array(
            0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will read from
            1 => array('pipe', 'w'), // stdout is a pipe that the child will write to
            2 => array('file', '/dev/null', 'w') // stderr is discarded by writing it to /dev/null
        );

        $arrPipes = array();
        if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
            throw new \Exception("error: proc_open() failed!");
        }

        $resStream = &$arrPipes[1];

        if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) {
            throw new \Exception("error: stream_set_blocking() failed!");
        }

        $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream);
        return $this;
    }

    /**
     * execute any queued commands
     *
     * @return $this
     */
    private function executeCommand()
    {
        while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) {
            $strCommand = array_shift($this->arrCommandQueue);
            $this->p_open($strCommand);
        }
        return $this;
    }

    /**
     * process close
     *
     * @param array $arrQueueEntry
     * @return $this
     */
    private function p_close(array $arrQueueEntry)
    {
        $resProcess = $arrQueueEntry[1];
        $resStream = $arrQueueEntry[2];

        fclose($resStream);

        $this->returnValue = proc_close($resProcess);

        $this->executeCommand();
        return $this;
    }

    /**
     * queue command
     *
     * @param string $strCommand
     * @return $this
     */
    public function queue($strCommand) {
        // put the command on the $arrCommandQueue
        $this->arrCommandQueue[] = $strCommand;
        $this->executeCommand();
        return $this;
    }

    /**
     * read from stream
     *
     * @param resource $resStream
     * @return string
     */
    private static function readStream($resStream)
    {
        $result = '';
        while (($line = fgets($resStream)) !== false) {
            $result .= $line;
        }
        return $result;
    }

    /**
     * read a result from the process queue
     *
     * @return string|false
     */
    private function readProcessQueue()
    {
        $result = false;
        // each() was removed in PHP 8.0, so iterate with foreach instead
        foreach ($this->arrProcessQueue as $key => $arrQueueEntry) {
            $arrStatus = proc_get_status($arrQueueEntry[1]);
            if ($arrStatus['running'] === false) {
                // the queue is always a 0-based list, so the key is also the offset
                array_splice($this->arrProcessQueue, $key, 1);
                $resStream = $arrQueueEntry[2];
                $result = self::readStream($resStream);
                $this->p_close($arrQueueEntry);
                break; // return one result per call
            }
        }
        return $result;
    }

    /**
     * get result from process queue
     *
     * @return string|false
     */
    public function readNext()
    {
        $result = false;
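        // poll until a process finishes or the process queue is empty
        while ($result === false && $this->processQueueLength() > 0) {
            $result = $this->readProcessQueue();
            if ($result === false) {
                usleep(10000); // pause 10 ms between polls to avoid busy-waiting
            }
        }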
        return $result;
    }
}

set_time_limit(0); // don't timeout

$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process
}

// loop through process queue
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
    // process response
    echo $strResponse;
}
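
As a side note on the original popen loop: a likely reason for the ceiling of two processes, offered here as an assumption rather than something confirmed above, is that $process is overwritten on every iteration. When the last reference to a popen handle goes away, PHP closes it, and closing a popen handle waits for that child to exit. The sketch below keeps every handle in an array so that all four children can run at once. The inline -r child script is a hypothetical stand-in for subtask.php, and the variable names are illustrative only.

```php
<?php
// Hypothetical stand-in for subtask.php: sleep one second, then print the process name.
$childCode = 'sleep(1); echo $argv[1], PHP_EOL;';

$handles = [];
for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    $command = escapeshellarg(PHP_BINARY) . ' -r ' . escapeshellarg($childCode)
        . ' -- ' . escapeshellarg($processName);

    // Keep every handle referenced so PHP does not close it
    // (and wait for the child) before the loop has finished spawning.
    $handles[$processName] = popen($command, 'r');
}

$outputs = [];
foreach ($handles as $processName => $handle) {
    // Blocks until this child closes its stdout; the other children keep running meanwhile.
    $outputs[$processName] = trim(stream_get_contents($handle));
    pclose($handle); // waits for this child to exit
}

foreach ($outputs as $processName => $output) {
    echo "{$processName} said: {$output}" . PHP_EOL;
}
```

Unlike the class above, this reads the results in spawn order rather than completion order, and it has no cap on the number of children, so it only suits a small, fixed number of tasks.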