博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
php-rabbitmq笔记(一)
阅读量:6370 次
发布时间:2019-06-23

本文共 3782 字,大约阅读时间需要 12 分钟。

本文是学习使用php-rabbitmq的一些笔记,结合,使用TP5的command运行服务端测试,由于还在学习使用中,可能会有疏漏错误之处,完善中。。。。

下面从最简单的开始

Sending(发送消息  msg->queue)

 

use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;public function send(){
     //连接rabbitmq服务 主机 端口号 用户名 密码 $connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //创建一个队列 $channel->queue_declare('new_tasks',false,false,false,false);     //消息内容 $data = time(); $msg = new AMQPMessage($data);
    //消息进入队列
    $info = $channel->basic_publish($msg,'','new_tasks');     //关闭     $channel->close();     $connection->close();  }

 Receiving(接收消费  queue->msg)

     $connection = new AMQPStreamConnection('localhost',5672,'guest','guest');        $channel = $connection->channel();        //声明队列        $channel->queue_declare('new_tasks',false,false,false,false);      //回调  其中逻辑自定义        $callback = function($msg){            echo  '[x] Received',$msg->body,"\n";            sleep(3);  //统计消息中有多少个点,就睡几秒            echo " [x] Done\n";            Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]);        };      //当有多个消费者进行循环调度时,下面一行确保公平分发队列中的信息,每个消费者每次取一条        $channel->basic_qos(null, 1, null);        //从队列获取信息,并执行回调        $channel->basic_consume('new_tasks','',false,true,false,false,$callback);        while(count($channel->callbacks)){            $channel->wait();        }

之后可运行生产脚本和消费脚本测试,若要进行循环调度,可运行多个消费脚本,这里用的TH5的命令行,php think Sending,php think Receiving运行。

上面代码实际上可用性不高,当消费者进行一个复杂任务时,一旦RabbitMQ向客户发送消息,它立即将其标记为删除,而消费者在执行过程中出现问题,任务并未完成,但此时消息已经丢失。我们并不想丢失这些消息,希望当一个消费者挂掉之后,任务自动由另一个消费者接管,这里需要添加---消息确认。

重写Sending(消息确认)

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');        $channel = $connection->channel();        //第三个参数为true时,表示队列持久化,需要和服务端一致        $channel->queue_declare('new_tasks',false,true,false,false);        $data = time();        //设置delivery_mode表示设置消息持久化        $msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT));        $info = $channel->basic_publish($msg,'','new_task');        $channel->close();        $connection->close();

 

重写Receiving(消息确认)

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');        $channel = $connection->channel();        //将第三个参数改为true,第三个参数为true时,表示队列持久化,需要和客户端一致        $channel->queue_declare('new_tasks',false,true,false,false);        $callback = function($msg){            echo  '[x] Received',$msg->body,"\n";            sleep(3);  //统计消息中有多少个点,就睡几秒            echo " [x] Done\n";            Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]);            //确认消息  消费者发回ack            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);        };        $channel->basic_qos(null, 1, null);        //第四个参数默认false 表示将进行消息确认,为true时不进行消息确认        $channel->basic_consume('new_tasks','',false,false,false,false,$callback);        while(count($channel->callbacks)){            $channel->wait();        }

到这里,执行重写之后的脚本,到运行两个消费脚本,如果往队列中加入10条信息,理论上两个消费者将各自执行其中5条(基于$channel->basic_qos(null, 1, null);)。若其中一个消费者的确认消息代码被注释掉,无法返回ack,即下面一条被注释

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

那么这个消费者只能获得一条信息,另一个消费者获得9条,且这9条在执行完任务后,从队列中删除,而未返回确认消息的那一条仍留在队列中,这时Ctrl+C停止运行这个不返回确认消息的消费者,这条消息将重新分给正常的消费者。

持久性

上面重写后的代码还有其他地方相较于重写之前的代码有区别,

重写之后,第三个参数改为true,保证队列持久

$channel->queue_declare('new_tasks',false,true,false,false);

设置delivery_mode属性,使消息持久

$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT));

这样可以保证队列持久性,及时重启,队列仍然存在,而保证消息持久是将其写入磁盘,但当rabbitmq接收了消息,并未完成保存时,此时写入了缓存,未写入磁盘,根据官方说明,该种方式可用,但只能用于简单队列,否则,需要发布确认。

转载于:https://www.cnblogs.com/zczhen/p/10288733.html

你可能感兴趣的文章
大数据、云计算与广州安防
查看>>
阿里云内存数据库Memcache升级 提供256G缓存
查看>>
从物理环境迁移到云环境的几个关键步骤
查看>>
成为准DRAM的小幻想? Supermicro推出增内存的双槽服务器
查看>>
联想仅仅依靠不断的重组是不够的
查看>>
Facebook创始人马克·扎克伯格总结的创业三大黄金法则
查看>>
博科第六代FC获戴尔EMC、富士通、日立数据和NetApp拥戴
查看>>
秋色园QBlog技术原理解析:Module之页面基类设计(五)
查看>>
虚拟化天花板将近,后虚拟化时代如何应对?
查看>>
恶意网站可利用这个新漏洞拖垮Windows 7和8电脑
查看>>
MEMS传感器要闯过的三关
查看>>
天合光能提交美股退市请求 正式私有化
查看>>
区块链技术将如何适用于企业业务
查看>>
为了OCP英特尔拼了,一大波新科技正在路上
查看>>
前白宫反恐首席顾问:NSA可以破解圣贝纳迪恐怖份子所有的iPhone
查看>>
Java最小堆解决TopK问题
查看>>
100万的大数据人才缺口,谁来解决?
查看>>
商标转让和域名转让的区别是什么?
查看>>
《数值分析(原书第2版)》—— 1.1 二分法
查看>>
Instor公司发布一款免费的数据中心成本估算工具
查看>>