检测按顺序完成的线程

Detect the threads that finish in order

我有以下多线程代码

class My_Thread extends Thread {

    public function run() {

        /* ... */
        // The time it takes to execut the code is different for each thread
    }
}

// Create a array
$threads = array();


/* *** STEP 1 *** */
//Initiate Miltiple Threads
foreach ( range("A", "B") as $i ) {
    $threads[] = new My_Thread($i);
}


/* *** STEP 2 *** */
// Start The Threads
foreach ($threads as $thread) {
    $thread->start(); // Thread A starts before thread B, and it takes more time to finish
}


/* *** STEP 3 *** */
// Process the threads
foreach ($threads as $thread) {
    if ($thread->join()) {
        /* ... Do Something ... */
    }
}

快速解释代码:

第 1 步:我正在创建两个线程 A 和 B

第 2 步:线程 A 首先启动,并且比线程 B 花费更长的时间完成。

第 3 步:然后,我等待每个线程完成,从线程 A 开始。

现在,问题出在第 3 步。当我遍历线程时,我必须等待线程 A 完成才能进行进一步处理,然而,线程 B 正在等待空闲状态,因为它需要完成时间更短,除非线程 A 在步骤 3 中处理,否则不会被处理。不能保证线程 A 将花费更长的时间,因此我必须编写一个通用的解决方案。

如何确保第 3 步处理先完成的线程?也就是说,有没有类似这样的伪代码?

/* *** STEP 3 *** */
// Do the following for all threads in the $threads array, FIRST COME FIRST SERVE
// If the thread finished STEP 2, then immediately process it.

谢谢。

首先,Thread代表一个执行上下文

您需要做的是分别考虑上下文和数据...

<?php
class Test extends Thread {

    public function __construct(Volatile $queue, $value) {
        $this->queue = $queue;
        $this->value = $value;
    }

    public function run() {
        $data = strlen(
            file_get_contents("http://www.google.co.uk/?q={$this->value}"));

        usleep(mt_rand(10000, 20000));

        $this->queue->synchronized(function($queue, $value, $data) {
            $queue[] = (array) [
                $value => $data
            ];
            $queue->notify();
        }, $this->queue, $this->value, $data);
    }

    private $queue;
    private $value;
}

$chars = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"];
$queue = new Volatile();
$tests = [];

for ($test = 0; $test < 10; $test++) {
    $tests[$test] = new Test($queue, $chars[$test]);
    $tests[$test]->start();
}

$test = 0;

while (($next = $queue->synchronized(function() use($queue, &$test) {
    /* guard infinite loop */
    if (++$test > 10)
        return false;

    /* predicated wait for data */
    while (!count($queue))
        $queue->wait();

    /* return next item */
    return $queue->shift();
}))) {
    var_dump($next);
}

foreach ($tests as $thread)
    $thread->join();
?>

以上代码适用于 pthreads v3,PHP7,这是可用的最佳版本,也是新项目使用的推荐版本。

解决方案的核心包含在 Test::run 和主要上下文中的 while 循环中。

$data = strlen(
    file_get_contents("http://www.google.co.uk/?q={$this->value}"));

usleep(mt_rand(10000, 20000));

这是为了从 google 中获取一些垃圾,碰巧响应时间非常一致,我不得不添加 usleep,只是为了让您看到订单没有'正确完成并不重要。

你不应该在现实世界的多线程代码中使用 usleep

$this->queue->synchronized(function($queue, $value, $data) {
    $queue[] = (array) [
        $value => $data
    ];
    $queue->notify();
}, $this->queue, $this->value, $data);

生成一些数据后,每个 Test 与队列同步,向其附加一些数据,并向当前等待的任何上下文发送通知。

与此同时,这正在进行中:

$test = 0;

while (($next = $queue->synchronized(function() use($queue, &$test) {
    /* guard infinite loop */
    if (++$test > 10)
        return false;

    /* predicated wait for data */
    while (!count($queue))
        $queue->wait();

    /* return next item */
    return $queue->shift();
}))) {
    var_dump($next);
}

主上下文与队列同步,而在同步块中它防止无限循环(因为我们知道有多少数据即将到来),然后如果队列中没有数据,它会等待一些变得可用。最后将队列中的第一项返回到主上下文。

上面的代码会输出如下内容:

array(1) {
  ["I"]=>
  int(188965)
}
array(1) {
  ["B"]=>
  int(188977)
}
array(1) {
  ["C"]=>
  int(188921)
}
array(1) {
  ["F"]=>
  int(188962)
}
array(1) {
  ["J"]=>
  int(188954)
}
array(1) {
  ["A"]=>
  int(188912)
}
array(1) {
  ["E"]=>
  int(188929)
}
array(1) {
  ["G"]=>
  int(188941)
}
array(1) {
  ["D"]=>
  int(188946)
}
array(1) {
  ["H"]=>
  int(188929)
}

这里的关键是上下文和数据是不同的问题。