多端口监听

在开发服务器的时候,我们常常要同对外开放很多端口,比如开放80端口提供http服务,开放9501端口提供Tcp服务,开放9502端口提供websocket服务,这时我们不需要new多个server,只需要用listen方法新增端口监听和协议即可
由于协程服务类Swoole\Coroutine\Http\Server没有多端口函数,我们用异步服务类Swoole\Http\Server来编写,代码如下


$server = new \Swoole\Http\Server('0.0.0.0',88);
$server->on('request',function (\Swoole\Http\Request $request,\Swoole\Http\Response $response){
  $route = $request->server['request_uri'];
  echo $route;
  $response->end("<h1>Hello 你来自{$route}</h1>");
});
$server->on('start',function(){
  echo '服务启动';
});
$tcp = $server->listen('0.0.0.0',9501,SWOOLE_SOCK_TCP);
$webSocket = $server->listen('0.0.0,0',9502,SWOOLE_SOCK_TCP);
$webSocket->set(['open_websocket_protocol'=>true]);
$webSocket->on('receive',function(){
  echo 'Websocket收到了数据';
});
$server->start();

$webSocket端口对象要添加on方法,不然会报 Swoole\Server::start(): require onMessage callback 错误

关于Server的构造参数,请自行阅读官方文档:https://wiki.swoole.com/#/server/tcp_init

通过上面的代码我们就建立了3个服务88端口的http服务、9501端口的TCP服务,9502端口的websocket服务
那么如何接收请求呢,会不会混乱因为已经用端口区分开了,让每个端口各自处理请求,用的是Port对象的on方法,listen函数会返回一个\Swoole\Server\Port对象,根据协议不同,Port对象可以处理的事件不同,具体如下
TCP 服务器[1]

  • • onConnect

  • • onClose

  • • onReceive

UDP 服务器[2]

  • • onPacket

  • • onReceive

HTTP 服务器[3]

  • • onRequest

WebSocket 服务器[4]

  • • onMessage

  • • onOpen

  • • onHandshake

Swoole在物联网的应用

MODBUS通讯

用Swoole可以开发物联网应用,假设我们手上有一个环境监测设备,设备采用标准 MODBUS-RTU 通信协议,RS485 信号输出,现在我们就来开发一个服务器和这个设备通讯,
1、首先,我们一般会拿到设备的对接文档,大概长这样子的

图片


图片


图片


2、然后我们随便找一个网络调试工具,用于模拟设备端来开发调试

图片

3、我们就开始写服务端代码
假如我们要向03号设备查询当前环境的温度值,根据文档,我们需要构造的发送端数据是

地址码功能码起始地址数据长度校验码低位校验码高位
0x030x03num2hex(501)(2个字节)num2hex(2)(2个字节)crc16低位crc16高位

function num2hex($num,$len){
    $hex = base_convert($num,10,16);
    //一个字节=2个16进制位
    $len = $len*2;
    if(strlen($hex)<$len){
        $hex = str_repeat('0',$len-strlen($hex)).$hex;
    }
    return $hex;
}

function crc16($string)
{
    $hex = pack('H*', $string);
    $crc = 0xFFFF;
    for ($x = 0; $x < strlen($hex); $x++) {
        $crc = $crc ^ ord($hex[$x]);
        for ($y = 0; $y < 8; $y++) {
            if (($crc & 0x0001) == 0x0001) {
                $crc = (($crc >> 1) ^ 0xA001);
            } else {
                $crc = $crc >> 1;
            }
        }
    }
    return $crc;
}

        $low = sprintf('%02s',dechex($crc %256));// 低八位

        $high = sprintf('%02s',dechex(floor($crc /256)));// 高8位

通过函数的处理,便可得出发送数据为
03 03 01 f5 00 02 d4 27

go(function(){
    $server = new \Swoole\Coroutine\Server('0.0.0.0',9501,false,true);
    $server->handle(function (\Swoole\Coroutine\Server\Connection $conn){
        echo '您好,欢迎进入聊天室'."\n";
        $socket = $conn->exportSocket();
        //向设备发送数据
        $command = '030301f50002d427';
        $conn->send(hex2bin($command));
        while (true){
            $data = $conn->recv();
            if ($data === '' || $data === false) {
//                $errCode = swoole_last_error();
//                $errMsg = socket_strerror($errCode);
//                echo "协程风格下链接是动态创建和销毁的,可以设置5秒没收到数据自动超时关闭\n";
//                $conn->close();
                break;
            }
            else{
                echo '收到数据:'.bin2hex($data);
            }
        }
        //向设备发送请求数据

        echo '客户端:'.$socket->fd.'的链接已经关闭';
    });
    $server->start();
});

在设备端点击连接后就收到了数据请求了

图片


这时设备端需要回复应答帧给服务端去解析

图片


图片


解析的过程刚好是刚才组装数据的逆向,由于设备发来的数据可能存在丢包,所以要先做一下crc校验,校验非常简单,去掉最后2个字节,用0303040292ff9b通过crc16函数算出crc的低位和高位,然后对比79fd看是否对应上,如果对应上了,那数据就是正确的了。拿到数据后,再通过进制换算就可以拿到温度数据了。
0xFF9B (十六进制)= -101 => 温度 = -10.1℃

TCP 粘包问题

Swoole在没有并发的情况下快速启动中的代码可以正常运行,但是并发高了就会有 TCP 数据包边界问题,TCP 协议在底层机制上解决了 UDP 协议的顺序和丢包重传问题,但相比 UDP 又带来了新的问题,TCP 协议是流式的,数据包没有边界,应用程序使用 TCP 通信就会面临这些难题,俗称** TCP 粘包问题**。
因为 TCP 通信是流式的,在接收 1 个大数据包时,可能会被拆分成多个数据包发送。多次 Send 底层也可能会合并成一次进行发送。这里就需要 2 个操作来解决:

  • • 分包:Server 收到了多个数据包,需要拆分数据包

  • • 合包:Server 收到的数据只是包的一部分,需要缓存数据,合并成完整的包

所以 TCP 网络通信时需要设定通信协议。常见的 TCP 通用网络通信协议有 HTTP、HTTPS、FTP、SMTP、POP3、IMAP、SSH、Redis、Memcache、MySQL 。
值得一提的是,Swoole 内置了很多常见通用协议的解析,来解决这些协议的服务器的 TCP 数据包边界问题,只需要简单的配置即可,参考 open_http_protocol[5]/open_http2_protocol[6]/open_websocket_protocol[7]/open_mqtt_protocol[8]
前面的例子中,我们用的modbus不在通用协议里面,那么如何处理呢,其实,除了通用协议外还可以自定义协议,Swoole 支持了 2 种类型的自定义网络通信协议

  • • EOF 结束符协议

此协议处理的原理是每个数据包结尾加一串特殊字符表示包已结束,所以才用此处理方式的话要保证每个数据段里面不能有EOF结束符,否则会造成分包错误,设置的代码如下

$server->set(array(
    'open_eof_check' => true,
    'package_eof' => "\r\n",
));
$client->set(array(
    'open_eof_check' => true,
    'package_eof' => "\r\n",
));
  • • 固定包头 + 包体协议

固定包头的方法是最通用的,我们前面用的modbus可以通过这种方式来解决粘包问题。在服务器端程序中经常能看到。这种协议的特点是一个数据包总是由包头 + 包体 2 部分组成。包头由一个字段指定了包体或整个包的长度,长度一般是使用 2 字节 /4 字节整数来表示。服务器收到包头后,可以根据长度值来精确控制需要再接收多少数据就是完整的数据包。Swoole 的配置可以很好的支持这种协议,可以灵活地设置 4 项参数应对所有情况,具体文档在这:https://wiki.swoole.com/#/server/setting?id=open_length_check,这对前面的modbus,我们的需要的配置如下

    $server->set(array(
        'open_length_check'   => true,
        'package_length_func' => function ($data) {
            //提取前面3个字节,第3个字节为数据区长度,然后+5(包括2个字节校验码)就是总长度
            if (strlen($data) < 3) {
                return 0;
            }
            $length = hexdec(bin2hex($data[2]));
            if ($length <= 0) {
                return -1;
            }
            $total = $length + 5;
            echo $total;
            return $total;
        },
        'package_max_length'  => 2000000,  //协议最大长度
    ));

搭建Mqtt服务

关于什么是MQTT,请看《什么是MQTT,物联网MQTT协议详解》[9]
通过设置 open_mqtt_protocol[10] 选项,启用后Swoole会解析 MQTT 包头,Worker 进程的 onReceive[11] 事件每次会返回一个完整的 MQTT 数据包。
可以使用 Swoole 作为 MQTT 服务端(broker)或客户端(client),实现一套完整物联网(IOT)解决方案
以下是实现一个MQTT服务端的代码

<?php
function decodeValue($data)
{
    return 256 * ord($data[0]) + ord($data[1]);
}

function decodeString($data)
{
    $length = decodeValue($data);
    return substr($data, 2, $length);
}

function mqttGetHeader($data)
{
    $byte = ord($data[0]);

    $header['type'] = ($byte & 0xF0) >> 4;
    $header['dup'] = ($byte & 0x08) >> 3;
    $header['qos'] = ($byte & 0x06) >> 1;
    $header['retain'] = $byte & 0x01;

    return $header;
}

function eventConnect($header, $data)
{
    $connect_info['protocol_name'] = decodeString($data);
    $offset = strlen($connect_info['protocol_name']) + 2;

    $connect_info['version'] = ord(substr($data, $offset, 1));
    $offset += 1;

    $byte = ord($data[$offset]);
    $connect_info['willRetain'] = ($byte & 0x20 == 0x20);
    $connect_info['willQos'] = ($byte & 0x18 >> 3);
    $connect_info['willFlag'] = ($byte & 0x04 == 0x04);
    $connect_info['cleanStart'] = ($byte & 0x02 == 0x02);
    $offset += 1;

    $connect_info['keepalive'] = decodeValue(substr($data, $offset, 2));
    $offset += 2;
    $connect_info['clientId'] = decodeString(substr($data, $offset));
    return $connect_info;
}

$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_BASE);

$server->set([
    'open_mqtt_protocol' => true, // 启用 MQTT 协议
    'worker_num' => 1,
]);

$server->on('Connect', function ($server, $fd) {
    echo "mqtt客户端连接进来.\n";
});

$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
    $header = mqttGetHeader($data);//swoole默认为什么解析了
    var_dump($header);
    if ($header['type'] == 1) {
        $resp = chr(32) . chr(2) . chr(0) . chr(0);
        eventConnect($header, substr($data, 2));
        $server->send($fd, $resp);
    } elseif ($header['type'] == 3) {
        $offset = 2;
        $topic = decodeString(substr($data, $offset));
        $offset += strlen($topic) + 2;
        $msg = substr($data, $offset);
        echo "client msg: {$topic}\n----------\n{$msg}\n";
        //file_put_contents(__DIR__.'/data.log', $data);
    }
    echo "收到消息,长度为" . strlen($data) . "\n";
});

$server->on('Close', function ($server, $fd) {
    echo "客户端关闭连接.\n";
});
$server->on('start',function (){
    echo "Mqtt Server启动了!";
});

$server->start();

启动后,我们通过Mqtt工具MQTTX来测试

图片


图片


异步任务

异步任务在任何系统中都至关重要,在 Server 程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,比如文件传输,比如Web 服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢,因此,我们需要异步任务
下面演示用代码演示,下载一本8M多的MQTT pdf书籍,书籍地址:https://www.laojunsay.com/wp-content/uploads/2023/04/Gaston-C.-Hillar-MQTT-Essentials-A-Lightweight-IoT-Protocol-2017-Packt-Publishing-libgen.li_.pdf

$server = new \Swoole\Server('0.0.0.0',9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);
$server->set(array(
    'worker_num'    => 4,     // 进程数
    'task_worker_num'=>4
));

//此回调函数在worker进程中执行。
$server->on('Receive', function($serv, $fd, $reactor_id, $data) {
    //投递异步任务
    $url = $data;

    echo "收到一个下载地址:地址为:{$url}\n";
    $taskInfo = ['fd'=>$fd,'url'=>$url];
    $task_id = $serv->task($taskInfo);
    echo "立即指派给异步任务去下载: 任务id={$task_id},我要去处理别的事情了\n";
});

//处理异步任务(此回调函数在task进程中执行)。
$server->on('Task', function ($serv, $task_id, $reactor_id, $data) {
    echo "异步开始下载,[id={$task_id}]".PHP_EOL;
    $url = $data['url'];
    $ch = curl_init();
    $timeout = 60;
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $timeout);//在需要用户检测的网页里需要增加下面两行
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    //curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
    //curl_setopt($ch, CURLOPT_USERPWD, US_NAME.”:”.US_PWD);
    $contents = curl_exec($ch);
    curl_close($ch);
    file_put_contents('./mqtt.pdf',$contents);
    //返回任务执行的结果
    $finishInfo = ['msg'=>"资源:{$data['url']} -> 下载完成!",'fd'=>$data['fd']];
    $serv->finish($finishInfo);
});

//处理异步任务的结果(此回调函数在worker进程中执行)。
$server->on('Finish', function ($serv, $task_id, $data) {
    echo "异步任务[{$task_id}] 处理完成: {$data['msg']}".PHP_EOL;
    $serv->send($data['fd'],iconv('utf-8','gb2312',$data['msg']));
});

$server->on('start',function (){
    echo '服务器启动!'."\r\n";
});
$server->start();

演示结果

图片


图片


图片