图片image.png


我们知道,PHP 最初是为了支持同步开发而创建的,因此大多数 PHP 开发人员习惯于仅使用php编写同步代码,那么我们能用Php来进行异步编程吗。答案是可以的。

什么是异步编程

当我们用php编写的应用程序I/O任务时,程序会在执行某个任务之前,一定要等待之前的任务完成,这时CPU会有很多时间处于空闲状态,这不仅会降低应用程序性能,还会降低硬件利用率。比如,当程序需要从数据库中读取大量的数据时,由于需要等待I/O操作完成,程序的执行速度会非常缓慢。因此,我们通过Swoole扩展或者一些如Reactphp的库,在程序执行的过程中,不需要等待某个任务完成才能执行下一个任务。这种编程模式可以极大地提高程序的效率和响应速度,尤其在处理复杂的I/O操作时表现得更为出色,而这就是异步编程

Php pcntl扩展

通过异步编程的概念可知,异步编程的可以通过多进程来实现,在php中,pcntl扩展是首选的用来处理多进程的库,我们先测一段同步的代码

<?php
class AsyncPhp{
    public function curl($url)
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($ch, CURLOPT_HEADER, 0);
        $output = curl_exec($ch);
        curl_close($ch);
        return $output;
    }
    public function testSync(){
        $url = 'https://www.baidu.com/?i=';
        $start=microtime(true);    
        for($i=0;$i<5;$i++){
            $this->curl($url.$i);
            echo '访问第'.($i+1).'次百度'."\n";
        }
        $end=microtime(true);

        echo "\n总用时",$end-$start."\n";        
    }
}

$tester = new AsyncPhp();
$tester->testSync();

当我们同步请求5次百度时,是0.9秒

图片


这时为了提高性能,pcntl就可以派上用场。
pcntl是一个可以利用操作系统的fork系统调用在PHP中实现多线程的进程控制扩展,当使用fork系统调用后执行的代码将会是并行的。pcntl的缺点是只适用于Linux平台的CLI模式下使用
我们在刚才的测试来新增如下方法来测试

    public function testAsync(){
        $url = 'https://www.baidu.com/?i=';
        $start=microtime(true); 
        $i=0;
        while($i<5){
            $pids[$i] = pcntl_fork(); //创建子进程
             if($pids[$i] == 0){ //返回0表示在子进程
                 $this->curl($url.$i);; //子进程执行代码
                 exit(0);
             }
             $i++;
         }
         //等待进程关闭
         for($i=0;$i<5;$i++){
             pcntl_waitpid($pids[$i],$status,WUNTRACED);//等待进程结束
             if(pcntl_wifexited($status)){
                 //子进程完成退出
                 echo '第'.$i.'次访问百度,用时:'.microtime(true)-$start."\n";
             }
         } 

        $end=microtime(true);

        echo "\n总用时",$end-$start."\n";        
    } 

在上面的代码中,当程序运行pcntl_fork时,Linux系统立fork出一个子进程,并在子进程中返回0,所以$this->curl是在子进程中运行的,exit(0)是终止了这个子进程,为什么要终止?如果不终止,子进程就会继续执行while这样最终导致产生很多次访问,而我们只要5次。pcntl_waitpid用于等待子进程结束,这样就可以计算子进程运行时间了。代码执行结果如下

图片


通过对比可知,php使用pcntl扩展实现异步总用时比传统同步方式少用时0.7秒,性能提升了3倍多。但是pcntl只能跑在cli环境下,传统php-fpm环境下是无法使用的,这时我们可以在网络请求这边想办法,于是有了curl_multi

使用curl_multi函数组

**curl_multi函数组允许异步处理多个 cURL 句柄,**下面是具体代码

    public function getCurl($url)
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($ch, CURLOPT_HEADER, 0);
        return $ch;
    }    
    public function testCurlMulti(){
        $url = 'https://www.baidu.com/?i=';
        $start=microtime(true); 
        $mh = curl_multi_init();

        $curl_array = array();
        for ($i=0; $i < 5; $i++) { 
            $curl_array[$i] = $this->getCurl($url.$i);
            curl_multi_add_handle($mh, $curl_array[$i]);
        }
        $running = NULL;
        do {
            curl_multi_exec($mh,$running);
        } while($running > 0);

        foreach($curl_array as $i=>$item){
            $cotnent = curl_multi_getcontent($curl_array[$i]);
            echo '第'.$i.'次访问百度,用时:'.microtime(true)-$start."\n";
            curl_multi_remove_handle($mh, $curl_array[$i]);
        }
        curl_multi_close($mh);  

        $end=microtime(true);

        echo "\n总用时",$end-$start."\n";            
    }

在上面的代码中,curl_multi_exec函数将并行执行每个curl句柄。最终总用时如下:

图片


也比同步的代码快了一倍了
curl_multi函数组在很多场景下都有使用到,特别是在网络爬虫开发中会经常使用到

Parallel扩展

同pcntl一样,Parallel[1]也是一个php扩展,安装方法:https://github.com/krakjoe/parallel/blob/develop/INSTALL.md,如果安装时报如下错误,

图片


则需要重新编译php,编译方式如下:

wget https://www.php.net/distributions/php-8.1.1.tar.gz
tar -zxvf ./php-8.1.1.tar.gz
cd php-8.1.1
# 编译
./configure --prefix=/usr/local/php81 --enable-debug --enable-zts --with-config-file-path=/usr/local/php81/etc --with-config-file-scan-dir=/usr/local/php81/conf.d --with-mysqli=mysqlnd --with-pdo-mysql=mysqlnd --with-iconv=/usr/local --with-freetype=/usr/local/freetype --with-jpeg --with-zlib --enable-xml --disable-rpath --enable-bcmath --enable-shmop --enable-sysvsem --with-curl --enable-mbregex --enable-mbstring --enable-intl --enable-pcntl --enable-ftp --enable-gd --with-openssl --with-mhash --enable-pcntl --enable-sockets --with-zip --enable-soap --with-gettext --enable-opcache --with-xsl --with-pear --disable-mbregex
make && make install
# 搞定
ln -/usr/local/php81/bin/php /usr/bin/php81
php -v

以上安装过程中有可能会出错,出错的解决方法请看我的另一篇文章《为了编译启用ZTS的php,我一路披荆斩棘》
成功安装php之后,会打印如下结果

图片


ZTS的php环境有了,就可以安装parallel了

git clone https://github.com/krakjoe/parallel.git
cd parallel
phpize
./configure --enable-parallel  [ --enable-parallel-coverage ] [ --enable-parallel-dev ]
make
make test
make install

安装成功后是这样子的

图片


我们继续新增testParallel方法。代码如下

    public function testParallel(){        
        $ch = new Channel();
        
        // executed within thread
        $start=microtime(true); 
        $task = function (Channel $channel, int $i){
            $url = 'https://www.baidu.com/?i=';
            $url = $url.$i;
            $ch = curl_init();
            curl_setopt($ch, CURLOPT_URL, $url);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
            curl_setopt($ch, CURLOPT_HEADER, 0);
            $output = curl_exec($ch);
            curl_close($ch);
            // echo $output;
            echo "第".$i."次访问百度\n";
              $channel->send($output);       
        };
        
        // creating a few threads
        $runtimeList = [];
        for ($i = 0; $i < 5; $i++) {
            $runtimeList[] = new Runtime();
        }
        // run all threads
        $futureList = [];
        foreach ($runtimeList as $i => $runtime) {
            $futureList[] = $runtime->run($task, [$ch, $i]);
        }
        
        $ch->close();
        
        // echo $queue . PHP_EOL;
    }

Parallel底层用的也是多进程机制,但术语一般是说成线程,上面的代码中,我们定义了5个线程,并发运行结果如下

图片


ReactPHP

图片


与前面几个扩展库不同,ReactPHP[2] 是一个用PHP编写的基于事件驱动编程(EventLoop)的低级库。其核心是事件循环,在其之上提供低级实用程序,例如:流抽象、异步 DNS 解析器、网络客户端/服务器、HTTP 客户端/服务器以及与进程的交互。
安装ReactPHP很简单,直接用composer安装

composer require react/react

ReactPHP 有各种组件,例如事件循环、promise 和流。当我们安装 ReactPHP 时,这些组件和其他一些组件都会被安装,因此您不必单独安装
ReactPHP异步编程的写法采用了类Promise的设计,熟悉ES6 Promise的话上手很快。

require __DIR__ . '/vendor/autoload.php';

$browser = new React\Http\Browser();
$url = 'https://www.baidu.com/?i=';
$start = microtime(true);

for ($i=0; $i < 500; $i++) { 
  $browser->get($url.$i)->then(function (Psr\Http\Message\ResponseInterface $response) use($i,$start){
    $result = $response->getBody();
    echo "第{$i}次访问百度完成,用时:".microtime(true)-$start."\n";
  }, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
  });
}

上面的代码确实实现了异步编程。但我们发现用时并没有多大的提升

图片


总用时为0.63秒?为什么?
我们把5次访问改为500次,运行程序时,我们新开个命令行窗口,输入ps -ef | grep php来观察php进程

图片


我们发现始终都只有一个php ReactPhp2.php的进程,说明Reactphp没有采用多进程机制,但它的优点大于它的缺点。

  1. 1. ReactPHP安装非常方便,直接用composer require就行,不需要其他php扩展的支持

  2. 2. ReactPHP对异步编程进行了非常好的支持,书写语法非常友好。具体查看官方文档就知道了

Fibers

2021 年 11 月发布的 PHP 8.1 开始支持了一个新特性: Fibers[3],用于实现轻量级协程。由于它是一个非常底层的 API ,并不是直接可以使用于应用层,更多的,要使用Fibers,只需把php版本更新到8.1即可。

<?php
declare(ticks=1);//Zend引擎每执行1条低级语句就去执行一次 register_tick_function() 注册的函数

class Thread {
  protected static $names = [];
  protected static $fibers = [];
  protected static $params = [];

  public static function register(string|int $name, callable $callback, array $params)
  
{
    self::$names[]  = $name;
    self::$fibers[] = new Fiber($callback);
    self::$params[] = $params;
  }

  public static function run() {
    $output = [];

    while (self::$fibers) {
      foreach (self::$fibers as $i => $fiber) {
          try {
              if (!$fiber->isStarted()) {
                  // Register a new tick function for scheduling this fiber
                  register_tick_function('Thread::scheduler');
                  $fiber->start(...self::$params[$i]);
              } elseif ($fiber->isTerminated()) {
                  $output[self::$names[$i]] = $fiber->getReturn();
                  unset(self::$fibers[$i]);
              } elseif ($fiber->isSuspended()) {
                $fiber->resume();
              }                
          } catch (Throwable $e) {
              $output[self::$names[$i]] = $e;
          }
      }
    }

    return $output;
  }

  public static function scheduler () {
    if(Fiber::getCurrent() === null) {
      return;
    }
    // running Fiber::suspend() in this if condition will prevent an infinite loop!
    if(count(self::$fibers) > 1)
    {
      Fiber::suspend();
    }
  }
}


$start = microtime(true);
function curl($url)
{
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($ch, CURLOPT_HEADER, 0);
    $output = curl_exec($ch);
    curl_close($ch);
    return $output;
}
function callback($i){
    $url = 'https://www.baidu.com/?i=';
    return curl($url.$i);
};
// registering 6 Threads (A, B, C, D, E, and F)
foreach(range(1, 5) as $id) {
  Thread::register($id, function($id,$start){
    $result = callback($id);
    return microtime(true) - $start;//放回用时。
  }, [$id,$start]);
}

// run threads and wait until execution finishes
$outputs = Thread::run();


print_r($outputs);

以上代码通过declare(ticks=1)来实现Fibers协程的切换。通过register_tick_function函数来让程序没运行一行就执行Fiber::suspend(),这样就可以达到协程切换的目的,最后$outputs打印的时每个协程的耗时。也因为频繁suspend,耗时比较长。但我们异步编程的目录已达到了。

图片


Amphp

前面的例子只是使用Fiber完成了一个最简单但不太合理的协程。实际上 Fiber 扩展进入内核后,它是一个非常底层的 API ,并不是直接用于应用层的。而Amphp[4]利用了Fibers的特殊,完成一个完整的协程框架,里面包含了一大堆组件,可以根据需要composer对应的组件就行。
安装:

composer require amphp/amp

Amphp的使用和Reactphp很像,使用方法大概如下:

<?php

use Amp\Future;

$httpClient = HttpClientBuilder::buildDefault();
$uris = [
    "google" => "https://www.google.com",
    "news"   => "https://news.google.com",
    "bing"   => "https://www.bing.com",
    "yahoo"  => "https://www.yahoo.com",
];

try {
    $responses = Future\await(array_map(function ($uri) use ($httpClient) {
        return Amp\async(fn () => $httpClient->request(new Request($uri, 'HEAD')));
    }, $uris));

    foreach ($responses as $key => $response) {
        printf(
            "%s | HTTP/%s %d %s\n",
            $key,
            $response->getProtocolVersion(),
            $response->getStatus(),
            $response->getReason()
        );
    }
} catch (Exception $e) {
    // If any one of the requests fails the combo will fail
    echo $e->getMessage(), "\n";
}

Swoole

我们最后介绍Swoole,swoole目前应该是php中异步编程的首选框架了,在国内有很高的知名度,甚至在有些php招聘中要求必会swoole。

use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\go;
use function Swoole\Coroutine\run;
run(function() {
    $chan = new Channel(5);

    go(function () use ($chan) {
        $cli = new Swoole\Coroutine\Http\Client('www.qq.com', 80);
        $ret = $cli->get('/');
        $chan->push(['key' => 'www.qq.com', 'content' => '访问www.qq.com成功!']);
    });

    go(function () use ($chan) {
        $cli = new Swoole\Coroutine\Http\Client('www.163.com', 80);
        $ret = $cli->get('/');
        $chan->push(['key' => 'www.163.com', 'content' => '访问www.qq.com成功!']);
    });

    for ($i = 0; $i < 2; $i++) {
        $result = $chan->pop();
        var_dump($result);
    }
});

上面的代码执行结果如下:

图片