在前面的教程里面,我已经学习了Swoole的Server端的应用,本篇学习Swoole在Client端的应用。

在Swoole出现之前,PHP主要用socket来创建TCP/UDP Client,用CURL或者Guzzle类库来创建HTTP Client,
众所周知,php连接mysql一般都是直接用php 的PDO扩展组件来实现客户端的,本篇通过手写Mysql登陆过程来体验socket客户端的开发过程

Mysql协议分析

我们知道,TCP/Ip套接字方式是Mysql数据库在所有平台下都提供的通讯方式,也是使用最多的一种方式,要使用TCP/Ip来连接Mysql,我们首先得知道Mysql的通讯协议。

图片Mysql通讯过程

以上图片显示了mysql的通讯流程,具体流程分析请看MySQL协议分析,这篇博文详细分析了Mysql的数据结构,消息头和消息体结构如下

图片mysql报文

登陆认证的消息体报文如下:

图片握手报文


MySQL客户端与服务器的完整交互过程如下

图片Mysql通讯流程


在分析完报文结构和交互流程后,我们就可以开始着手写代码了。

用原生PHP来实现Mysql客户端

php进行socket编译主要使用socket_create等socket[1]函数和位运算。实现登陆用到了2个数据结构如下:

图片服务器握手包


图片客户端登陆包


具体实现代码如下:

$conf = include_once './conf.php';
$host = $conf['mysql']['HOST'];
$port = $conf['mysql']['PORT'];
$user = $conf['mysql']['USER'];
$db = $conf['mysql']['DATABASE'];
$password = $conf['mysql']['PASS'];

//
//$client->connect($host,$port,$db,$user,$password);

$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === false) {
    echo "socket_create() 创建socket失败 reason: " . socket_strerror(socket_last_error()) . "\n";
} else {
    echo "成功创建socket\n";
}

$result = socket_connect($socket, $host, $port);
if ($result === false) {
    echo "连接mysql失败,原因: ($result) " . socket_strerror(socket_last_error($socket)) . "\n";
} else {
    echo "成功连接mysql";

    //开始解析
    $data = socket_read($socket, 8192, PHP_BINARY_READ);
    //解析服务器发来的握手信息
    $greeting_arr = decodeData($data);
//    var_dump($greeting_arr);
    $resp = formatLogin($greeting_arr);
    //开始请求登陆
    socket_write($socket, $resp, mb_strlen($resp, 'ASCII'));//发送登陆认证,

    //获取服务端响应报文,先读取消息头
    $data = socket_read($socket, 8192, PHP_BINARY_READ);
    $byteArr = unpack('C*', $data);//解包

    $type = $byteArr[5];
//    var_dump(sprintf('%02x',$type));
    $type_str = '';
    if ($type == 0x00) {
        $type_str = "OK";//响应命令
    } elseif ($type == 0xff) {
        $type_str = "ERR";
    } elseif ($type == 0xfe) {
        $type_str = "EOF";//EOF 报文表示登陆成功
    } elseif ($type <= 250) {
    }
    echo $type_str;
    socket_close($socket);
    exit;
}


function decodeData($data){

    $byteArr = unpack('C*',$data);//解包

    //获取消息长度
    $package_length  = $byteArr[1] | ($byteArr[2] << 8)  | ($byteArr[3] << 16);//前面3个字节表示长度
    $package_number = $byteArr[4];//消息序号
    //获取报文类型
    $type = $byteArr[5];

    $protocol = $type_str = '';
    if($type == 0x00 ){
        $type_str = "OK";//响应命令
    }elseif ($type == 0xff){
        $type_str = "ERR";
    }elseif($type == 0xfe){
        $type_str = "EOF";
    }elseif($type <= 250 ) {
        $type_str = "DATA";//响应数据
        $protocol = $type;//协议版本号为响应类型
    }

    $nextString = mb_substr($data,4);
    var_dump($nextString);
    //nullpos
    for($nullpos=6;$nullpos<count($byteArr);$nullpos++){
        if($byteArr[$nullpos] == 0x00){
            break;
        }
    }
    $shuzu = array_slice($byteArr,5,($nullpos-5));
    $version = pack('C*',...$shuzu);

    $leftPacket = array_slice($byteArr,$nullpos);
//    var_dump($leftPacket);
    //服务器线程id
    $thread_id = $leftPacket[0] | ($leftPacket[1] << 8)  | ($leftPacket[2] << 16) | ($leftPacket[3] << 24);//前面3个字节表示长度
    //第一个挑战随机数
    $salt1 = pack('C*',...array_slice($leftPacket,4,8));
    $fill = sprintf('%02x',$leftPacket[12]);//一位填充位

    //服务器权能标志
    $server_capability = sprintf('%02x%02x',$leftPacket[13],$leftPacket[14]);
    var_dump($server_capability);
    $charset = sprintf('%02x',$leftPacket[15]);//字符编码
    $status = sprintf('%02x%02x',$leftPacket[16],$leftPacket[17]);//服务器状态
    var_dump($status);
    $server_capability_ext = sprintf('%02x%02x',$leftPacket[18],$leftPacket[19]);//服务器权能标志(高16位)
    $auth_plugin_data_len = $leftPacket[20];//随机挑战数长度
    var_dump($auth_plugin_data_len);
    $fill2 = array_slice($leftPacket,21,10);//填充值
    for($nullpos=31;$nullpos<count($leftPacket);$nullpos++){
        if($leftPacket[$nullpos] == 0x00){
            break;
        }
    }
    $salt2 = array_slice($leftPacket,31,($nullpos-31));//第二部分挑战数
    $salt2 = pack('C*',...$salt2);//第二部分挑战数

    $client_auth_plugin = array_slice($leftPacket,$nullpos);
    $client_auth_plugin =pack('C*',...$client_auth_plugin);//登陆组件名
    var_dump($client_auth_plugin);

    return compact('package_length','package_number','type_str','protocol',
        'version','thread_id','salt1','fill','server_capability','charset',
        'status','server_capability_ext','auth_plugin_data_len','fill2','salt2','client_auth_plugin');
}

//登录
function formatLogin($greeting){
    global $user;
    global $password;
    global $db;
    //客户端配置项 类似bitMap 具体每个值的权限可以参考mysql的文档
    $body['capability'] = bin2hex(pack('v', 0b1010001010001101));
    $body['capability_ext'] = bin2hex(pack('v', 0b0000000000001010));
    $body['message_max_length'] = '000000c0';//最大消息长度
    $body['charset'] = $greeting['charset'];//字符编码
    $body['unused'] = str_repeat('00', 23);//填充数

    //00分隔符
    $body['user_name'] = bin2hex($user) . '00';
    $salt = $greeting['salt1'] . $greeting['salt2'];
    $part1 = sha1($password, true);
    $part2 = sha1($salt . sha1(sha1($password, true), true), true);
    //挑战认证数据,\x14表示后面有20位的密码 如果不传database的话要再结尾加上00表结束
    $body['passwd'] ='14'. bin2hex($part1 ^ $part2);
    $body['database'] = bin2hex($db) . '00';

    $body['client_auth_plugin'] = bin2hex($greeting['client_auth_plugin']);

    $body['connection_attributes'] = '150c' . bin2hex('_client_name') . '07' . bin2hex('mysqlnd');

//    var_dump($body);
    $stream = '';
    foreach ($body as $v) {
        $stream .= $v;
    }
    //消息体封包
    $stream = _packageBody($stream, 1);
    return _hex2ascii($stream);
}

//16进制转换成ascii  反过来是bin2hex
function _hex2ascii($hex_str) {
    $send_msg = "";
    foreach (str_split($hex_str, 2) as $v) {
        $send_msg .= chr(hexdec($v));
    }
    return $send_msg;
}

/*----------  格式化发送的数据  ----------*/
/**
 * 给消息体封装包头信息 (body数据长度数据(小端序3字节).第几个包(小端序1字节).消息体)
 * @param $hex_body
 * @param int $package_number
 * @return string
 */
function _packageBody($hex_body, $package_number = 0) {
    $byte_len = mb_strlen($hex_body, 'ASCII') / 2;//两个16进制位=1个字节
    //转换成24bit 小端字节序,3个字节
    $package_length = substr(bin2hex(pack('V', $byte_len)), 0, 6);
    //转换成8bit 小端字节序
    $package_number = bin2hex(pack('h', $package_number));
    return $package_length . $package_number . $hex_body;
}

上面的代码只是简单的实现了一个mysql登陆过程,距离完成整个mysql客户端还差了99.9%。你还要实现增删改查,你要实现事务功能,日志功能、读写锁等等,很复杂,但Swoole团队实现了

用Swoole来实现Mysql客户端

Swoole团队实现了一个协程 MySQL 客户端。使用方法如下:

use Swoole\Coroutine\MySQL;
use function Swoole\Coroutine\run;

run(function () {
    $swoole_mysql = new MySQL();
    $swoole_mysql->connect([
        'host'     => '127.0.0.1',
        'port'     => 3306,
        'user'     => 'user',
        'password' => 'pass',
        'database' => 'test',
    ]);
    $res = $swoole_mysql->query('select sleep(1)');
    var_dump($res);
});

具体使用文档:https://wiki.swoole.com/#/coroutine_client/mysql

一键协程化

起初,为了解决这些客户端的协程支持问题 Swoole 开发组做了大量的工作:比如完成了mysql客户端,redis客户端,针对每种类型的客户端都完成对应的适配。但慢慢的,swoole发现了这样做有一些问题:

  • • 实现复杂,团队需要针对每个客户端都需要很了解,想要完成完美的支持,那需要投入巨大的工作量。

  • • 增加了用户的技术负担,比如用户都习惯了PHP原生的PDO,那么现在需要用陌生的 Swoole\Coroutine\MySQL[2] 的方法,增加了学习成本。

  • • 很难覆盖到所有的操作,比如 proc_open()、sleep() 函数等等也可能阻塞住导致程序变成同步阻塞的

  • • phper一般都会安装好了php_pdo,phpredis等扩展,没有好好利用真的很可惜。

针对上述问题,Swoole 开发组换了实现思路,采用 Hook 原生 PHP 函数的方式实现协程客户端,通过一行代码就可以让原来的同步 IO 的代码变成可以协程调度[3]异步 IO[4],即一键协程化
实现方式如下:

Co::set(['hook_flags' => SWOOLE_HOOK_ALL]); //不包括CURL
Co::set(['hook_flags' => SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL]); //真正的协程化所有类型,包括CURL

通过一行代码,就能把原来的同步Io变成了异步IO,文档地址:https://wiki.swoole.com/#/runtime
有了这个特性,在hyperf框架的StartServer这个Command中,通过一句Coroutine::set(['hook_flags' => swoole_hook_flags()]);来实现了整个框架的协程化

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->checkEnvironment($output);

        $serverFactory = $this->container->get(ServerFactory::class)
            ->setEventDispatcher($this->container->get(EventDispatcherInterface::class))
            ->setLogger($this->container->get(StdoutLoggerInterface::class));

        $serverConfig = $this->container->get(ConfigInterface::class)->get('server', []);
        if (! $serverConfig) {
            throw new InvalidArgumentException('At least one server should be defined.');
        }

        $serverFactory->configure($serverConfig);

        Coroutine::set(['hook_flags' => swoole_hook_flags()]);

        $serverFactory->start();

        return 0;
    }


TCP/UDP客户端

Coroutine\Client 提供了 TCP、UDP、unixSocket[5] 传输协议的 Socket 客户端[6]封装代码,使用时仅需 new Swoole\Coroutine\Client 即可。
使用示例:

use Swoole\Coroutine\Client;
use function Swoole\Coroutine\run;

run(function () {
    $client = new Client(SWOOLE_SOCK_TCP);
    if (!$client->connect('127.0.0.1', 9501, 0.5))
    {
        echo "connect failed. Error: {$client->errCode}\n";
    }
    $client->send("hello world\n");
    echo $client->recv();
    $client->close();
});

协程客户端也支持长度和 EOF 协议处理,支持通过协议解析来处理TCP 数据包边界问题[7]
结束符检测

$client->set(array(
    'open_eof_check' => true,
    'package_eof' => "\r\n\r\n",
    'package_max_length' => 1024 * 1024 * 2,
));

长度检测

$client->set(array(
    'open_length_check' => true,
    'package_length_type' => 'N',
    'package_length_offset' => 0, //第N个字节是包长度的值
    'package_body_offset' => 4, //第几个字节开始计算长度
    'package_max_length' => 2000000, //协议最大长度
));

启用mqtt协议

$client->set(array(
    'open_mqtt_protocol' => true,
));


Socket客户端来

Swoole\Coroutine\Socket 模块相比协程风格服务端和协程客户端相关模块 Socket 可以实现更细粒度的一些 IO 操作。
示例:

use Swoole\Coroutine;
use function Swoole\Coroutine\run;

run(function () {
    $socket = new Coroutine\Socket(AF_INET, SOCK_STREAM, 0);

    $retval = $socket->connect('127.0.0.1', 9601);
    while ($retval)
    {
        $n = $socket->send('hello');
        var_dump($n);

        $data = $socket->recv();
        var_dump($data);

        //发生错误或对端关闭连接,本端也需要关闭
        if ($data === '' || $data === false) {
            echo "errCode: {$socket->errCode}\n";
            $socket->close();
            break;
        }

        Coroutine::sleep(1.0);
    }

    var_dump($retval, $socket->errCode, $socket->errMsg);
});

Coroutine\Socket 模块提供的 IO 操作接口均为同步编程风格,底层自动使用协程调度器实现异步 IO。
Socket是比较底层了一个类了,前面的TCP/UDP客户端都是Socket的封装,它们都内置了一个socket属性成员。

FastCGI 客户端

Swoole应用发布时一般都是通过nginx做方向代理来实现服务的发布。那么有没有可能不用做反向代理直接运行呢,答案是可以的。
我们知道php自带的php-fpm启动后,会创建一个master进程,监听9000端口(可配置),master进程又会根据fpm.conf/www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;[8]
我们看nginx的配置文件,一般都会有如下配置:

    location ~ [^/]\.php(/|$)
    {
        try_files $uri =404;
        fastcgi_pass  unix:/tmp/php-cgi.sock;
        fastcgi_index index.php;
        include fastcgi.conf;
        include pathinfo.conf;
    }

在看php-fpm.conf的配置可以知道,unix:/tmp/php-cgi.sock;其实就是127.0.0.1:9000 的unixsocket地址,说明nginx把php文件的请求转发给我php-fpm去处理了,而php-fpm收到请求就执行原生php运算了,swoole利用Socket客户端类对接了127.0.0.1:9000,接管了php原生部分的工作,从而实现了一个FastCGI 客户端,通过 FastCGI 客户端,可以直接与 PHP-FPM 服务进行交互而无需通过任何 HTTP 反向代理。
FastCGI客户端使用示例如下:

try {
    $client = new \Swoole\Coroutine\FastCGI\Client('127.0.0.1', 9000);
    $request = (new \Swoole\FastCGI\HttpRequest())
        ->withDocumentRoot(__DIR__)
        ->withScriptFilename(__DIR__ . '/var.php')
        ->withScriptName('var.php')
        ->withMethod('POST')
        ->withUri('/var?foo=bar&bar=char')
        ->withHeader('X-Foo', 'bar')
        ->withHeader('X-Bar', 'char')
        ->withBody(['foo' => 'bar', 'bar' => 'char']);
    $response = $client->execute($request);
    echo "Result: \n{$response->getBody()}";
} catch (\Swoole\Coroutine\FastCGI\Client\Exception $exception) {
    echo "Error: {$exception->getMessage()}\n";
}

引用链接

[1] socket: https://www.php.net/manual/en/ref.sockets.php
[2] Swoole\Coroutine\MySQL: https://wiki.swoole.com/#/coroutine_client/mysql
[3] 协程调度: https://wiki.swoole.com/#/coroutine?id=%e5%8d%8f%e7%a8%8b%e8%b0%83%e5%ba%a6
[4] 异步 IO: https://wiki.swoole.com/#/learn?id=%e5%90%8c%e6%ad%a5io%e5%bc%82%e6%ad%a5io
[5] unixSocket: https://wiki.swoole.com/#/learn?id=%e4%bb%80%e4%b9%88%e6%98%afipc
[6] Socket 客户端: https://wiki.swoole.com/#/coroutine_client/socket
[7] TCP 数据包边界问题: https://wiki.swoole.com/#/learn?id=tcp%e6%95%b0%e6%8d%ae%e5%8c%85%e8%be%b9%e7%95%8c%e9%97%ae%e9%a2%98
[8] www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;: http://www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;