在前面的教程里面,我已经学习了Swoole的Server端的应用,本篇学习Swoole在Client端的应用。
在Swoole出现之前,PHP主要用socket来创建TCP/UDP Client,用CURL或者Guzzle类库来创建HTTP Client,
众所周知,php连接mysql一般都是直接用php 的PDO扩展组件来实现客户端的,本篇通过手写Mysql登陆过程来体验socket客户端的开发过程
Mysql协议分析
我们知道,TCP/Ip套接字方式是Mysql数据库在所有平台下都提供的通讯方式,也是使用最多的一种方式,要使用TCP/Ip来连接Mysql,我们首先得知道Mysql的通讯协议。
Mysql通讯过程
以上图片显示了mysql的通讯流程,具体流程分析请看MySQL协议分析,这篇博文详细分析了Mysql的数据结构,消息头和消息体结构如下
mysql报文
登陆认证的消息体报文如下:
握手报文
MySQL客户端与服务器的完整交互过程如下:
Mysql通讯流程
在分析完报文结构和交互流程后,我们就可以开始着手写代码了。
用原生PHP来实现Mysql客户端
php进行socket编译主要使用socket_create等socket[1]函数和位运算。实现登陆用到了2个数据结构如下:
服务器握手包
客户端登陆包
具体实现代码如下:
$conf = include_once './conf.php';
$host = $conf['mysql']['HOST'];
$port = $conf['mysql']['PORT'];
$user = $conf['mysql']['USER'];
$db = $conf['mysql']['DATABASE'];
$password = $conf['mysql']['PASS'];
//
//$client->connect($host,$port,$db,$user,$password);
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === false) {
echo "socket_create() 创建socket失败 reason: " . socket_strerror(socket_last_error()) . "\n";
} else {
echo "成功创建socket\n";
}
$result = socket_connect($socket, $host, $port);
if ($result === false) {
echo "连接mysql失败,原因: ($result) " . socket_strerror(socket_last_error($socket)) . "\n";
} else {
echo "成功连接mysql";
//开始解析
$data = socket_read($socket, 8192, PHP_BINARY_READ);
//解析服务器发来的握手信息
$greeting_arr = decodeData($data);
// var_dump($greeting_arr);
$resp = formatLogin($greeting_arr);
//开始请求登陆
socket_write($socket, $resp, mb_strlen($resp, 'ASCII'));//发送登陆认证,
//获取服务端响应报文,先读取消息头
$data = socket_read($socket, 8192, PHP_BINARY_READ);
$byteArr = unpack('C*', $data);//解包
$type = $byteArr[5];
// var_dump(sprintf('%02x',$type));
$type_str = '';
if ($type == 0x00) {
$type_str = "OK";//响应命令
} elseif ($type == 0xff) {
$type_str = "ERR";
} elseif ($type == 0xfe) {
$type_str = "EOF";//EOF 报文表示登陆成功
} elseif ($type <= 250) {
}
echo $type_str;
socket_close($socket);
exit;
}
function decodeData($data){
$byteArr = unpack('C*',$data);//解包
//获取消息长度
$package_length = $byteArr[1] | ($byteArr[2] << 8) | ($byteArr[3] << 16);//前面3个字节表示长度
$package_number = $byteArr[4];//消息序号
//获取报文类型
$type = $byteArr[5];
$protocol = $type_str = '';
if($type == 0x00 ){
$type_str = "OK";//响应命令
}elseif ($type == 0xff){
$type_str = "ERR";
}elseif($type == 0xfe){
$type_str = "EOF";
}elseif($type <= 250 ) {
$type_str = "DATA";//响应数据
$protocol = $type;//协议版本号为响应类型
}
$nextString = mb_substr($data,4);
var_dump($nextString);
//nullpos
for($nullpos=6;$nullpos<count($byteArr);$nullpos++){
if($byteArr[$nullpos] == 0x00){
break;
}
}
$shuzu = array_slice($byteArr,5,($nullpos-5));
$version = pack('C*',...$shuzu);
$leftPacket = array_slice($byteArr,$nullpos);
// var_dump($leftPacket);
//服务器线程id
$thread_id = $leftPacket[0] | ($leftPacket[1] << 8) | ($leftPacket[2] << 16) | ($leftPacket[3] << 24);//前面3个字节表示长度
//第一个挑战随机数
$salt1 = pack('C*',...array_slice($leftPacket,4,8));
$fill = sprintf('%02x',$leftPacket[12]);//一位填充位
//服务器权能标志
$server_capability = sprintf('%02x%02x',$leftPacket[13],$leftPacket[14]);
var_dump($server_capability);
$charset = sprintf('%02x',$leftPacket[15]);//字符编码
$status = sprintf('%02x%02x',$leftPacket[16],$leftPacket[17]);//服务器状态
var_dump($status);
$server_capability_ext = sprintf('%02x%02x',$leftPacket[18],$leftPacket[19]);//服务器权能标志(高16位)
$auth_plugin_data_len = $leftPacket[20];//随机挑战数长度
var_dump($auth_plugin_data_len);
$fill2 = array_slice($leftPacket,21,10);//填充值
for($nullpos=31;$nullpos<count($leftPacket);$nullpos++){
if($leftPacket[$nullpos] == 0x00){
break;
}
}
$salt2 = array_slice($leftPacket,31,($nullpos-31));//第二部分挑战数
$salt2 = pack('C*',...$salt2);//第二部分挑战数
$client_auth_plugin = array_slice($leftPacket,$nullpos);
$client_auth_plugin =pack('C*',...$client_auth_plugin);//登陆组件名
var_dump($client_auth_plugin);
return compact('package_length','package_number','type_str','protocol',
'version','thread_id','salt1','fill','server_capability','charset',
'status','server_capability_ext','auth_plugin_data_len','fill2','salt2','client_auth_plugin');
}
//登录
function formatLogin($greeting){
global $user;
global $password;
global $db;
//客户端配置项 类似bitMap 具体每个值的权限可以参考mysql的文档
$body['capability'] = bin2hex(pack('v', 0b1010001010001101));
$body['capability_ext'] = bin2hex(pack('v', 0b0000000000001010));
$body['message_max_length'] = '000000c0';//最大消息长度
$body['charset'] = $greeting['charset'];//字符编码
$body['unused'] = str_repeat('00', 23);//填充数
//00分隔符
$body['user_name'] = bin2hex($user) . '00';
$salt = $greeting['salt1'] . $greeting['salt2'];
$part1 = sha1($password, true);
$part2 = sha1($salt . sha1(sha1($password, true), true), true);
//挑战认证数据,\x14表示后面有20位的密码 如果不传database的话要再结尾加上00表结束
$body['passwd'] ='14'. bin2hex($part1 ^ $part2);
$body['database'] = bin2hex($db) . '00';
$body['client_auth_plugin'] = bin2hex($greeting['client_auth_plugin']);
$body['connection_attributes'] = '150c' . bin2hex('_client_name') . '07' . bin2hex('mysqlnd');
// var_dump($body);
$stream = '';
foreach ($body as $v) {
$stream .= $v;
}
//消息体封包
$stream = _packageBody($stream, 1);
return _hex2ascii($stream);
}
//16进制转换成ascii 反过来是bin2hex
function _hex2ascii($hex_str) {
$send_msg = "";
foreach (str_split($hex_str, 2) as $v) {
$send_msg .= chr(hexdec($v));
}
return $send_msg;
}
/*---------- 格式化发送的数据 ----------*/
/**
* 给消息体封装包头信息 (body数据长度数据(小端序3字节).第几个包(小端序1字节).消息体)
* @param $hex_body
* @param int $package_number
* @return string
*/
function _packageBody($hex_body, $package_number = 0) {
$byte_len = mb_strlen($hex_body, 'ASCII') / 2;//两个16进制位=1个字节
//转换成24bit 小端字节序,3个字节
$package_length = substr(bin2hex(pack('V', $byte_len)), 0, 6);
//转换成8bit 小端字节序
$package_number = bin2hex(pack('h', $package_number));
return $package_length . $package_number . $hex_body;
}
上面的代码只是简单的实现了一个mysql登陆过程,距离完成整个mysql客户端还差了99.9%。你还要实现增删改查,你要实现事务功能,日志功能、读写锁等等,很复杂,但Swoole团队实现了
用Swoole来实现Mysql客户端
Swoole团队实现了一个协程 MySQL 客户端。使用方法如下:
use Swoole\Coroutine\MySQL;
use function Swoole\Coroutine\run;
run(function () {
$swoole_mysql = new MySQL();
$swoole_mysql->connect([
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'user',
'password' => 'pass',
'database' => 'test',
]);
$res = $swoole_mysql->query('select sleep(1)');
var_dump($res);
});
具体使用文档:https://wiki.swoole.com/#/coroutine_client/mysql
一键协程化
起初,为了解决这些客户端的协程支持问题 Swoole 开发组做了大量的工作:比如完成了mysql客户端,redis客户端,针对每种类型的客户端都完成对应的适配。但慢慢的,swoole发现了这样做有一些问题:
• 实现复杂,团队需要针对每个客户端都需要很了解,想要完成完美的支持,那需要投入巨大的工作量。
• 增加了用户的技术负担,比如用户都习惯了PHP原生的PDO,那么现在需要用陌生的 Swoole\Coroutine\MySQL[2] 的方法,增加了学习成本。
• 很难覆盖到所有的操作,比如 proc_open()、sleep() 函数等等也可能阻塞住导致程序变成同步阻塞的
• phper一般都会安装好了php_pdo,phpredis等扩展,没有好好利用真的很可惜。
针对上述问题,Swoole 开发组换了实现思路,采用 Hook 原生 PHP 函数的方式实现协程客户端,通过一行代码就可以让原来的同步 IO 的代码变成可以协程调度[3]的异步 IO[4],即一键协程化。
实现方式如下:
Co::set(['hook_flags' => SWOOLE_HOOK_ALL]); //不包括CURL
Co::set(['hook_flags' => SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL]); //真正的协程化所有类型,包括CURL
通过一行代码,就能把原来的同步Io变成了异步IO,文档地址:https://wiki.swoole.com/#/runtime
有了这个特性,在hyperf框架的StartServer这个Command中,通过一句Coroutine::set(['hook_flags' => swoole_hook_flags()]);来实现了整个框架的协程化
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->checkEnvironment($output);
$serverFactory = $this->container->get(ServerFactory::class)
->setEventDispatcher($this->container->get(EventDispatcherInterface::class))
->setLogger($this->container->get(StdoutLoggerInterface::class));
$serverConfig = $this->container->get(ConfigInterface::class)->get('server', []);
if (! $serverConfig) {
throw new InvalidArgumentException('At least one server should be defined.');
}
$serverFactory->configure($serverConfig);
Coroutine::set(['hook_flags' => swoole_hook_flags()]);
$serverFactory->start();
return 0;
}
TCP/UDP客户端
Coroutine\Client 提供了 TCP、UDP、unixSocket[5] 传输协议的 Socket 客户端[6]封装代码,使用时仅需 new Swoole\Coroutine\Client 即可。
使用示例:
use Swoole\Coroutine\Client;
use function Swoole\Coroutine\run;
run(function () {
$client = new Client(SWOOLE_SOCK_TCP);
if (!$client->connect('127.0.0.1', 9501, 0.5))
{
echo "connect failed. Error: {$client->errCode}\n";
}
$client->send("hello world\n");
echo $client->recv();
$client->close();
});
协程客户端也支持长度和 EOF 协议处理,支持通过协议解析来处理TCP 数据包边界问题[7]。
结束符检测
$client->set(array(
'open_eof_check' => true,
'package_eof' => "\r\n\r\n",
'package_max_length' => 1024 * 1024 * 2,
));
长度检测
$client->set(array(
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 2000000, //协议最大长度
));
启用mqtt协议
$client->set(array(
'open_mqtt_protocol' => true,
));
Socket客户端来
Swoole\Coroutine\Socket 模块相比协程风格服务端和协程客户端相关模块 Socket 可以实现更细粒度的一些 IO 操作。
示例:
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
run(function () {
$socket = new Coroutine\Socket(AF_INET, SOCK_STREAM, 0);
$retval = $socket->connect('127.0.0.1', 9601);
while ($retval)
{
$n = $socket->send('hello');
var_dump($n);
$data = $socket->recv();
var_dump($data);
//发生错误或对端关闭连接,本端也需要关闭
if ($data === '' || $data === false) {
echo "errCode: {$socket->errCode}\n";
$socket->close();
break;
}
Coroutine::sleep(1.0);
}
var_dump($retval, $socket->errCode, $socket->errMsg);
});
Coroutine\Socket 模块提供的 IO 操作接口均为同步编程风格,底层自动使用协程调度器实现异步 IO。
Socket是比较底层了一个类了,前面的TCP/UDP客户端都是Socket的封装,它们都内置了一个socket属性成员。
FastCGI 客户端
Swoole应用发布时一般都是通过nginx做方向代理来实现服务的发布。那么有没有可能不用做反向代理直接运行呢,答案是可以的。
我们知道php自带的php-fpm启动后,会创建一个master进程,监听9000端口(可配置),master进程又会根据fpm.conf/www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;[8]
我们看nginx的配置文件,一般都会有如下配置:
location ~ [^/]\.php(/|$)
{
try_files $uri =404;
fastcgi_pass unix:/tmp/php-cgi.sock;
fastcgi_index index.php;
include fastcgi.conf;
include pathinfo.conf;
}
在看php-fpm.conf的配置可以知道,unix:/tmp/php-cgi.sock;其实就是127.0.0.1:9000 的unixsocket地址,说明nginx把php文件的请求转发给我php-fpm去处理了,而php-fpm收到请求就执行原生php运算了,swoole利用Socket客户端类对接了127.0.0.1:9000,接管了php原生部分的工作,从而实现了一个FastCGI 客户端,通过 FastCGI 客户端,可以直接与 PHP-FPM 服务进行交互而无需通过任何 HTTP 反向代理。
FastCGI客户端使用示例如下:
try {
$client = new \Swoole\Coroutine\FastCGI\Client('127.0.0.1', 9000);
$request = (new \Swoole\FastCGI\HttpRequest())
->withDocumentRoot(__DIR__)
->withScriptFilename(__DIR__ . '/var.php')
->withScriptName('var.php')
->withMethod('POST')
->withUri('/var?foo=bar&bar=char')
->withHeader('X-Foo', 'bar')
->withHeader('X-Bar', 'char')
->withBody(['foo' => 'bar', 'bar' => 'char']);
$response = $client->execute($request);
echo "Result: \n{$response->getBody()}";
} catch (\Swoole\Coroutine\FastCGI\Client\Exception $exception) {
echo "Error: {$exception->getMessage()}\n";
}
引用链接
[1]
socket: https://www.php.net/manual/en/ref.sockets.php[2]
Swoole\Coroutine\MySQL: https://wiki.swoole.com/#/coroutine_client/mysql[3]
协程调度: https://wiki.swoole.com/#/coroutine?id=%e5%8d%8f%e7%a8%8b%e8%b0%83%e5%ba%a6[4]
异步 IO: https://wiki.swoole.com/#/learn?id=%e5%90%8c%e6%ad%a5io%e5%bc%82%e6%ad%a5io[5]
unixSocket: https://wiki.swoole.com/#/learn?id=%e4%bb%80%e4%b9%88%e6%98%afipc[6]
Socket 客户端: https://wiki.swoole.com/#/coroutine_client/socket[7]
TCP 数据包边界问题: https://wiki.swoole.com/#/learn?id=tcp%e6%95%b0%e6%8d%ae%e5%8c%85%e8%be%b9%e7%95%8c%e9%97%ae%e9%a2%98[8]
www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;: http://www.conf去创建若干子进程,子进程用于处理实际的业务。当有客户端来连接9000端口时,空闲子进程会自己去accept,如果子进程全部处于忙碌状态,新进的待accept的连接会被master放进队列里,等待fpm子进程空闲;
发表评论 取消回复