Socket 深度探索 4 PHP (一)

字体大小: 中小 标准 ->行高大小: 标准
Socket(套接字)一直是网络层的底层核心内容,也是 TCP/IP 以及 UDP 底层协议的实现通道。随着互联网信息时代的爆炸式发展,当代服务器的性能问题面临越来越大的挑战,著名的 C10K 问题(http://www.kegel.com/c10k.html)也随之出现。幸亏通过大牛们的不懈努力,区别于传统的 select/poll 的 epoll/kqueue 方式出现了,目前 linux2.6 以上的内核都普遍支持,这是 Socket 领域一项巨大的进步,不仅解决了 C10K 问题,也渐渐成为了当代互联网的底层核心技术。libevent 库就是其中一个比较出彩的项目(现在非常多的开源项目都有用到,包括 Memcached),感兴趣的朋友可以研究一下。

由于网络上系统介绍这个部分的文章并不多,而涉及 PHP 的就更少了,所以石头君在这里希望通过《Socket深度探究4PHP》这个系列给对这个领域感兴趣的读者们一定的帮助,也希望大家能和我一起对这个问题进行更深入的探讨。首先,解释一下目前 Socket 领域比较易于混淆的概念有:阻塞/非阻塞、同步/异步、多路复用等。

1、阻塞/非阻塞:这两个概念是针对 IO 过程中进程的状态来说的,阻塞 IO 是指调用结果返回之前,当前线程会被挂起;相反,非阻塞指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。

2、同步/异步:这两个概念是针对调用如果返回结果来说的,所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回;相反,当一个异步过程调用发出后,调用者不能立刻得到结果,实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

3、多路复用(IO/Multiplexing):为了提高数据信息在网络通信线路中传输的效率,在一条物理通信线路上建立多条逻辑通信信道,同时传输若干路信号的技术就叫做多路复用技术。对于 Socket 来说,应该说能同时处理多个连接的模型都应该被称为多路复用,目前比较常用的有 select/poll/epoll/kqueue 这些 IO 模型(目前也有像 Apache 这种每个连接用单独的进程/线程来处理的 IO 模型,但是效率相对比较差,也很容易出问题,所以暂时不做介绍了)。在这些多路复用的模式中,异步阻塞/非阻塞模式的扩展性和性能最好。

感觉概念很抽象对吧,“一切答案在于现场”,下面让我们从三种经典的 PHP Socket IO 模型实例来对以上的概念再做一次分析:

1、使用 accept 阻塞的古老模型:属于同步阻塞 IO 模型,代码如下:

socket_server.php

[php] view plaincopyprint?
1.<?php 
2./** 
3.* SocketServer Class 
4.* By James.Huang <shagoo#gmail.com> 
5.**/ 
6.set_time_limit(0); 
7.class SocketServer 
8.{ 
9.private static $socket; 
10.function SocketServer($port) 
11.{ 
12.global $errno, $errstr; 
13.if ($port < 1024) { 
14.die("Port must be a number which bigger than 1024/n"); 
15.} 
16.17.$socket = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr); 
18.if (!$socket) die("$errstr ($errno)"); 
19.20.// stream_set_timeout($socket, -1); // 保证服务端 socket 不会超时,似乎没用:) 
21.22.while ($conn = stream_socket_accept($socket, -1)) { // 这样设置不超时才油用 
23.static $id = 0; 
24.static $ct = 0; 
25.$ct_last = $ct; 
26.$ct_data = ''; 
27.$buffer = ''; 
28.$id++; // increase on each accept 
29.echo "Client $id come./n"; 
30.while (!preg_match('//r?/n/', $buffer)) { // 没有读到结束符,继续读 
31.// if (feof($conn)) break; // 防止 popen 和 fread 的 bug 导致的死循环 
32.$buffer = fread($conn, 1024); 
33.echo 'R'; // 打印读的次数 
34.$ct += strlen($buffer); 
35.$ct_data .= preg_replace('//r?/n/', '', $buffer); 
36.} 
37.$ct_size = ($ct - $ct_last) * 8; 
38.echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n"; 
39.fwrite($conn, "Received $ct_size byte data./r/n"); 
40.fclose($conn); 
41.} 
42.43.fclose($socket); 
44.} 
45.} 
46.new SocketServer(2000); 
<?php
/**
* SocketServer Class
* By James.Huang <shagoo#gmail.com>
**/
set_time_limit(0);
class SocketServer
{
private static $socket;
function SocketServer($port)
{
global $errno, $errstr;
if ($port < 1024) {
die("Port must be a number which bigger than 1024/n");
}
$socket = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr);
if (!$socket) die("$errstr ($errno)");
//		stream_set_timeout($socket, -1); // 保证服务端 socket 不会超时,似乎没用:)
while ($conn = stream_socket_accept($socket, -1)) { // 这样设置不超时才油用
static $id = 0;
static $ct = 0;
$ct_last = $ct;
$ct_data = '';
$buffer = '';
$id++; // increase on each accept
echo "Client $id come./n";
while (!preg_match('//r?/n/', $buffer)) { // 没有读到结束符,继续读
//				if (feof($conn)) break; // 防止 popen 和 fread 的 bug 导致的死循环
$buffer = fread($conn, 1024);
echo 'R'; // 打印读的次数
$ct += strlen($buffer);
$ct_data .= preg_replace('//r?/n/', '', $buffer);
}
$ct_size = ($ct - $ct_last) * 8;
echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";
fwrite($conn, "Received $ct_size byte data./r/n");
fclose($conn);
}
fclose($socket);
}
}
new SocketServer(2000);
socket_client.php

[php] view plaincopyprint?
1.<?php 
2./** 
3.* Socket Test Client 
4.* By James.Huang <shagoo#gmail.com> 
5.**/ 
6.function debug ($msg) 
7.{ 
8.// echo $msg; 
9.error_log($msg, 3, '/tmp/socket.log'); 
10.} 
11.if ($argv[1]) { 
12.13.$socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30); 
14.15.// stream_set_blocking($socket_client, 0); 
16.// stream_set_timeout($socket_client, 0, 100000); 
17.18.if (!$socket_client) { 
19.die("$errstr ($errno)"); 
20.} else { 
21.$msg = trim($argv[1]); 
22.for ($i = 0; $i < 10; $i++) { 
23.$res = fwrite($socket_client, "$msg($i)"); 
24.usleep(100000); 
25.echo 'W'; // 打印写的次数 
26.// debug(fread($socket_client, 1024)); // 将产生死锁,因为 fread 在阻塞模式下未读到数据时将等待 
27.} 
28.fwrite($socket_client, "/r/n"); // 传输结束符 
29.debug(fread($socket_client, 1024)); 
30.fclose($socket_client); 
31.} 
32.} 
33.else { 
34.35.// $phArr = array(); 
36.// for ($i = 0; $i < 10; $i++) { 
37.// $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r'); 
38.// } 
39.// foreach ($phArr as $ph) { 
40.// pclose($ph); 
41.// } 
42.43.for ($i = 0; $i < 10; $i++) { 
44.system("php ".__FILE__." '{$i}:test'"); 
45.} 
46.} 
<?php
/**
* Socket Test Client
* By James.Huang <shagoo#gmail.com>
**/
function debug ($msg)
{
//	echo $msg;
error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {
$socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30);
//	stream_set_blocking($socket_client, 0);
//	stream_set_timeout($socket_client, 0, 100000);
if (!$socket_client) {
die("$errstr ($errno)");
} else {
$msg = trim($argv[1]);
for ($i = 0; $i < 10; $i++) {
$res = fwrite($socket_client, "$msg($i)");
usleep(100000);
echo 'W'; // 打印写的次数
//			debug(fread($socket_client, 1024)); // 将产生死锁,因为 fread 在阻塞模式下未读到数据时将等待
}
fwrite($socket_client, "/r/n"); // 传输结束符
debug(fread($socket_client, 1024));
fclose($socket_client);
}
}
else {
//	$phArr = array();
//	for ($i = 0; $i < 10; $i++) {
//		$phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
//	}
//	foreach ($phArr as $ph) {
//		pclose($ph);
//	}
for ($i = 0; $i < 10; $i++) {
system("php ".__FILE__." '{$i}:test'");
}
}
首先,解释一下以上的代码逻辑:客户端 socket_client.php 循环发送数据,最后发送结束符;服务端 socket_server.php 使用 accept 阻塞方式接收 socket 连接,然后循环接收数据,直到收到结束符,返回结果数据(接收到的字节数)。虽然逻辑很简单,但是其中有几种情况很值得分析一下:

A> 默认情况下,运行 php socket_client.php test,客户端打出 10 个 W,服务端打出若干个 R 后面是接收到的数据,/tmp/socket.log 记录下服务端返回的接收结果数据。这种情况很容易理解,不再赘述。然后,使用 telnet 命令同时打开多个客户端,你会发现服务器一个时间只处理一个客户端,其他需要在后面“排队”;这就是阻塞 IO 的特点,这种模式的弱点很明显,效率极低。

B> 只打开 socket_client.php 第 26 行的注释代码,再次运行 php socket_client.php test 客户端打出一个 W,服务端也打出一个 R,之后两个程序都卡住了。这是为什么呢,分析逻辑后你会发现,这是由于客户端在未发送结束符之前就向服务端要返回数据;而服务端由于未收到结束符,也在向客户端要结束符,造成死锁。而之所以只打出一个 W 和 R,是因为 fread 默认是阻塞的。要解决这个死锁,必须打开 socket_client.php 第 16 行的注释代码,给 socket 设置一个 0.1 秒的超时,再次运行你会发现隔 0.1 秒出现一个 W 和 R 之后正常结束,服务端返回的接收结果数据也正常记录了。可见 fread 缺省是阻塞的,我们在编程的时候要特别注意,如果没有设置超时,就很容易会出现死锁。

C> 只打开 15 行注释,运行 php socket_client.php test,结果基本和情况 A 相同,唯一不同的是 /tmp/socket.log 没有记录下返回数据。这里可以看出客户端运行在阻塞和非阻塞模式的区别,当然在客户端不在乎接受结果的情况下,可以使用非阻塞模式来获得最大效率。

D> 运行 php socket_client.php 是连续运行 10 次上面的逻辑,这个没什么问题;但是很奇怪的是如果你使用 35 - 41 行的代码,用 popen 同时开启 10 个进程来运行,就会造成服务器端的死循环,十分怪异!后来经调查发现只要是用 popen 打开的进程创建的连接会导致 fread 或者 socket_read 出错直接返回空字串,从而导致死循环,查阅 PHP 源代码后发现 PHP 的 popen 和 fread 函数已经完全不是 C 原生的了,里面都插入了大量的 php_stream_* 实现逻辑,初步估计是其中的某个 bug 导致的 Socket 连接中断所导致的,解决方法就是打开 socket_server.php 中 31 行的代码,如果连接中断则跳出循环,但是这样一来就会有很多数据丢失了,这个问题需要特别注意!

2、使用 select/poll 的同步模型:属于同步非阻塞 IO 模型,代码如下:

select_server.php

[php] view plaincopyprint?
1.<?php 
2./** 
3.* SelectSocketServer Class 
4.* By James.Huang <shagoo#gmail.com> 
5.**/ 
6.set_time_limit(0); 
7.class SelectSocketServer 
8.{ 
9.private static $socket; 
10.private static $timeout = 60; 
11.private static $maxconns = 1024; 
12.private static $connections = array(); 
13.function SelectSocketServer($port) 
14.{ 
15.global $errno, $errstr; 
16.if ($port < 1024) { 
17.die("Port must be a number which bigger than 1024/n"); 
18.} 
19.20.$socket = socket_create_listen($port); 
21.if (!$socket) die("Listen $port failed"); 
22.23.socket_set_nonblock($socket); // 非阻塞 
24.25.while (true) 
26.{ 
27.$readfds = array_merge(self::$connections, array($socket)); 
28.$writefds = array(); 
29.30.// 选择一个连接,获取读、写连接通道 
31.if (socket_select($readfds, $writefds, $e = null, $t = self::$timeout)) 
32.{ 
33.// 如果是当前服务端的监听连接 
34.if (in_array($socket, $readfds)) { 
35.// 接受客户端连接 
36.$newconn = socket_accept($socket); 
37.$i = (int) $newconn; 
38.$reject = ''; 
39.if (count(self::$connections) >= self::$maxconns) { 
40.$reject = "Server full, Try again later./n"; 
41.} 
42.// 将当前客户端连接放入 socket_select 选择 
43.self::$connections[$i] = $newconn; 
44.// 输入的连接资源缓存容器 
45.$writefds[$i] = $newconn; 
46.// 连接不正常 
47.if ($reject) { 
48.socket_write($writefds[$i], $reject); 
49.unset($writefds[$i]); 
50.self::close($i); 
51.} else { 
52.echo "Client $i come./n"; 
53.} 
54.// remove the listening socket from the clients-with-data array 
55.$key = array_search($socket, $readfds); 
56.unset($readfds[$key]); 
57.} 
58.59.// 轮循读通道 
60.foreach ($readfds as $rfd) { 
61.// 客户端连接 
62.$i = (int) $rfd; 
63.// 从通道读取 
64.$line = @socket_read($rfd, 2048, PHP_NORMAL_READ); 
65.if ($line === false) { 
66.// 读取不到内容,结束连接 67.echo "Connection closed on socket $i./n"; 
68.self::close($i); 
69.continue; 
70.} 
71.$tmp = substr($line, -1); 
72.if ($tmp != "/r" && $tmp != "/n") { 
73.// 等待更多数据 
74.continue; 
75.} 
76.// 处理逻辑 
77.$line = trim($line); 
78.if ($line == "quit") { 
79.echo "Client $i quit./n"; 
80.self::close($i); 
81.break; 
82.} 
83.if ($line) { 
84.echo "Client $i >>" . $line . "/n"; 
85.} 
86.} 
87.88.// 轮循写通道 
89.foreach ($writefds as $wfd) { 
90.$i = (int) $wfd; 
91.$w = socket_write($wfd, "Welcome Client $i!/n"); 
92.} 
93.} 
94.} 
95.} 
96.97.function close ($i) 
98.{ 
99.socket_shutdown(self::$connections[$i]); 
100.socket_close(self::$connections[$i]); 
101.unset(self::$connections[$i]); 
102.} 
103.} 
104.new SelectSocketServer(2000); 
<?php
/**
* SelectSocketServer Class
* By James.Huang <shagoo#gmail.com>
**/
set_time_limit(0);
class SelectSocketServer
{
private static $socket;
private static $timeout = 60;
private static $maxconns = 1024;
private static $connections = array();
function SelectSocketServer($port)
{
global $errno, $errstr;
if ($port < 1024) {
die("Port must be a number which bigger than 1024/n");
}
$socket = socket_create_listen($port);
if (!$socket) die("Listen $port failed");
socket_set_nonblock($socket); // 非阻塞
while (true)
{
$readfds = array_merge(self::$connections, array($socket));
$writefds = array();
// 选择一个连接,获取读、写连接通道
if (socket_select($readfds, $writefds, $e = null, $t = self::$timeout))
{
// 如果是当前服务端的监听连接
if (in_array($socket, $readfds)) {
// 接受客户端连接
$newconn = socket_accept($socket);
$i = (int) $newconn;
$reject = '';
if (count(self::$connections) >= self::$maxconns) {
$reject = "Server full, Try again later./n";
}
// 将当前客户端连接放入 socket_select 选择
self::$connections[$i] = $newconn;
// 输入的连接资源缓存容器
$writefds[$i] = $newconn;
// 连接不正常
if ($reject) {
socket_write($writefds[$i], $reject);
unset($writefds[$i]);
self::close($i);
} else {
echo "Client $i come./n";
}
// remove the listening socket from the clients-with-data array
$key = array_search($socket, $readfds);
unset($readfds[$key]);
}
// 轮循读通道
foreach ($readfds as $rfd) {
// 客户端连接
$i = (int) $rfd;
// 从通道读取
$line = @socket_read($rfd, 2048, PHP_NORMAL_READ);
if ($line === false) {
// 读取不到内容,结束连接
echo "Connection closed on socket $i./n";
self::close($i);
continue;
}
$tmp = substr($line, -1);
if ($tmp != "/r" && $tmp != "/n") {
// 等待更多数据
continue;
}
// 处理逻辑
$line = trim($line);
if ($line == "quit") {
echo "Client $i quit./n";
self::close($i);
break;
}
if ($line) {
echo "Client $i >>" . $line . "/n";
}
}
// 轮循写通道
foreach ($writefds as $wfd) {
$i = (int) $wfd;
$w = socket_write($wfd, "Welcome Client $i!/n");
}
}
}
}
function close ($i)
{
socket_shutdown(self::$connections[$i]);
socket_close(self::$connections[$i]);
unset(self::$connections[$i]);
}
}
new SelectSocketServer(2000);
select_client.php

[php] view plaincopyprint?
1.<?php 
2./** 
3.* SelectSocket Test Client 
4.* By James.Huang <shagoo#gmail.com> 
5.**/ 
6.function debug ($msg) 
7.{ 
8.// echo $msg; 
9.error_log($msg, 3, '/tmp/socket.log'); 
10.} 
11.if ($argv[1]) { 
12.13.$socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30); 
14.15.// stream_set_timeout($socket_client, 0, 100000); 
16.17.if (!$socket_client) { 
18.die("$errstr ($errno)"); 
19.} else { 
20.$msg = trim($argv[1]); 
21.for ($i = 0; $i < 10; $i++) { 
22.$res = fwrite($socket_client, "$msg($i)/n"); 
23.usleep(100000); 
24.// debug(fread($socket_client, 1024)); // 将产生死锁,因为 fread 在阻塞模式下未读到数据时将等待 
25.} 
26.fwrite($socket_client, "quit/n"); // add end token 
27.debug(fread($socket_client, 1024)); 
28.fclose($socket_client); 
29.} 
30.} 
31.else { 
32.33.$phArr = array(); 
34.for ($i = 0; $i < 10; $i++) { 
35.$phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r'); 
36.} 
37.foreach ($phArr as $ph) { 
38.pclose($ph); 
39.} 
40.41.// for ($i = 0; $i < 10; $i++) { 
42.// system("php ".__FILE__." '{$i}:test'"); 
43.// } 
44.} 
<?php
/**
* SelectSocket Test Client
* By James.Huang <shagoo#gmail.com>
**/
function debug ($msg)
{
//	echo $msg;
error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {
$socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30);
//	stream_set_timeout($socket_client, 0, 100000);
if (!$socket_client) {
die("$errstr ($errno)");
} else {
$msg = trim($argv[1]);
for ($i = 0; $i < 10; $i++) {
$res = fwrite($socket_client, "$msg($i)/n");
usleep(100000);
//			debug(fread($socket_client, 1024)); // 将产生死锁,因为 fread 在阻塞模式下未读到数据时将等待
}
fwrite($socket_client, "quit/n"); // add end token
debug(fread($socket_client, 1024));
fclose($socket_client);
}
}
else {
$phArr = array();
for ($i = 0; $i < 10; $i++) {
$phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
}
foreach ($phArr as $ph) {
pclose($ph);
}
//	for ($i = 0; $i < 10; $i++) {
//		system("php ".__FILE__." '{$i}:test'");
//	}
}
以上代码的逻辑也很简单,select_server.php 实现了一个类似聊天室的功能,你可以使用 telnet 工具登录上去,和其他用户文字聊天,也可以键入“quit”命令离开;而 select_client.php 则模拟了一个登录用户连续发 10 条信息,然后退出。这里也分析两个问题:

A> 这里如果我们执行 php select_client.php 程序将会同时打开 10 个连接,同时进行模拟登录用户操作;观察服务端打印的数据你会发现服务端确实是在同时处理这些连接,这就是多路复用实现的非阻塞 IO 模型,当然这个模型并没有真正的实现异步,因为最终服务端程序还是要去通道里面读取数据,得到结果后同步返回给客户端。如果这次你也使用 telnet 命令同时打开多个客户端,你会发现服务端可以同时处理这些连接,这就是非阻塞 IO,当然比古老的阻塞 IO 效率要高多了,但是这种模式还是有局限的,继续看下去你就会发现了~

B> 我在 select_server.php 中设置了几个参数,大家可以调整试试:
$timeout :表示的是 select 的超时时间,这个一般来说不要太短,否则会导致 CPU 负载过高。
$maxconns :表示的是最大连接数,客户端超过这个数的话,服务器会拒绝接收。这里要提到的一点是,由于 select 是通过句柄来读写的,所以会受到系统默认参数 __FD_SETSIZE 的限制,一般默认值为 1024,修改的话需要重新编译内核;另外通过测试发现 select 模式的性能会随着连接数的增大而线性便差(详情见《Socket深度探究4PHP(二)》),这也就是 select 模式最大的问题所在,所以如果是超高并发服务器建议使用下一种模式。

3、使用 epoll/kqueue 的异步模型:属于异步阻塞/非阻塞 IO 模型,代码如下:

epoll_server.php

[php] view plaincopyprint?
1.<?php 
2./** 
3.* EpollSocketServer Class (use libevent) 
4.* By James.Huang <shagoo#gmail.com> 
5.* 
6.* Defined constants: 
7.* 
8.* EV_TIMEOUT (integer) 
9.* EV_READ (integer) 
10.* EV_WRITE (integer) 
11.* EV_SIGNAL (integer) 
12.* EV_PERSIST (integer) 
13.* EVLOOP_NONBLOCK (integer) 
14.* EVLOOP_ONCE (integer) 
15.**/ 
16.set_time_limit(0); 
17.class EpollSocketServer 
18.{ 
19.private static $socket; 
20.private static $connections; 
21.private static $buffers; 
22.23.function EpollSocketServer ($port) 
24.{ 
25.global $errno, $errstr; 
26.27.if (!extension_loaded('libevent')) { 
28.die("Please install libevent extension firstly/n"); 
29.} 
30.31.if ($port < 1024) { 
32.die("Port must be a number which bigger than 1024/n"); 
33.} 
34.35.$socket_server = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr); 
36.if (!$socket_server) die("$errstr ($errno)"); 
37.38.stream_set_blocking($socket_server, 0); // 非阻塞 
39.40.$base = event_base_new(); 
41.$event = event_new(); 
42.event_set($event, $socket_server, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base); 
43.event_base_set($event, $base); 
44.event_add($event); 
45.event_base_loop($base); 
46.47.self::$connections = array(); 
48.self::$buffers = array(); 
49.} 
50.51.function ev_accept($socket, $flag, $base) 
52.{ 
53.static $id = 0; 
54.55.$connection = stream_socket_accept($socket); 
56.stream_set_blocking($connection, 0); 
57.58.$id++; // increase on each accept 
59.60.$buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id); 
61.event_buffer_base_set($buffer, $base); 
62.event_buffer_timeout_set($buffer, 30, 30); 
63.event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff); 
64.event_buffer_priority_set($buffer, 10); 
65.event_buffer_enable($buffer, EV_READ | EV_PERSIST); 
66.67.// we need to save both buffer and connection outside 
68.self::$connections[$id] = $connection; 
69.self::$buffers[$id] = $buffer; 
70.} 
71.72.function ev_error($buffer, $error, $id) 
73.{ 
74.event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE); 
75.event_buffer_free(self::$buffers[$id]); 
76.fclose(self::$connections[$id]); 
77.unset(self::$buffers[$id], self::$connections[$id]); 
78.} 
79.80.function ev_read($buffer, $id) 
81.{ 
82.static $ct = 0; 
83.$ct_last = $ct; 
84.$ct_data = ''; 
85.while ($read = event_buffer_read($buffer, 1024)) { 
86.$ct += strlen($read); 
87.$ct_data .= $read; 
88.} 
89.$ct_size = ($ct - $ct_last) * 8; 
90.echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n"; 
91.event_buffer_write($buffer, "Received $ct_size byte data./r/n"); 
92.} 
93.94.function ev_write($buffer, $id) 
95.{ 
96.echo "[$id] " . __METHOD__ . "/n"; 
97.} 
98.} 
99.new EpollSocketServer(2000); 
<?php
/**
* EpollSocketServer Class (use libevent)
* By James.Huang <shagoo#gmail.com>
*
* Defined constants:
*
* EV_TIMEOUT (integer)
* EV_READ (integer)
* EV_WRITE (integer)
* EV_SIGNAL (integer)
* EV_PERSIST (integer)
* EVLOOP_NONBLOCK (integer)
* EVLOOP_ONCE (integer)
**/
set_time_limit(0);
class EpollSocketServer
{
private static $socket;
private static $connections;
private static $buffers;
function EpollSocketServer ($port)
{
global $errno, $errstr;
if (!extension_loaded('libevent')) {
die("Please install libevent extension firstly/n");
}
if ($port < 1024) {
die("Port must be a number which bigger than 1024/n");
}
$socket_server = stream_socket_server("tcp://0.0.0.0:{$port}", $errno, $errstr);
if (!$socket_server) die("$errstr ($errno)");
stream_set_blocking($socket_server, 0); // 非阻塞
$base = event_base_new();
$event = event_new();
event_set($event, $socket_server, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base);
event_base_set($event, $base);
event_add($event);
event_base_loop($base);
self::$connections = array();
self::$buffers = array();
}
function ev_accept($socket, $flag, $base)
{
static $id = 0;
$connection = stream_socket_accept($socket);
stream_set_blocking($connection, 0);
$id++; // increase on each accept
$buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id);
event_buffer_base_set($buffer, $base);
event_buffer_timeout_set($buffer, 30, 30);
event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff);
event_buffer_priority_set($buffer, 10);
event_buffer_enable($buffer, EV_READ | EV_PERSIST);
// we need to save both buffer and connection outside
self::$connections[$id] = $connection;
self::$buffers[$id] = $buffer;
}
function ev_error($buffer, $error, $id)
{
event_buffer_disable(self::$buffers[$id], EV_READ | EV_WRITE);
event_buffer_free(self::$buffers[$id]);
fclose(self::$connections[$id]);
unset(self::$buffers[$id], self::$connections[$id]);
}
function ev_read($buffer, $id)
{
static $ct = 0;
$ct_last = $ct;
$ct_data = '';
while ($read = event_buffer_read($buffer, 1024)) {
$ct += strlen($read);
$ct_data .= $read;
}
$ct_size = ($ct - $ct_last) * 8;
echo "[$id] " . __METHOD__ . " > " . $ct_data . "/n";
event_buffer_write($buffer, "Received $ct_size byte data./r/n");
}
function ev_write($buffer, $id)
{
echo "[$id] " . __METHOD__ . "/n";
}
}
new EpollSocketServer(2000);
epoll_client.php

[php] view plaincopyprint?
1.<?php 
2./** 
3.* EpollSocket Test Client 
4.* By James.Huang <shagoo#gmail.com> 
5.**/ 
6.function debug ($msg) 
7.{ 
8.// echo $msg; 
9.error_log($msg, 3, '/tmp/socket.log'); 
10.} 
11.if ($argv[1]) { 
12.$socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30); 
13.// stream_set_blocking($socket_client, 0); 
14.if (!$socket_client) { 
15.die("$errstr ($errno)"); 
16.} else { 
17.$msg = trim($argv[1]); 
18.for ($i = 0; $i < 10; $i++) { 
19.$res = fwrite($socket_client, "$msg($i)"); 
20.usleep(100000); 
21.debug(fread($socket_client, 1024)); 
22.} 
23.fclose($socket_client); 
24.} 
25.} 
26.else { 
27.28.$phArr = array(); 
29.for ($i = 0; $i < 10; $i++) { 
30.$phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r'); 
31.} 
32.foreach ($phArr as $ph) { 
33.pclose($ph); 
34.} 
35.36.// for ($i = 0; $i < 10; $i++) { 
37.// system("php ".__FILE__." '{$i}:test'"); 
38.// } 
39.} 
<?php
/**
* EpollSocket Test Client
* By James.Huang <shagoo#gmail.com>
**/
function debug ($msg)
{
//	echo $msg;
error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {
$socket_client = stream_socket_client('tcp://0.0.0.0:2000', $errno, $errstr, 30);
//	stream_set_blocking($socket_client, 0);
if (!$socket_client) {
die("$errstr ($errno)");
} else {
$msg = trim($argv[1]);
for ($i = 0; $i < 10; $i++) {
$res = fwrite($socket_client, "$msg($i)");
usleep(100000);
debug(fread($socket_client, 1024));
}
fclose($socket_client);
}
}
else {
$phArr = array();
for ($i = 0; $i < 10; $i++) {
$phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
}
foreach ($phArr as $ph) {
pclose($ph);
}
//	for ($i = 0; $i < 10; $i++) {
//		system("php ".__FILE__." '{$i}:test'");
//	}
}
先说一下,以上的例子是基于 PHP 的 libevent 扩展实现的,需要运行的话要先安装此扩展,参考:http://pecl.php.net/package/libevent。

这个例子做的事情和前面介绍的第一个模型一样,epoll_server.php 实现的服务端也是接受客户端数据,然后返回结果(接收到的字节数)。但是,当你运行 php epoll_client.php 的时候你会发现服务端打印出来的结果和 accept 阻塞模型就大不一样了,当然运行效率也有极大的提升,这是为什么呢?接下来就介绍一下 epoll/kqueue 模型:在介绍 select 模式的时候我们提到了这种模式的局限,而 epoll 就是为了解决 poll 的这两个缺陷而生的。首先,epoll 模式基本没有限制(参考 cat /proc/sys/fs/file-max 默认就达到 300K,很令人兴奋吧,其实这也就是所谓基于 epoll 的 Erlang 服务端可以同时处理这么多并发连接的根本原因,不过现在 PHP 理论上也可以做到了,呵呵);另外,epoll 模式的性能也不会像 select 模式那样随着连接数的增大而变差,测试发现性能还是很稳定的(下篇会有详细介绍)。

epoll 工作有两种模式 LT(level triggered) 和 ET(edge-triggered),前者是缺省模式,同时支持阻塞和非阻塞 IO 模式,虽然性能比后者差点,但是比较稳定,一般来说在实际运用中,我们都是用这种模式(ET 模式和 WinSock 都是纯异步非阻塞模型)。而另外一点要说的是 libevent 是在编译阶段选择系统的 I/O demultiplex 机制的,不支持在运行阶段根据配置再次选择,所以我们在这里也就不细讨论 libevent 的实现的细节了,如果朋友有兴趣进一步了解的话,请参考:http://monkey.org/~provos/libevent/。

此文章由 http://www.ositren.com 收集整理 ,地址为: http://www.ositren.com/htmls/67218.html