为什么不是所有线程都完成?

Why not all threads are completed?

我试过这个 Joe 回答中的例子 并且效果很好,但是当我尝试稍微编辑一下这段代码时:

$pool = new Pool(4);

while (@$i++<10) {
    $pool->submit(new class($i) extends Collectable {
        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            printf(
                "Hello World from %d\n", $this->id);
        $this->html = file_get_contents('http://google.fr?q=' . $this->query);
            $this->setGarbage();
        }

        public $id;
public $html;
    });
}

while ($pool->collect(function(Collectable $work){
    printf(
        "Collecting %d\n", $work->id);
var_dump($work->html);
    return $work->isGarbage();
})) continue;

$pool->shutdown();

"Hello world" 的计数与 "Collecting" 的计数不同。 文档已过时。 这个问题怎么办?

Pthreads V3 比 V2 更容易被原谅。 收集在 V3 中是不行的。

规则 n°1: 我在线程内执行所有查询,避免在线程内传递过多的数据。这对 V2 没问题,但对 V3 就不行了。我尽可能整洁地向工人传递论据。这也允许更快的过程。

规则 n°2: 我不会检查每个池可用的 CPU 线程的数量,并相应地使用循环将它们分块。通过这种方式,我可以确保大量池没有内存开销,并且每次循环完成时,我都会强制进行垃圾回收。由于跨线程的 Ram 需求非常高,这对我来说是必要的,可能不是你的情况,但要确保你消耗的 ram 不会超过你的 php 限制。你传递给线程的参数越多,ram 就会越快。

规则 n°3: 使用 (array) 在 workers 中正确声明对象数组以确保返回所有结果。

这是一个基本的重写工作示例,按照您的示例尽可能遵循 3 条规则:

  • 使用查询数组进行多线程。

  • 一个可收集的工具,用于获取结果以代替收集。

  • 根据线程的 CPU nb 分批池以避免 ram 开销。

  • 线程查询,每个查询都有自己的连接,不会跨工作线程传递。

  • 最后将数组中的所有结果推送。

代码:

    define("SQLHOST", "127.0.0.1");
    define("SQLUSER", "root");
    define("SQLPASS", "password");
    define("SQLDBTA", "mydatabase");

    $Nb_of_th=12; // (6 cpu cores in this example)
    $queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers
    $global_data=array();// all results from all pool cycles

    // first we set the main loops
    foreach ($queries as $key => $chunks) {
    $pool = new Pool($Nb_of_th, Worker::class);// 12 pools max
    $workCount = count($chunks);

    // second we launch the submits 
    foreach (range(1, $workCount) as $i) {
        $chunck = $chunks[$i - 1];
        $pool->submit(new MyWorkers($chunck));
    }

    $data = [];// pool cycle result array
    $collector = function (\Collectable $work) use (&$data) {
        $isGarbage = $work->isGarbage();
        if ($isGarbage) {
            $data[] = $work->result; // thread result
        }
        return $isGarbage;
    };

    do {
        $count = $pool->collect($collector);
        $isComplete = count($data) === $workCount;
    } while (!$isComplete);

    array_push($global_data, $data);// push pool results into main

    //complete purge
    unset($data);
    $pool->shutdown();
    unset($pool);
    gc_collect_cycles();// force garbage collector before new pool cycle
    }

    Var_dump($global_data); // results for all pool cycles

    class MyWorkers extends \Threaded implements \Collectable {

    private $isGarbage;
    public $result;
    private $process;

    public function __construct($process) {
        $this->process = $process;
    }

    public function run() {

        $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS);
        $proc = (array) $this->process; // important ! avoid volatile destruction in V3
        $stmt = $con->prepare($proc);
        $stmt->execute();
        $obj = $stmt1->fetchall(PDO::FETCH_ASSOC);

        /* do whatever you want to do here */
        $this->result = (array) $obj; // important ! avoid volatile destruction in V3
        $this->isGarbage = true;
    }

    public function isGarbage() : bool
    {
    return $this->isGarbage;
    }
}

Worker::collect并不是为了让你收获成果;是non-deterministic。

Worker::collect 旨在 运行 对 Worker 对象堆栈中引用的对象进行垃圾回收。

如果打算在每个结果可用时对其进行处理,则代码可能如下所示:

<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
$found = 0;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

do {
    $next = $results->synchronized(function() use(&$found, $results) {
        while (!count($results)) {
            $results->wait();
        }

        $found++;

        return $results->shift();
    });

    var_dump($next);
} while ($found < $expected);

while ($pool->collect()) continue;

$pool->shutdown();
?>

这显然不能很好地容忍错误,但主要区别在于我使用共享的 Volatile 结果集合,并且我会在结果可用时正确同步以在主上下文中获取结果。

如果您想等待所有结果可用,并可能避免一些锁争用——如果可以的话,您应该始终尽量避免——那么代码看起来会更简单,类似于:

<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

$results->synchronized(function() use($expected, $results) {
    while (count($results) != $expected) {
        $results->wait();
    }
});

var_dump(count($results));

while ($pool->collect()) continue;

$pool->shutdown();
?>

值得注意的是,Collectable 接口已经在最新版本的 pthreads 中由 Threaded 实现——这是你应该使用的……总是……

文档已过时,对此感到抱歉...一个人...