koulab

技術系のメモ

PHP7.4以上でマルチスレッドができるExtension parallelを使う

  • parallel

https://www.php.net/manual/ja/book.parallel.php

導入方法

Linux/Unix

pecl install parallel

または https://github.com/krakjoe/parallel/blob/develop/INSTALL.md を参考にコンパイル

Windows

以下から最新バージョンをダウンロードしてphp_parallel.dllはextディレクトリに入れる。 他はphp.exeがあるディレクトリと同じ場所に置く

https://windows.php.net/downloads/pecl/releases/parallel/

Hello world

githubにあるサンプルを試す

<?php
$runtime = new \parallel\Runtime();

$future = $runtime->run(function(){
    for ($i = 0; $i < 500; $i++)
        echo "*";

    return "easy";
});

for ($i = 0; $i < 500; $i++) {
    echo ".";
}

printf("\nUsing \\parallel\\Runtime is %s\n", $future->value());

もう少しわかりやすいサンプルコードとして以下に示します

2つのスレッドを立ち上げ、すべてのタスクが終わるまで->value()で待機します

片方は10秒かかり、一方は8秒で処理が完了します。

これを並列で処理すると10秒で終わるはずです。

<?php
use parallel\{Channel,Runtime,Events,Events\Event};

$start = time();
$runtime1 = new Runtime();
$runtime2=  new Runtime();

$runnable1 = function($a,$b,$c) {
    echo $a.$b.$c.PHP_EOL;
    sleep(10);
    return 'Success Runnable1';
};
$runnable2 = function($a,$b,$c) {
    echo $a.$b.$c.PHP_EOL;
    sleep(8);
    return 'Success Runnable2';
};

$arg1 = ['111','222','333'];
$arg2 = ['444','555','666'];

$futures = [];
$futures[] = $runtime1->run($runnable1,$arg1);
$futures[]  = $runtime2->run($runnable2,$arg2);

foreach ($futures as $future){
    echo $future->value().PHP_EOL;
}
$runtime1->close();
$runtime2->close();

$end = time();
echo 'Done! '.($end-$start).'sec'.PHP_EOL;

出力は以下のようになります

111222333
444555666
Success Runnable1
Success Runnable2
Done! 10sec

では次のような場合

<?php
use parallel\{Channel,Runtime,Events,Events\Event};

$start = time();
$runtime1 = new Runtime();
$runtime2=  new Runtime();

$runnable1 = function($a,$b,$c,$name) {
    echo $a.$b.$c.PHP_EOL;
    sleep(10);
    return 'Success Runnable1 :'.$name;
};
$runnable2 = function($a,$b,$c,$name) {
    echo $a.$b.$c.PHP_EOL;
    sleep(8);
    return 'Success Runnable2 :'.$name;
};


$futures = [];
$futures[] = $runtime1->run($runnable1,['111','222','333','thread1']);
$futures[] = $runtime1->run($runnable1,['777','888','999','thread1-2']);
$futures[]  = $runtime2->run($runnable2,['444','555','666','thread2']);

foreach ($futures as $future){
    echo $future->value().PHP_EOL;
}
$runtime1->close();
$runtime2->close();

$end = time();
echo 'Done! '.($end-$start).'sec'.PHP_EOL;

出力は次の通りです

444555666
111222333
777888999
Success Runnable1 :thread1
Success Runnable1 :thread1-2
Success Runnable2 :thread2
Done! 20sec

最初にruntime1と2が実行され、その後に待ち状態のruntime1が実行されるので合計で20秒かかります。

あとは、HTTPリクエストを並列のサンプルを書くと以下のような感じ。でもスレッド生成コストかかるので非同期処理させたほうが良いのでサンプルとしてはあまり良くない

こんな感じでクラス作っておくと便利かも。

<?php

use parallel\{Channel,Runtime,Events,Events\Event};

class BalanceRuntime {
    protected $numberOfThreads = 2;
    protected $currentBalance = 1;

    protected $runtimes = [];

    /**
     * @param int $numberOfThreads
     * @return BalanceRuntime
     */
    public function setNumberOfThreads(int $numberOfThreads): BalanceRuntime
    {
        $this->numberOfThreads = $numberOfThreads;
        return $this;
    }

    /**
     * @return int
     */
    public function getNumberOfThreads(): int
    {
        return $this->numberOfThreads;
    }

    public function addNextRuntime(){
        $this->currentBalance = $this->currentBalance + 1;
        if($this->currentBalance > $this->numberOfThreads){
            $this->currentBalance = 1;
        }
    }

    public function generateRuntime(){
        for($i = 1; $i <= $this->getNumberOfThreads(); $i++) {
            $runtimes[$i] = new Runtime();
        }
        $this->runtimes = $runtimes;
    }

    public function getAllRuntime() : array {
        return $this->runtimes;
    }

    public function getNextRuntime() : Runtime{
        return $this->runtimes[$this->getNextRuntimeKey()];
    }

    public function getNextRuntimeKey(){
        $b =  $this->currentBalance;
        $this->addNextRuntime();
        return $b;
    }

    public function __destruct()
    {
        foreach ($this->getAllRuntime() as $r){
            $r->close();
        }
    }


}
<?php
require 'BalanceRuntime.php';

$runnable = function($url,$timeout){
    $ch = curl_init();
    
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_HEADER, false);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch,CURLOPT_TIMEOUT,$timeout);
    curl_setopt($ch, CURLOPT_FOLLOWLOCATION ,true);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    curl_setopt($ch, CURLOPT_CONNECTTIMEOUT ,$timeout);

    $reply = curl_exec($ch);

    return mb_strimwidth($reply,0,500,'...');
};
$to = new BalanceRuntime();
$to->setNumberOfThreads(3)->generateRuntime();

$futures = [];
$futures[] = $to->getNextRuntime()->run($runnable,['https://www.yahoo.co.jp/',5]);
$futures[] = $to->getNextRuntime()->run($runnable,['http://httpbin.org/ip',5]);
$futures[] = $to->getNextRuntime()->run($runnable,['http://google.com',5]);
$futures[] = $to->getNextRuntime()->run($runnable,['http://bing.com',5]);

foreach($futures as $f){
    var_dump($f->value());
}

ほかにもchannelやeventsなどがあるのでそちらも活用していくと良いかと思います https://www.php.net/parallel

https://github.com/krakjoe/parallel/issues/10