什么是协程
协程可以简单理解为线程,只不过这个线程是用户态的,不需要操作系统参与,创建销毁和切换的成本非常低,和线程不同的是协程没法利用多核 CPU 的,想利用多核 CPU 需要依赖 Swoole 的多进程模型,可以通过Swoole\Process[1]来实现多进程运行。那么协程如何调度呢?
首先每个协程可以简单的理解为一个线程,大家知道多线程是为了提高程序的并发,同样的多协程也是为了提高并发。用户的每个请求都会创建一个协程,请求结束后协程结束,如果同时有成千上万的并发请求,某一时刻某个进程内部会存在成千上万的协程,那么 CPU 资源是有限的,到底执行哪个协程的代码?
• 首先,在执行某个协程代码的过程中发现这行代码遇到了 Co::sleep() 或者产生了网络 IO,例如 MySQL->query(),这肯定是一个耗时的过程,Swoole 就会把这个 MySQL 连接的 Fd 放到 EventLoop[2] 中。
• 然后让出这个协程的 CPU 给其他协程使用:即 yield(挂起)
• 等待 MySQL 数据返回后再继续执行这个协程:即 resume(恢复)
• 其次,如果协程的代码有 CPU 密集型代码,可以开启 enable_preemptive_scheduler,Swoole 会强行让这个协程让出 CPU。
ini_set('enable_preemptive_scheduler','On')
Swoole之前的半自动化协程
为了更好的理解协程,我们先看一段原生PHP代码
<?php
$timeStart = microtime(true);
function task1(){
for ($i=0;$i<=300;$i++){
//写入文件,大概要3000微秒
usleep(3000);
echo "写入文件{$i}\n";
}
}
function task2(){
for ($i=0;$i<=500;$i++){
//发送邮件给500名会员,大概3000微秒
usleep(3000);
echo "发送邮件{$i}\n";
}
}
function task3(){
for ($i=0;$i<=100;$i++){
//模拟插入100条数据,大概3000微秒
usleep(3000);
echo "插入数据{$i}\n";
}
}
task1();
task2();
task3();
echo '总用时:'.microtime(true)-$timeStart."\n";
在这个代码中,我们主要做了3件事:写入文件,发送邮件,以及插入数据,我们可以看到执行都是顺序运行的。
协程
,先写入文件,再发送邮件,再插入数据。
如果我们通过PHP生成器实现协程,代码大概是这样子。
<?php
$timeStart = microtime(true);
function task1(){
for ($i=0;$i<=30;$i++){
//写入文件,大概要3000微秒
usleep(3000);
yield "写入文件{$i}\n";
}
}
function task2(){
for ($i=0;$i<=50;$i++){
//发送邮件给500名会员,大概3000微秒
usleep(3000);
yield "发送邮件{$i}\n";
}
}
function task3(){
for ($i=0;$i<=10;$i++){
//模拟插入100条数据,大概3000微秒
usleep(3000);
yield "插入数据{$i}\n";
}
}
class Task {
public $i_task_id;
public $g_coroutine;
public $m_send_value;
public $b_is_first_yield = true;
public function __construct( $i_task_id, Generator $g_coroutine ) {
$this->g_coroutine = $g_coroutine;
$this->i_task_id = $i_task_id;
}
public function set_send_value( $m_send_value ) {
$this->m_send_value = $m_send_value;
}
public function get_task_id() {
return $this->i_task_id;
}
public function run() {
// 如果是第一次执行yield
// 第一个yield的值要用current方法返回
if ( true === $this->b_is_first_yield ) {
$this->b_is_first_yield = false;
return $this->g_coroutine->current();
}
// 只要不是第一次yield,剩下的值都用send双向通道里获取到
else {
$m_yield_ret = $this->g_coroutine->send( $this->m_send_value );
$this->m_send_value = null;
return $m_yield_ret;
}
}
// 注意这个方法的内在逻辑是这样的
// 如果说当前的coroutine是可用的,那么就表示「还没有结束」
// 如果说当前的coroutine是不可用的,那么就表示「已经结束了」
// 所以,前面要取反,加上!
public function is_finish() {
return !$this->g_coroutine->valid();
}
}
class Scheduler {
public $i_current_task_id = 0; // 任务管理器当前最大的任务id
public $a_task_map = array();
// 创建一个新的调度器,就是初始化一个array用来存储task对象
public function __construct() {
$this->a_task_map = array();
}
public function new_task( Generator $g_coroutine ) {
$i_task_id = $this->i_current_task_id++;
$o_task = new Task( $i_task_id, $g_coroutine );
$this->a_task_map[ $i_task_id ] = $o_task;
return $i_task_id;
}
public function run() {
while ( count( $this->a_task_map ) > 0 ) {
$o_task = array_shift( $this->a_task_map );
$result = $o_task->run();
echo $result;
if ( $o_task->is_finish() ) {
unset( $this->a_task_map[ $o_task->get_task_id() ] );
}
else {
array_push( $this->a_task_map, $o_task );
}
}
}
}
$scheduler = new Scheduler();
$scheduler->new_task( task1() );
$scheduler->new_task( task2() );
$scheduler->new_task( task3() );
$scheduler->run();
echo '总用时:'.microtime(true)-$timeStart."\n";
运行结果:
协程
但是我们发现整个代码虽然协程化了,但执行速度并没有提升。而swoole的协程可以直接提升执行效率
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
$timeStart = microtime(true);
function task1(){
for ($i=0;$i<=10;$i++){
//写入文件,大概要3000微秒
usleep(3000);
echo "写入文件{$i}\n";
}
}
function task2(){
for ($i=0;$i<=50;$i++){
//发送邮件给500名会员,大概3000微秒
usleep(3000);
echo "发送邮件{$i}\n";
}
}
function task3(){
for ($i=0;$i<=10;$i++){
//模拟插入100条数据,大概3000微秒
usleep(3000);
echo "插入数据{$i}\n";
}
}
run(function (){
Coroutine::create(function (){
task1();
});
Coroutine::create(function (){
task2();
});
Coroutine::create(function (){
task3();
});
});
echo '总用时:'.microtime(true)-$timeStart."\n";
copy上面代码去运行一下,你就会感受到效率的大幅提升。现在你可以感受到为什么要Swoole了,因为如果要原生PHP,我们需要编写任务类,也要编写调度器Scheduler,非常麻烦,在swoole中,直接用run包装一下就是,其实,run内部也运行了一个调度器Coroutine\Scheduler
function run(callable $fn, ...$args)
{
$s = new Scheduler();
$options = Coroutine::getOptions();
if (!isset($options['hook_flags'])) {
$s->set(['hook_flags' => SWOOLE_HOOK_ALL]);
}
$s->add($fn, ...$args);
return $s->start();
}
,只不过官方称这个调度器为协程容器。
协程容器
在Swoole中,所有的协程必须在协程容器里面创建才行,Swoole 程序启动的时候大部分情况会自动创建协程容器,** Swoole 启动协程容器的方式一共有四种**:
• 1、我们之前讲Swoole服务端的时候,Swoole\Server及其子类,在start后,会在事件回调中默认自动创建协程容器,除非我们手动设置了enable_coroutine属性为false,正常情况下我们不会这么做,但如果要如果要自己手动实现更细粒度的协程的话,可以通过go方法去创建协程,如:
$server = new Swoole\Http\Server("127.0.0.1", 9501);
$server->set([
//关闭内置协程
'enable_coroutine' => false,
]);
$server->on("request", function ($request, $response) {
if ($request->server['request_uri'] == '/coro') {
//关闭内置协程后,自己去实现协程
go(function () use ($response) {
co::sleep(0.2);
$response->header("Content-Type", "text/plain");
$response->end("Hello World\n");
});
} else {
$response->header("Content-Type", "text/plain");
$response->end("Hello World\n");
}
});
$server->start();
• 2、 Swoole 提供的 2 个进程管理模块 Process 和 Process\Pool,当构造方法里面设置了enable_coroutine为true,启动方式会在进程启动的时候创建协程容器,如:
for ($n = 1; $n <= 3; $n++) {
$process = new Process(function () use ($n) {
echo 'Child #' . getmypid() . " start and sleep {$n}s" . PHP_EOL;
sleep($n);
echo 'Child #' . getmypid() . ' exit' . PHP_EOL;
},false,SOCK_DGRAM,true);//enable_coroutine为true
$process->start();
}
• 3、我们常用的直接裸写协程的方式启动程序,使用run方法,如:
use Swoole\Coroutine\Http\Server;
use function Swoole\Coroutine\run;
run(function () {
$server = new Server('127.0.0.1', 9502, false);
$server->handle('/', function ($request, $response) {
$response->end("<h1>Index</h1>");
});
$server->handle('/test', function ($request, $response) {
$response->end("<h1>Test</h1>");
});
$server->handle('/stop', function ($request, $response) use ($server) {
$response->end("<h1>Stop</h1>");
$server->shutdown();
});
$server->start();
});
echo 1;//得不到执行直到run里面的任务结束
注意:不可以嵌套 Coroutine\run()。如果嵌套,将得到:PHP Warning: swoole\Coroutine\Scheduler::start(): eventLoop has already been created
Coroutine\run() 里面的逻辑如果有未处理的事件在 Coroutine\run() 之后就进行 EventLoop了,后面的代码将得不到执行,反之,如果没有事件了将继续向下执行,可以再次 Coroutine\run()。
• 4、run方法只是一个包装,可以通过new一个Coroutine\Scheduler来实现更细粒度的协程容器配置,如
use Swoole\Coroutine;
$scheduler = new Coroutine\Scheduler;
$scheduler->add(function ($a, $b) {
Coroutine::sleep(5);
echo assert($a == 'hello') . PHP_EOL;
echo assert($b == 12345) . PHP_EOL;
echo "Done.\n";
}, "hello", 12345);
$scheduler->add(function (){
Coroutine::sleep(1);
echo 'i just sleep 1'.PHP_EOL;
});
$scheduler->start();
Scheduler里面添加的协程函数一般不会立即执行,需要等待start后才会开始执行。
协程
协程的创建
在协程容器中,可以通过以下方法创建一个新协程。
1、Swoole\Coroutine::create(callable$function,...$args):int|false
2、go(callable$function,...$args):int|false// 参考php.ini的use_shortname配置
实际上go就是Coroutine::create的缩写
function go(callable $fn, ...$args)
{
return Coroutine::create($fn, ...$args);
}
代码:
run(function (){
$cid = go(function (){
echo 'aaaa'.PHP_EOL;
});
echo $cid.PHP_EOL;
});
echo microtime(true) - $start;
协程的执行顺序
使用go嵌套创建协程时,子协程会优先执行,子协程执行完毕或挂起时,将重新回到父协程向下执行代码,如果子协程挂起后,父协程退出,不影响子协程的执行
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
$start = microtime(true);
run(function() {
go(function () {
// Coroutine::sleep(2.0);
go(function () {
// Coroutine::sleep(3.0);
echo "co[3] end\n";
});
echo "co[2] end\n";
});
// Coroutine::sleep(1.0);
echo "co[1] end\n";
});
上面的代码,co[3]比co[1]先执行,上面的代码将输出
co[3] end
co[2] end
co[1] end
如果注释掉Coroutine::sleep,根据挂起的时间不同,将输出
co[1] end
co[2] end
co[3] end
协程的开销
我们先跑一段php代码
<?php
$start_mem = memory_get_usage();
$start_time = microtime(true);
for($i=0;$i<10000;$i++){
file_put_contents('defer.log',$i.PHP_EOL,FILE_APPEND);
}
$end_mem = memory_get_usage();
echo "use time".microtime(true)-$start_time.", use mem : ".( $end_mem - $start_mem ) / 1024 .'kbytes'.PHP_EOL;
结果如下:
内存的使用
再跑一段swoole代码
<?php
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
$start_mem = memory_get_usage();
$start_time = microtime(true);
run(function (){
go(function (){
for($i=0;$i<1000;$i++){
file_put_contents('defer.log',$i.PHP_EOL,FILE_APPEND);
}
});
go(function (){
for($i=1000;$i<5000;$i++){
file_put_contents('defer.log',$i.PHP_EOL,FILE_APPEND);
}
});
go(function (){
for($i=5000;$i<10000;$i++){
file_put_contents('defer.log',$i.PHP_EOL,FILE_APPEND);
}
});
});
$end_mem = memory_get_usage();
echo "use time".microtime(true)-$start_time.", use mem : ".( $end_mem - $start_mem ) / 1024 .'kbytes'.PHP_EOL;
结果
内存增加了
说明用上swoole后,速度快了,但内存开销也增加了。
每个协程都是相互独立的,需要创建单独的内存空间 (栈内存),在 PHP-7.2 版本中底层会分配 8K 的 stack 来存储协程的变量,zval 的尺寸为 16字节,因此 8K 的 stack 最大可以保存 512 个变量。协程栈内存占用超过 8K 后 ZendVM 会自动扩容。协程退出时会释放申请的 stack 内存。
资源的释放
当我们通过Swoole进行Mysql、redis操作之后,协程结束之后要及时关闭释放资源。不然久而久之会吧内存耗尽。Swoole的defer 方法用于资源的释放,会在协程关闭之前 (即协程函数执行完毕时) 进行调用,就算抛出了异常,已注册的 defer 也会被执行。
go(function () {
defer(function () use ($db) {
$db->close();
});
});
当然,当我们使用Swoole内置的连接池时,Swoole已经帮我们做好了这部分的工作了,当我们需要自定义自己的连接池时需要及时释放资源。
协程Id和协程父Id
1、Swoole使用getCid()获取当前协程的唯一 ID, 它的别名为 getuid, 是一个进程内唯一的正整数。当代码不在协程环境中会返回-1。
2、Swoole使用getPcid来获取当前协程的父Id,getPcid也可以接收一个cid参数来获取指定协程的父Id
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
$start = microtime(true);
run(function() {
go(function () {
go(function () {
echo Coroutine::getCid().PHP_EOL;
echo Coroutine::getPcid().PHP_EOL;
echo "co[3] end\n";
});
echo Coroutine::getCid().PHP_EOL;
echo Coroutine::getPcid(3).PHP_EOL;//返回null,因为协程3已经结束
echo Coroutine::getPcid(2).PHP_EOL;//返回1
echo "co[2] end\n";
});
echo Coroutine::getCid().PHP_EOL;
echo "co[1] end\n";
});
echo Coroutine::getCid().PHP_EOL;
以上代码输出
判断指定协程是否存在
Swoole使用exists来判断指定的协程是否存在。不入cid参数时默认指定当前协程。
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
run(function () {
go(function () {
go(function () {
Coroutine::sleep(0.001);
$pid = Coroutine::getPcid();
echo "父Id:{$pid},当前Id:".Coroutine::getCid().PHP_EOL;
var_dump(Coroutine::exists($pid)); // 1:第一次输出true,因为父协程2还存在
});
go(function () {
Coroutine::sleep(0.003);
$pid = Coroutine::getPcid();
echo "父Id:{$pid},当前Id:".Coroutine::getCid().PHP_EOL;
var_dump(Coroutine::exists($pid)); // 4:第四次输出false,这时父协程2已经走完了,结束了
});
Coroutine::sleep(0.002);
$pid = Coroutine::getPcid();
echo $pid.PHP_EOL;
var_dump(Coroutine::exists($pid)); // 2: 第二次输出false,父协程1也就是run函数已经走完了。
var_dump(Coroutine::exists(4));// 3:第三次输出true,这时协程Id4还是存在的
});
});
以上代码将输出
协程id
协程上下文对象
我们先测试一段代码
<?php
use Swoole\Coroutine;
$http = new Swoole\Http\Server('0.0.0.0', 9501);
$_POST = [];
$http->on('Request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
global $_POST;
$uri = $request->server['request_uri'];
$_POST['var'] = $request->post['hello'];
if($uri == '/a'){
Coroutine::sleep(5);//这时可能在请求数据库
echo "/a 收到请求:参数是:".$_POST['var']."\n";
}
elseif($uri == '/b'){
echo "/b 收到请求:参数是:".$_POST['var']."\n";
}else{
}
$response->end("请求完成");
});
$http->set([
'worker_num'=>1 //只用一个进程就好,方便测试
]);
$http->start();
启动服务之后,我们先访问/a
curl http://127.0.0.1:9501/a -X POST -d "hello=world"
然后再立即访问/b
curl http://127.0.0.1:9501/b -X POST -d "hello=world2"
得到的结果可能跟你们想的不一样。
协程上下文
两个请求都收到了world2,这就是协程间的变量污染,于是有了getContext。
使用getContext可以获取当前或指定协程Id的上下文对象Context。Context有如下特点
• 协程退出后上下文自动清理 (如无其它协程或全局变量引用)
• 无 defer 注册和调用的开销 (无需注册清理方法,无需调用函数清理)
• 无 PHP 数组实现的上下文的哈希计算开销 (在协程数量巨大时有一定好处)
• Co\Context 使用 ArrayObject, 满足各种存储需求 (既是对象,也可以以数组方式操作)
在Server开发中,通常会在一个时间周期内处理多个协程(或直接理解为请求)的代码,也就意味着如果使用了全局变量来储存状态可能会被多个协程所使用,也就是说不同的请求之间可能会混淆数据,这里的全局变量指的是 $_GET/$_POST/$_REQUEST/$_SESSION/$_COOKIE/$_SERVER等$_开头的变量、global 变量,以及 static 静态属性。但是在协程间的切换是隐式发生的,所以在协程切换的前后不能保证全局变量以及 static 变量的一致性。所以在 Swoole 内,无法 通过 $_GET/$_POST/$_REQUEST/$_SESSION/$_COOKIE/$_SERVER 等以 $_开头的变量获取到任何属性参数。但是,通过getContext获取到Context是相互独立的,所以我们可以将原来$_开头的变量存储到Context中,以保证这些变量不会别别的协程污染
在很多Swoole框架中,一般都会吧Request对象和Response对象保存到基于协程Id来做隔离的Context中管理。
class Context
{
protected static $nonCoContext = [];
public static function set(string $id, $value)
{
if (Context::inCoroutine()) {
Coroutine::getContext()[$id] = $value;
} else {
static::$nonCoContext[$id] = $value;
}
return $value;
}
public static function get(string $id, $default = null, $coroutineId = null)
{
if (Context::inCoroutine()) {
if ($coroutineId !== null) {
return Coroutine::getContext($coroutineId)[$id] ?? $default;
}
return Coroutine::getContext()[$id] ?? $default;
}
return static::$nonCoContext[$id] ?? $default;
}
public static function has(string $id, $coroutineId = null)
{
if (Context::inCoroutine()) {
if ($coroutineId !== null) {
return isset(Coroutine::getContext($coroutineId)[$id]);
}
return isset(Coroutine::getContext()[$id]);
}
return isset(static::$nonCoContext[$id]);
}
/**
* Release the context when you are not in coroutine environment.
*/
public static function destroy(string $id)
{
unset(static::$nonCoContext[$id]);
}
/**
* Retrieve the value and override it by closure.
*/
public static function override(string $id, \Closure $closure)
{
$value = null;
if (self::has($id)) {
$value = self::get($id);
}
$value = $closure($value);
self::set($id, $value);
return $value;
}
public static function inCoroutine(): bool
{
return Coroutine::getCid() > 0;
}
}
协程的调度
每个协程可以简单的理解为一个线程,多线程是为了提高程序的并发,同样的多协程也是为了提高并发。用户的每个请求都会创建一个协程,请求结束后协程结束,如果同时有成千上万的并发请求,某一时刻某个进程内部会存在成千上万的协程,那么 CPU 资源是有限的,到底执行哪个协程的代码?
决定到底让 CPU 执行哪个协程的代码的决断过程就是协程调度,Swoole 的调度策略又是怎么样的呢?
• 1,在执行某个协程代码的过程中发现这行代码遇到了 Co::sleep() 或者产生了网络 IO,例如 MySQL->query(),这肯定是一个耗时的过程,Swoole 就会把这个 MySQL 连接的 Fd 放到 EventLoop[3] 中。然后然后让出这个协程的 CPU 给其他协程使用,
• 2,如果协程的代码有 CPU 密集型代码,并且开启了 enable_preemptive_scheduler,Swoole 会强行让这个协程让出 CPU。
• 3,如果网络IO结束或者Mysql查询结束,调度程序就会恢复原来被挂起的协程继续执行。
• 4、如果协程内遇到手动挂起的代码(即yield),调度程序也会让出CPU
yield方法手动让出当前协程的执行权。而不是基于 IO 的协程调度,此方法拥有另外一个别名:Coroutine::suspend()
$cid = go(function () {
echo "co 1 start\n";
Co::yield();
echo "co 1 end\n";
});
go(function () use ($cid) {
echo "co 2 start\n";
Co::sleep(0.5);
Co::resume($cid);
echo "co 2 end\n";
});
Swoole\Event::wait();
resume方法手动恢复某个协程,使其继续运行。
当前协程处于挂起状态时,另外的协程中可以使用 resume 再次唤醒当前协程
获取协程状态
可以使用stats方法获取协程状态,统计协程的当前运行情况
var_dump(Swoole\Coroutine::stats());
array(1) {
["c_stack_size"]=>
int(2097152)
["coroutine_num"]=>
int(132)
["coroutine_peak_num"]=>
int(2)
}
取消协程
cancel方法用于取消某个协程,但不能对当前协程发起取消操作,手动取消正常结束,将返回 true, 如失败将返回 false,协程取消后,可以使用isCanceled方法来判断协程是否被手动取消。
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
run(function () {
$cid = Coroutine::getCid();
var_dump($cid);
go(function () use ($cid) {
Coroutine::sleep(0.005);
$toCancel = go(function (){
Coroutine::sleep(1);
$cid = Coroutine::getCid();
echo $cid.PHP_EOL;
});
go(function () use ($toCancel){
var_dump(Coroutine::cancel($toCancel));//true
var_dump(Coroutine::cancel(1));//false 1已经结束了
});
});
echo "Done\n";
});
协程的并发执行
Swoole使用batch函数并发执行多个协程,并且通过数组,返回这些协程方法的返回值
<?php
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\batch;
run(function () {
$use = microtime(true);
$results = batch([
'gethostbyname' => function () {
return gethostbyname('localhost');
},
'file_get_contents' => function () {
return file_get_contents(__DIR__ . '/defer.log');
},
'sleep' => function () {
sleep(5);
return true;
},
'usleep' => function () {
usleep(10000);
return true;
},
], 0.1);
$use = microtime(true) - $use;
echo "Use {$use}s, Result:\n";
var_dump($results);
});
当不需要返回执行结果时,可以使用parallel方法。
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\System;
use function Swoole\Coroutine\parallel;
$start_time = microtime(true);
Coroutine\run(function () {
$use = microtime(true);
$results = [];
parallel(2, function () use (&$results) {
System::sleep(0.2);
$results[] = System::gethostbyname('localhost');
});
$use = microtime(true) - $use;
echo "Use {$use}s, Result:\n";
var_dump($results);
});
$end_time = microtime(true) - $start_time;
echo "Use {$end_time}s, Done\n";
协程间通讯
协程都是相互独立的,那么他们之间如何通讯呢?使用Coroutine\Channel
Coroutine\Channel,官方称之为通道,用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度
• 通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗
• 底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗
• channel 基于引用计数实现,是零拷贝的
Channel使用方法与 SplQueue 队列类似:
• Channel->push :当队列中有其他协程正在等待 pop 数据时,自动按顺序唤醒一个消费者协程。当队列已满时自动 yield 让出控制权,等待其他协程消费数据
• Channel->pop :当队列为空时自动 yield,等待其他协程生产数据。消费数据后,队列可写入新的数据,自动按顺序唤醒一个生产者协程。
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;
run(function(){
$channel = new Channel(1);
Coroutine::create(function () use ($channel) {
for($i = 0; $i < 10; $i++) {
Coroutine::sleep(1.0);
$channel->push(['rand' => rand(1000, 9999), 'index' => $i]);//在第一个协程中,生产数据
echo "{$i}\n";
}
});
Coroutine::create(function () use ($channel) {
while(1) {
$data = $channel->pop(2.0);//在第二个协程中消费数据,设置为2秒超时,如果2秒内管道内没有数据,就退出
if ($data) {
var_dump($data);
} else {
assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
break;
}
}
});
});
协程间的并发控制
假设我们写了个爬虫,要爬取了淘宝和百度的首页,我们哗啦哗啦写下了如下代码:
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
use Swoole\Coroutine\Http\Client;
use function Swoole\Coroutine\run;
function getConent($domain){
//启动一个协程客户端client,请求淘宝首页
$cli = new Client($domain, 443, true);
$cli->setHeaders([
'Host' => $domain,
'User-Agent' => 'Chrome/49.0.2587.3',
'Accept' => 'text/html,application/xhtml+xml,application/xml',
'Accept-Encoding' => 'gzip',
]);
$cli->set(['timeout' => 1]);
$cli->get('/index.php');
$result = $cli->body;
$cli->close();
return $result;
}
run(function () {
$result = [];
Coroutine::create(function () use ( &$result) {
$result['baidu'] = getConent('www.baidu.com');
});
Coroutine::create(function () use ( &$result) {
$result['taobao'] = getConent('www.taobao.com');
});
var_dump($result);
});
初看这段代码,如果你认为$result能打印出内容,那么你的技术还有待加强,实际上result打印出来是空数组。
因为你打印时子协程还在拼命挖矿中。要打印出内容,有2种方法,1、分别在28行和31行打印。2、33行的上面sleep一下,让父协程等待子协程完成但要sleep多久了,睡得太久,那效率太低,睡得太少,那么子协程可能还没挖到矿。这时如果有WaitGroup加持,就可以解决这个问题。
WaitGroup
WaitGroup 是基于 Channel 衍生出来的一个特性,如果接触过 Go 语言,我们都会知道 WaitGroup 这一特性,WaitGroup 的用途是使得主协程一直阻塞等待直到所有相关的子协程都已经完成了任务后再继续运行,这里说到的阻塞等待是仅对于主协程(即当前协程)来说的,并不会阻塞当前进程。有了WaitGroup,上面的代码可以改为这样
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;
function getConent($domain){
//启动一个协程客户端client,请求淘宝首页
$cli = new Client($domain, 443, true);
$cli->setHeaders([
'Host' => $domain,
'User-Agent' => 'Chrome/49.0.2587.3',
'Accept' => 'text/html,application/xhtml+xml,application/xml',
'Accept-Encoding' => 'gzip',
]);
$cli->set(['timeout' => 1]);
$cli->get('/index.php');
$result = $cli->body;
$cli->close();
return $result;
}
run(function () {
$wg = new WaitGroup();
$result = [];
$wg->add();
//启动第一个协程
Coroutine::create(function () use ($wg, &$result) {
$result['baidu'] = getConent('www.baidu.com');
$wg->done();
});
$wg->add();
//启动第二个协程
Coroutine::create(function () use ($wg, &$result) {
$result['taobao'] = getConent('www.taobao.com');
$wg->done();
});
//挂起当前协程,等待所有任务完成后恢复
$wg->wait();
//这里 $result 包含了 2 个任务执行结果
var_dump($result);
});
Barrier
在 Swoole 中底层提供了一个更便捷的协程并发管理工具:Coroutine\Barrier 协程屏障,或者叫协程栅栏。基于 PHP 引用计数和 Coroutine API 实现。相比于 Coroutine\WaitGroup[4],Coroutine\Barrier 使用更简单一些,只需通过参数传递或者闭包的 use 语法,引入子协程函数上即可。
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Barrier;
use function Swoole\Coroutine\run;
function getConent($domain)
{
//启动一个协程客户端client,请求淘宝首页
$cli = new Client($domain, 443, true);
$cli->setHeaders([
'Host' => $domain,
'User-Agent' => 'Chrome/49.0.2587.3',
'Accept' => 'text/html,application/xhtml+xml,application/xml',
'Accept-Encoding' => 'gzip',
]);
$cli->set(['timeout' => 1]);
$cli->get('/index.php');
$result = $cli->body;
$cli->close();
return $result;
}
run(function () {
$barrier = Barrier::make();
$result = [];
//启动第一个协程
Coroutine::create(function () use ($barrier, &$result) {
$result['baidu'] = getConent('www.baidu.com');
});
//启动第二个协程
Coroutine::create(function () use ($barrier, &$result) {
$result['taobao'] = getConent('www.taobao.com');
});
//挂起当前协程,等待所有任务完成后恢复
Barrier::wait($barrier);
//这里 $result 包含了 2 个任务执行结果
var_dump($result);
});
引用链接
[1]
Swoole\Process: https://wiki.swoole.com/#/process/process?id=swooleprocess[2]
EventLoop: https://wiki.swoole.com/#/learn?id=%e4%bb%80%e4%b9%88%e6%98%afeventloop[3]
EventLoop: https://wiki.swoole.com/#/learn?id=%e4%bb%80%e4%b9%88%e6%98%afeventloop[4]
Coroutine\WaitGroup: https://wiki.swoole.com/#/coroutine/wait_group
发表评论 取消回复