• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

PHP memcache实现消息队列实例_php

php 搞代码 3年前 (2018-06-21) 103次浏览 已收录 0个评论

现在memcache在服务器缓存应用比较广泛,下面我来介绍memcache实现消息队列等待的一个例子,有需要了解的朋友可参考。

memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志。然后通过定时程序将内容落地到文件或者数据库

php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列。
方便实现队列的轻量级队列服务器是:
starling支持memcache协议的轻量级持久化服务器
https://github.com/starling/starling
Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列

http://www.gaodaima.com/51200.htmlPHP memcache实现消息队列实例_php

http://kr.github.com/beanstalkd/
php中也可以使用memcache/memcached来实现消息队列。

connect('127.0.0.1', 11211); } return $mc; } /** * mc 计数器,增加计数并返回新的计数 * @param string $key   计数器 * @param int $offset   计数增量,可为负数.0为不改变计数 * @param int $time     时间 * @return int/false    失败是返回false,成功时返回更新计数器后的计数 */ static public function set_counter( $key, $offset, $time=0 ){ $mc = self::mc_init(); $val = $mc->get($key); if( !is_numeric($val)  $val < 0 ){ $ret = $mc->set( $key, 0, $time ); if( !$ret ) return false; $val = 0; } $offset = intval( $offset ); if( $offset > 0 ){ return $mc->increment( $key, $offset ); }elseif( $offset < 0 ){ return $mc->decrement( $key, -$offset ); } return $val; } /** * 写入队列 * @param string $key * @param mixed $value * @return bool */ static public function input( $key, $value ){ $mc = self::mc_init(); $w_key = self::PREFIX.$key.'W'; $v_key = self::PREFIX.$key.self::set_counter($w_key, 1); return $mc->set( $v_key, $value ); } /** * 读取队列里的数据 * @param string $key * @param int $max  最多读取条数 * @return array */ static public function output( $key, $max=100 ){ $out = array(); $mc = self::mc_init(); $r_key = self::PREFIX.$key.'R'; $w_key = self::PREFIX.$key.'W'; $r_p   = self::set_counter( $r_key, 0 );//读指针 $w_p   = self::set_counter( $w_key, 0 );//写指针 if( $r_p == 0 ) $r_p = 1; while( $w_p >= $r_p ){ if( --$max < 0 ) break; $v_key = self::PREFIX.$key.$r_p; $r_p = self::set_counter( $r_key, 1 ); $out[] = $mc->get( $v_key ); $mc->delete($v_key); } return $out; } } /** 使用方法: QMC::input($key, $value );//写入队列 $list = QMC::output($key);//读取队列 */ ?>

基于PHP共享内存实现的消息队列:

[email protected] * @created 2009-12-23 */ class ShmQueue { private $maxQSize = 0; // 队列最大长度 private $front = 0; // 队头指针 private $rear = 0;  // 队尾指针 private $blockSize = 256;  // 块的大小(byte) private $memSize = 25600;  // 最大共享内存(byte) private $shmId = 0; private $filePtr = './shmq.ptr'; private $semId = 0; public function __construct() { $shmkey = ftok(__FILE__, 't'); $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize ); $this->maxQSize = $this->memSize / $this->blockSize; // 申?一个信号量 $this->semId = sem_get($shmkey, 1); sem_acquire($this->semId); // 申请进入临界区 $this->init(); } private function init() { if ( file_exists($this->filePtr) ){ $contents = file_get_contents($this->filePtr); $data = explode( '', $contents ); if ( isset($data[0]) && isset($data[1])){ $this->front = (int)$data[0]; $this->rear  = (int)$data[1]; } } } public function getLength() { return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize; } public function enQueue( $value ) { if ( $this->ptrInc($this->rear) == $this->front ){ // 队满 return false; } $data = $this->encode($value); shmop_write($this->shmId, $data, $this->rear ); $this->rear = $this->ptrInc($this->rear); return true; } public function deQueue() { if ( $this->front == $this->rear ){ // 队空 return false; } $value = shmop_read($this->shmId, $this->front, $this->blockSize-1); $this->front = $this->ptrInc($this->front); return $this->decode($value); } private function ptrInc( $ptr ) { return ($ptr + $this->blockSize) % ($this->memSize); } private function encode( $value ) { $data = serialize($value) . "__eof"; echo ''; echo strlen($data); echo ''; echo $this->blockSize -1; echo ''; if ( strlen($data) > $this->blockSize -1 ){ throw new Exception(strlen($data)." is overload block size!"); } return $data; } private function decode( $value ) { $data = explode("__eof", $value); return unserialize($data[0]); } public function __destruct() { $data = $this->front . '' . $this->rear; file_put_contents($this->filePtr, $data); sem_release($this->semId); // 出临界区, 释放信号量 } } /* // 进队操作 $shmq = new ShmQueue(); $data = 'test data'; $shmq->enQueue($data); unset($shmq); // 出队操作 $shmq = new ShmQueue(); $data = $shmq->deQueue(); unset($shmq); */ ?>

对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。
如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码:
代码如下:

class Memcache_Queue  {  private $memcache;  private $name;  private $prefix;  function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")  {  if ($memcache == null) {  throw new Exception("memcache object is null, new the object first.");  }  $this->memcache = $memcache;  $this->name = $name;  $this->prefix = $prefix;  $this->maxSize = $maxSize;  $this->front = 0;  $this->real = 0;  $this->size = 0;  }  function __get($name)  {  return $this->get($name);  }  function __set($name, $value)  {  $this->add($name, $value);  return $this;  }  function isEmpty()  {  return $this->size == 0;  }  function isFull()  {  return $this->size == $this->maxSize;  }  function enQueue($data)  {  if ($this->isFull()) {  throw new Exception("Queue is Full");  }  $this->increment("size");  $this->set($this->real, $data);  $this->set("real", ($this->real + 1) % $this->maxSize);  return $this;  }  function deQueue()  {  if ($this->isEmpty()) {  throw new Exception("Queue is Empty");  }  $this->decrement("size");  $this->delete($this->front);  $this->set("front", ($this->front + 1) % $this->maxSize);  return $this;  }  function getTop()  {  return $this->get($this->front);  }  function getAll()  {  return $this->getPage();  }  function getPage($offset = 0, $limit = 0)  {  if ($this->isEmpty()  $this->size < $offset) {  return null;  }  $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize);  $num = 1;  for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)  {  $keys[] = $this->getKeyByPos($pos);  $num++;  if ($limit > 0 && $limit == $num) {  break;  }  }  return array_values($this->memcache->get($keys));  }  function makeEmpty()  {  $keys = $this->getAllKeys();  foreach ($keys as $value) {  $this->delete($value);  }  $this->delete("real");  $this->delete("front");  $this->delete("size");  $this->delete("maxSize");  }  private function getAllKeys()  {  if ($this->isEmpty())  {  return array();  }  $keys[] = $this->getKeyByPos($this->front);  for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)  {  $keys[] = $this->getKeyByPos($pos);  }  return $keys;  }  private function add($pos, $data)  {  $this->memcache->add($this->getKeyByPos($pos), $data);  return $this;  }  private function increment($pos)  {  return $this->memcache->increment($this->getKeyByPos($pos));  }  private function decrement($pos)  {  $this->memcache->decrement($this->getKeyByPos($pos));  }  private function set($pos, $data)  {  $this->memcache->set($this->getKeyByPos($pos), $data);  return $this;  }  private function get($pos)  {  return $this->memcache->get($this->getKeyByPos($pos));  }  private function delete($pos)  {  return $this->memcache->delete($this->getKeyByPos($pos));  }  private function getKeyByPos($pos)  {  return $this->prefix . $this->name . $pos;  }  } 

欢迎大家阅读《PHP memcache实现消息队列实例_php》,跪求各位点评,若觉得好的话请收藏本文,by 搞代码


喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址