为什么不是所有线程都完成?
Why not all threads are completed?
我试过这个 Joe 回答中的例子 并且效果很好,但是当我尝试稍微编辑一下这段代码时:
$pool = new Pool(4);
while (@$i++<10) {
$pool->submit(new class($i) extends Collectable {
public function __construct($id) {
$this->id = $id;
}
public function run() {
printf(
"Hello World from %d\n", $this->id);
$this->html = file_get_contents('http://google.fr?q=' . $this->query);
$this->setGarbage();
}
public $id;
public $html;
});
}
while ($pool->collect(function(Collectable $work){
printf(
"Collecting %d\n", $work->id);
var_dump($work->html);
return $work->isGarbage();
})) continue;
$pool->shutdown();
"Hello world" 的计数与 "Collecting" 的计数不同。
文档已过时。
这个问题怎么办?
Pthreads V3 比 V2 更容易被原谅。
收集在 V3 中是不行的。
规则 n°1: 我在线程内执行所有查询,避免在线程内传递过多的数据。这对 V2 没问题,但对 V3 就不行了。我尽可能整洁地向工人传递论据。这也允许更快的过程。
规则 n°2: 我不会检查每个池可用的 CPU 线程的数量,并相应地使用循环将它们分块。通过这种方式,我可以确保大量池没有内存开销,并且每次循环完成时,我都会强制进行垃圾回收。由于跨线程的 Ram 需求非常高,这对我来说是必要的,可能不是你的情况,但要确保你消耗的 ram 不会超过你的 php 限制。你传递给线程的参数越多,ram 就会越快。
规则 n°3: 使用 (array) 在 workers 中正确声明对象数组以确保返回所有结果。
这是一个基本的重写工作示例,按照您的示例尽可能遵循 3 条规则:
使用查询数组进行多线程。
一个可收集的工具,用于获取结果以代替收集。
根据线程的 CPU nb 分批池以避免 ram 开销。
线程查询,每个查询都有自己的连接,不会跨工作线程传递。
最后将数组中的所有结果推送。
代码:
define("SQLHOST", "127.0.0.1");
define("SQLUSER", "root");
define("SQLPASS", "password");
define("SQLDBTA", "mydatabase");
$Nb_of_th=12; // (6 cpu cores in this example)
$queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers
$global_data=array();// all results from all pool cycles
// first we set the main loops
foreach ($queries as $key => $chunks) {
$pool = new Pool($Nb_of_th, Worker::class);// 12 pools max
$workCount = count($chunks);
// second we launch the submits
foreach (range(1, $workCount) as $i) {
$chunck = $chunks[$i - 1];
$pool->submit(new MyWorkers($chunck));
}
$data = [];// pool cycle result array
$collector = function (\Collectable $work) use (&$data) {
$isGarbage = $work->isGarbage();
if ($isGarbage) {
$data[] = $work->result; // thread result
}
return $isGarbage;
};
do {
$count = $pool->collect($collector);
$isComplete = count($data) === $workCount;
} while (!$isComplete);
array_push($global_data, $data);// push pool results into main
//complete purge
unset($data);
$pool->shutdown();
unset($pool);
gc_collect_cycles();// force garbage collector before new pool cycle
}
Var_dump($global_data); // results for all pool cycles
class MyWorkers extends \Threaded implements \Collectable {
private $isGarbage;
public $result;
private $process;
public function __construct($process) {
$this->process = $process;
}
public function run() {
$con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS);
$proc = (array) $this->process; // important ! avoid volatile destruction in V3
$stmt = $con->prepare($proc);
$stmt->execute();
$obj = $stmt1->fetchall(PDO::FETCH_ASSOC);
/* do whatever you want to do here */
$this->result = (array) $obj; // important ! avoid volatile destruction in V3
$this->isGarbage = true;
}
public function isGarbage() : bool
{
return $this->isGarbage;
}
}
Worker::collect
并不是为了让你收获成果;是non-deterministic。
Worker::collect
仅 旨在 运行 对 Worker
对象堆栈中引用的对象进行垃圾回收。
如果打算在每个结果可用时对其进行处理,则代码可能如下所示:
<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
$found = 0;
while (@$i++ < $expected) {
$pool->submit(new class($i, $results) extends Threaded {
public function __construct($id, Volatile $results) {
$this->id = $id;
$this->results = $results;
}
public function run() {
$result = file_get_contents('http://google.fr?q=' . $this->id);
$this->results->synchronized(function($results, $result){
$results[$this->id] = $result;
$results->notify();
}, $this->results, $result);
}
private $id;
private $results;
});
}
do {
$next = $results->synchronized(function() use(&$found, $results) {
while (!count($results)) {
$results->wait();
}
$found++;
return $results->shift();
});
var_dump($next);
} while ($found < $expected);
while ($pool->collect()) continue;
$pool->shutdown();
?>
这显然不能很好地容忍错误,但主要区别在于我使用共享的 Volatile
结果集合,并且我会在结果可用时正确同步以在主上下文中获取结果。
如果您想等待所有结果可用,并可能避免一些锁争用——如果可以的话,您应该始终尽量避免——那么代码看起来会更简单,类似于:
<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
while (@$i++ < $expected) {
$pool->submit(new class($i, $results) extends Threaded {
public function __construct($id, Volatile $results) {
$this->id = $id;
$this->results = $results;
}
public function run() {
$result = file_get_contents('http://google.fr?q=' . $this->id);
$this->results->synchronized(function($results, $result){
$results[$this->id] = $result;
$results->notify();
}, $this->results, $result);
}
private $id;
private $results;
});
}
$results->synchronized(function() use($expected, $results) {
while (count($results) != $expected) {
$results->wait();
}
});
var_dump(count($results));
while ($pool->collect()) continue;
$pool->shutdown();
?>
值得注意的是,Collectable
接口已经在最新版本的 pthreads 中由 Threaded
实现——这是你应该使用的……总是……
文档已过时,对此感到抱歉...一个人...
我试过这个 Joe 回答中的例子 并且效果很好,但是当我尝试稍微编辑一下这段代码时:
$pool = new Pool(4);
while (@$i++<10) {
$pool->submit(new class($i) extends Collectable {
public function __construct($id) {
$this->id = $id;
}
public function run() {
printf(
"Hello World from %d\n", $this->id);
$this->html = file_get_contents('http://google.fr?q=' . $this->query);
$this->setGarbage();
}
public $id;
public $html;
});
}
while ($pool->collect(function(Collectable $work){
printf(
"Collecting %d\n", $work->id);
var_dump($work->html);
return $work->isGarbage();
})) continue;
$pool->shutdown();
"Hello world" 的计数与 "Collecting" 的计数不同。 文档已过时。 这个问题怎么办?
Pthreads V3 比 V2 更容易被原谅。 收集在 V3 中是不行的。
规则 n°1: 我在线程内执行所有查询,避免在线程内传递过多的数据。这对 V2 没问题,但对 V3 就不行了。我尽可能整洁地向工人传递论据。这也允许更快的过程。
规则 n°2: 我不会检查每个池可用的 CPU 线程的数量,并相应地使用循环将它们分块。通过这种方式,我可以确保大量池没有内存开销,并且每次循环完成时,我都会强制进行垃圾回收。由于跨线程的 Ram 需求非常高,这对我来说是必要的,可能不是你的情况,但要确保你消耗的 ram 不会超过你的 php 限制。你传递给线程的参数越多,ram 就会越快。
规则 n°3: 使用 (array) 在 workers 中正确声明对象数组以确保返回所有结果。
这是一个基本的重写工作示例,按照您的示例尽可能遵循 3 条规则:
使用查询数组进行多线程。
一个可收集的工具,用于获取结果以代替收集。
根据线程的 CPU nb 分批池以避免 ram 开销。
线程查询,每个查询都有自己的连接,不会跨工作线程传递。
最后将数组中的所有结果推送。
代码:
define("SQLHOST", "127.0.0.1");
define("SQLUSER", "root");
define("SQLPASS", "password");
define("SQLDBTA", "mydatabase");
$Nb_of_th=12; // (6 cpu cores in this example)
$queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers
$global_data=array();// all results from all pool cycles
// first we set the main loops
foreach ($queries as $key => $chunks) {
$pool = new Pool($Nb_of_th, Worker::class);// 12 pools max
$workCount = count($chunks);
// second we launch the submits
foreach (range(1, $workCount) as $i) {
$chunck = $chunks[$i - 1];
$pool->submit(new MyWorkers($chunck));
}
$data = [];// pool cycle result array
$collector = function (\Collectable $work) use (&$data) {
$isGarbage = $work->isGarbage();
if ($isGarbage) {
$data[] = $work->result; // thread result
}
return $isGarbage;
};
do {
$count = $pool->collect($collector);
$isComplete = count($data) === $workCount;
} while (!$isComplete);
array_push($global_data, $data);// push pool results into main
//complete purge
unset($data);
$pool->shutdown();
unset($pool);
gc_collect_cycles();// force garbage collector before new pool cycle
}
Var_dump($global_data); // results for all pool cycles
class MyWorkers extends \Threaded implements \Collectable {
private $isGarbage;
public $result;
private $process;
public function __construct($process) {
$this->process = $process;
}
public function run() {
$con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS);
$proc = (array) $this->process; // important ! avoid volatile destruction in V3
$stmt = $con->prepare($proc);
$stmt->execute();
$obj = $stmt1->fetchall(PDO::FETCH_ASSOC);
/* do whatever you want to do here */
$this->result = (array) $obj; // important ! avoid volatile destruction in V3
$this->isGarbage = true;
}
public function isGarbage() : bool
{
return $this->isGarbage;
}
}
Worker::collect
并不是为了让你收获成果;是non-deterministic。
Worker::collect
仅 旨在 运行 对 Worker
对象堆栈中引用的对象进行垃圾回收。
如果打算在每个结果可用时对其进行处理,则代码可能如下所示:
<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
$found = 0;
while (@$i++ < $expected) {
$pool->submit(new class($i, $results) extends Threaded {
public function __construct($id, Volatile $results) {
$this->id = $id;
$this->results = $results;
}
public function run() {
$result = file_get_contents('http://google.fr?q=' . $this->id);
$this->results->synchronized(function($results, $result){
$results[$this->id] = $result;
$results->notify();
}, $this->results, $result);
}
private $id;
private $results;
});
}
do {
$next = $results->synchronized(function() use(&$found, $results) {
while (!count($results)) {
$results->wait();
}
$found++;
return $results->shift();
});
var_dump($next);
} while ($found < $expected);
while ($pool->collect()) continue;
$pool->shutdown();
?>
这显然不能很好地容忍错误,但主要区别在于我使用共享的 Volatile
结果集合,并且我会在结果可用时正确同步以在主上下文中获取结果。
如果您想等待所有结果可用,并可能避免一些锁争用——如果可以的话,您应该始终尽量避免——那么代码看起来会更简单,类似于:
<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
while (@$i++ < $expected) {
$pool->submit(new class($i, $results) extends Threaded {
public function __construct($id, Volatile $results) {
$this->id = $id;
$this->results = $results;
}
public function run() {
$result = file_get_contents('http://google.fr?q=' . $this->id);
$this->results->synchronized(function($results, $result){
$results[$this->id] = $result;
$results->notify();
}, $this->results, $result);
}
private $id;
private $results;
});
}
$results->synchronized(function() use($expected, $results) {
while (count($results) != $expected) {
$results->wait();
}
});
var_dump(count($results));
while ($pool->collect()) continue;
$pool->shutdown();
?>
值得注意的是,Collectable
接口已经在最新版本的 pthreads 中由 Threaded
实现——这是你应该使用的……总是……
文档已过时,对此感到抱歉...一个人...