php消息队列memcache

字体大小: 中小 标准 ->行高大小: 标准
<?php  
/* 
 * memcache队列类 
 * 支持多进程并发写入、读取 
 * 边写边读,AB面轮值替换 
 * @author lkk/lianq.net 
 * @create on 9:25 2012-9-28 
 * 
 * @example: 
    $obj = new memcacheQueue('duilie'); 
    $obj->add('1asdf'); 
    $obj->getQueueLength(); 
    $obj->read(11); 
    $obj->get(8); 
 */  
  
/**
 * FIFO queue stored in memcache (procedural PECL "memcache" API).
 *
 * The queue is made of two "sides", A and B, each holding at most MAXNUM
 * slots. Writers append to the current side; when it is full the current
 * side is switched to the other one. Readers consume the previous ("last")
 * side first and then the current side, so insertion order is preserved.
 * Every mutating operation (add/get/clear) is serialised through a lock key
 * created with memcache_add().
 *
 * Keys used, all prefixed with the queue name:
 *   <name><SIDE_KEY>               current side, 'A' or 'B'
 *   <name><side><HEAD_KEY>         index of the next slot to read on a side
 *   <name><side><TAIL_KEY>         index of the next slot to write on a side
 *   <name><side><VALU_KEY><index>  the stored item
 *   <name><LOCK_KEY>               the lock
 *
 * Example:
 *   $obj = new memcacheQueue('duilie');
 *   $obj->add('1asdf');
 *   $obj->getQueueLength();
 *   $obj->read(11);
 *   $obj->get(8);
 */
class memcacheQueue{
    public static   $client;            // memcache connection handle, shared by all instances
    public          $access;            // true while this instance holds the queue lock
    private         $currentSide;       // side currently being written to: 'A' or 'B'
    private         $lastSide;          // the other side, which is read first
    private         $sideAHead;         // cached head index of side A
    private         $sideATail;         // cached tail index of side A
    private         $sideBHead;         // cached head index of side B
    private         $sideBTail;         // cached tail index of side B
    private         $currentHead;       // cached head index of the current side
    private         $currentTail;       // cached tail index of the current side
    private         $lastHead;          // cached head index of the last side
    private         $lastTail;          // cached tail index of the last side
    private         $expire;            // TTL in seconds passed to memcache for every key; 0 = never expire
    private         $sleepTime;         // pause between lock attempts, in microseconds
    private         $queueName;         // queue name, used as key prefix
    private         $retryNum;          // maximum number of lock retries before giving up

    const   MAXNUM      = 2000;                 // maximum number of slots per side
    const   HEAD_KEY    = '_lkkQueueHead_';     // suffix of the head-index key
    const   TAIL_KEY    = '_lkkQueueTail_';     // suffix of the tail-index key
    const   VALU_KEY    = '_lkkQueueValu_';     // suffix of the item keys
    const   LOCK_KEY    = '_lkkQueueLock_';     // suffix of the lock key
    const   SIDE_KEY    = '_lkkQueueSide_';     // suffix of the current-side key

    /**
     * Connect to memcache and initialise the queue state.
     *
     * @param string       $queueName queue name (key prefix)
     * @param int|string   $expire    TTL in seconds; '' or null selects the default of 3600, 0 means never expire
     * @param array|string $config    array('host'=>..., 'port'=>...) or "host:port"; empty means localhost:11211
     */
    public function __construct($queueName ='',$expire='',$config =''){
        // Initialise every field before doing any I/O so the object is in a
        // defined state even when the connection attempt below fails.
        $this->access    = false;
        $this->sleepTime = 1000;
        $this->retryNum  = 10000;
        $this->queueName = $queueName;
        // The former test "(empty($expire) && $expire != 0)" relied on '' != 0,
        // which evaluates differently in PHP 7 and PHP 8. Compare explicitly so
        // "not given" always means 3600 and an explicit 0 always means "never".
        $this->expire    = ($expire === '' || $expire === null) ? 3600 : (int)$expire;
        $this->sideAHead = 0;
        $this->sideATail = 0;
        $this->sideBHead = 0;
        $this->sideBTail = 0;

        if(empty($config)){
            self::$client = memcache_pconnect('localhost',11211);
        }elseif(is_array($config)){// array('host'=>'127.0.0.1','port'=>'11211')
            self::$client = memcache_pconnect($config['host'],$config['port']);
        }elseif(is_string($config)){// "127.0.0.1:11211"
            $tmp  = explode(':',$config);
            $host = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
            $port = isset($tmp[1]) ? $tmp[1] : '11211';
            self::$client = memcache_pconnect($host,$port);
        }
        // A constructor cannot return a value; without a connection the
        // locking methods below simply report failure.
        if(!self::$client) return;

        ignore_user_abort(TRUE);// keep running when the HTTP client disconnects
        set_time_limit(0);// remove the script execution time limit

        // Create the side marker only if it does not exist yet.
        memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $this->expire);
        $this->getHeadNTail($queueName);
    }

    /**
     * Refresh the cached head/tail indexes of both sides from memcache.
     * A missing key reads as false and is cast to 0.
     *
     * @param string $queueName queue name (key prefix)
     */
    private function getHeadNTail($queueName){
        $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
        $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
        $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
        $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
    }

    /**
     * Read the current side from memcache and derive the current/last
     * head and tail values from the cached per-side indexes.
     * Any stored value other than 'A' (including a missing key) selects 'B'.
     *
     * @return string 'A' or 'B'
     */
    public function getCurrentSide(){
        $stored = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($stored == 'A'){
            $this->currentSide = 'A';
            $this->lastSide    = 'B';

            $this->currentHead = $this->sideAHead;
            $this->currentTail = $this->sideATail;
            $this->lastHead    = $this->sideBHead;
            $this->lastTail    = $this->sideBTail;
        }else{
            $this->currentSide = 'B';
            $this->lastSide    = 'A';

            $this->currentHead = $this->sideBHead;
            $this->currentTail = $this->sideBTail;
            $this->lastHead    = $this->sideAHead;
            $this->lastTail    = $this->sideATail;
        }

        return $this->currentSide;
    }

    /**
     * Acquire the queue lock, polling until it is free or the retry budget
     * is exhausted.
     *
     * The lock key is stored with the queue TTL, so with a TTL of 0 a lock
     * left behind by a crashed process never expires on its own.
     *
     * @return boolean true when the lock was acquired; false when there is no
     *                 connection, this instance already holds the lock, or the
     *                 retries ran out
     */
    private function getLock(){
        if(!self::$client) return false;
        if($this->access !== false) return false;

        $attempts = 0;
        while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
            usleep($this->sleepTime);
            $attempts++;
            if($attempts > $this->retryNum){
                return false;
            }
        }
        return $this->access = true;
    }

    /**
     * Release the queue lock.
     */
    private function unLock(){
        memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
        $this->access = false;
    }

    /**
     * Append one item to the queue.
     *
     * @param mixed $data value to store
     * @return boolean true on success; false when the lock could not be taken,
     *                 both sides are full, or the slot could not be written
     */
    public function add($data){
        if(!$this->getLock()){
            return false;
        }
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        if($this->bothSidesFull()){
            $this->unLock();
            return false;
        }

        // The current side is full but the other one is not: switch sides
        // while still holding the lock so no other writer can interleave.
        if($this->currentTail >= self::MAXNUM){
            $this->changeCurrentSide();
            $this->getCurrentSide();
        }

        $result = false;
        if($this->currentTail < self::MAXNUM){
            $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
            if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
                $this->changeTail();
                $result = true;
            }
        }

        $this->unLock();
        return $result;
    }

    /**
     * Remove and return up to $length items from the front of the queue.
     *
     * @param int $length number of items to take; 0 takes everything
     * @return array|false whatever memcache_get() returns for the selected
     *                     keys; false when $length is not numeric, the lock
     *                     could not be taken, or the queue is empty
     */
    public function get($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;// default: take everything
        if(!$this->getLock()) return false;

        // getKeyArray() refreshes the cached indexes, so the emptiness check
        // below sees the live state rather than the state at construction.
        $keyArray = $this->getKeyArray($length);
        if($this->bothSidesEmpty()){
            $this->unLock();
            return false;
        }

        $keys = $keyArray['keys'];
        $data = @memcache_get(self::$client, $keys);
        foreach($keys as $key){// delete each item once it has been fetched
            @memcache_delete(self::$client, $key, 0);
        }

        // Advance each side by the number of slots actually consumed.
        if($keyArray['lastCount'] > 0){
            $this->changeHead($this->lastSide, $this->lastHead + $keyArray['lastCount']);
        }
        if($keyArray['currentCount'] > 0){
            $this->changeHead($this->currentSide, $this->currentHead + $keyArray['currentCount']);
        }
        $this->unLock();

        return $data;
    }

    /**
     * Return up to $length items from the front of the queue without
     * removing them. No lock is taken.
     *
     * @param int $length number of items to read; 0 reads everything
     * @return array|false whatever memcache_get() returns for the selected
     *                     keys; false when $length is not numeric
     */
    public function read($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;// default: read everything
        $keyArray = $this->getKeyArray($length);
        return @memcache_get(self::$client, $keyArray['keys']);
    }

    /**
     * Build the list of item keys for the first $length queue entries,
     * taking the last side first and then the current side.
     * Refreshes the cached head/tail/side state as a side effect.
     *
     * @param int $length number of entries wanted
     * @return array 'keys' => item keys in queue order,
     *               'lastCount' => number of keys taken from the last side,
     *               'currentCount' => number of keys taken from the current side
     */
    private function getKeyArray($length){
        $result = array('keys'=>array(),'lastCount'=>0,'currentCount'=>0);
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();
        if(empty($length)) return $result;

        // Oldest entries live on the last side.
        $lastPrefix = $this->queueName .$this->lastSide . self::VALU_KEY;
        for($idx = $this->lastHead; $idx < $this->lastTail && $result['lastCount'] < $length; $idx++){
            $result['keys'][] = $lastPrefix . $idx;
            $result['lastCount']++;
        }

        // Fill the remainder from the current side.
        $currentPrefix = $this->queueName .$this->currentSide . self::VALU_KEY;
        for($idx = $this->currentHead; $idx < $this->currentTail && ($result['lastCount'] + $result['currentCount']) < $length; $idx++){
            $result['keys'][] = $currentPrefix . $idx;
            $result['currentCount']++;
        }

        return $result;
    }

    /**
     * Increment the stored tail index of the current side by one.
     * Must be called while holding the lock; a missing key counts as 0.
     */
    private function changeTail(){
        $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
        $next     = (int)memcache_get(self::$client, $tail_key) + 1;
        memcache_set(self::$client, $tail_key,$next,false,$this->expire);
    }

    /**
     * Move the head of a side to $headValue, or reset the side when that
     * position reaches its tail (the side has been fully consumed).
     *
     * @param string $side      side to update, 'A' or 'B'
     * @param int    $headValue index of the next slot to read on that side
     */
    private function changeHead($side,$headValue){
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        $sideTail = (int)memcache_get(self::$client, $tail_key);
        if($headValue < $sideTail){
            memcache_set(self::$client, $head_key,$headValue,false,$this->expire);
        }else{
            $this->resetSide($side);
        }
    }

    /**
     * Reset a side by storing 0 as both its head and its tail index.
     *
     * @param string $side side to reset, 'A' or 'B'
     */
    private function resetSide($side){
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        memcache_set(self::$client, $head_key,0,false,$this->expire);
        memcache_set(self::$client, $tail_key,0,false,$this->expire);
    }

    /**
     * Flip the stored current side (A -> B, anything else -> A).
     * Does not refresh the cached head/tail values; call getCurrentSide()
     * afterwards when those are needed.
     *
     * @return string the new current side
     */
    private function changeCurrentSide(){
        $stored = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        $this->currentSide = ($stored == 'A') ? 'B' : 'A';
        memcache_set(self::$client, $this->queueName . self::SIDE_KEY,$this->currentSide,false,$this->expire);
        return $this->currentSide;
    }

    /**
     * Whether both sides have reached MAXNUM, judged on the cached tails.
     *
     * @return boolean
     */
    private function bothSidesFull(){
        return $this->sideATail >= self::MAXNUM && $this->sideBTail >= self::MAXNUM;
    }

    /**
     * Whether both tails are 0, judged on the cached tails.
     *
     * @return boolean
     */
    private function bothSidesEmpty(){
        return !($this->sideATail > 0 || $this->sideBTail > 0);
    }

    /**
     * Check whether the queue is full (both sides hold MAXNUM slots).
     * Reads the live indexes from memcache first.
     *
     * @return boolean
     */
    public function isFull(){
        $this->getHeadNTail($this->queueName);
        return $this->bothSidesFull();
    }

    /**
     * Check whether the queue is empty (both tails are 0).
     * Reads the live indexes from memcache first.
     *
     * @return boolean
     */
    public function isEmpty(){
        $this->getHeadNTail($this->queueName);
        return $this->bothSidesEmpty();
    }

    /**
     * Return the queue length computed from the head/tail indexes.
     * This is the nominal length: items whose keys have expired are still
     * counted, so the number of retrievable items may be smaller.
     *
     * @return int
     */
    public function getQueueLength(){
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        $sideALength = $this->sideATail - $this->sideAHead;
        $sideBLength = $this->sideBTail - $this->sideBHead;

        return $sideALength + $sideBLength;
    }

    /**
     * Delete every item slot of both sides and reset their head/tail
     * indexes to 0. The HEAD, TAIL and SIDE keys themselves are kept.
     *
     * @return boolean false when the lock could not be taken, true otherwise
     */
    public function clear(){
        if(!$this->getLock()) return false;
        for($i=0;$i<self::MAXNUM;$i++){
            @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);
            @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
        }
        // Reset the indexes before releasing the lock so no writer can slip
        // in between the deletes and the reset.
        $this->resetSide('A');
        $this->resetSide('B');
        $this->unLock();
        return true;
    }

    /**
     * Flush the whole memcache server (memcache_flush), not only this queue.
     */
    public function memFlush(){
        memcache_flush(self::$client);
    }
}
 利用PHP操作Linux消息队列完成进程间通信

当我们开发的系统需要使用多进程方式运行时,进程间通信便成了至关重要的环节。消息队列(message queue)是Linux系统进程间通信的一种方式。关于Linux系统进程通信的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/ ;关于Linux系统消息队列的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/part4/ 。PHP的sysvmsg模块是对Linux系统支持的System V IPC中的System V消息队列函数族的封装。我们需要利用sysvmsg模块提供的函数来进行进程间通信。先来看一段示例代码_1:

<?php  
$message_queue_key = ftok(__FILE__, 'a');  
  
$message_queue = msg_get_queue($message_queue_key, 0666);  
var_dump($message_queue);  
  
$message_queue_status = msg_stat_queue($message_queue);  
print_r($message_queue_status);  
  
//向消息队列中写  
msg_send($message_queue, 1, "Hello,World!");  
  
$message_queue_status = msg_stat_queue($message_queue);  
print_r($message_queue_status);  
  
//从消息队列中读  
msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT);  
print_r($message."\r\n");  
  
msg_remove_queue($message_queue);  
?>
示例代码_1只是展示了PHP操作消息队列函数的应用。下面的代码具体描述了进程间通信的场景:

<?php
// Example 2: five child processes each write one message to a shared
// System V message queue; the parent collects and prints them.
$message_queue_key = ftok ( __FILE__, 'a' );
$message_queue = msg_get_queue ( $message_queue_key, 0666 );

$pids = array ();
for($i = 0; $i < 5; $i ++) {
    // Create a child process.
    $pids [$i] = pcntl_fork ();

    if ($pids [$i] === -1) {
        // pcntl_fork() returns -1 on failure; do not report it as a child.
        echo "No.$i child process could not be created\r\n";
        unset ( $pids [$i] );
    } elseif ($pids [$i] > 0) {
        // Parent: remember the child's pid.
        echo "No.$i child process was created, the pid is $pids[$i]\r\n";
    } else {
        // Child: send one message and terminate so it never reaches the
        // parent's code below.
        $pid = posix_getpid ();
        echo "process.$pid is writing now\r\n";

        msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );
        exit ( 0 );
    }
}

// Wait for every child, so all messages are queued before reading and no
// zombie processes are left behind.
foreach ( $pids as $child_pid ) {
    pcntl_waitpid ( $child_pid, $status );
}

// Drain the queue. With MSG_IPC_NOWAIT msg_receive() returns false as soon
// as the queue is empty, which ends the loop.
while ( msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT ) ) {
    echo $message;
}

msg_remove_queue ( $message_queue );
?>

此文章由 http://www.ositren.com 收集整理 ,地址为: http://www.ositren.com/htmls/69718.html