本文是学习使用php-rabbitmq的一些笔记,结合,使用TP5的command运行服务端测试,由于还在学习使用中,可能会有疏漏错误之处,完善中。。。。
下面从最简单的开始
Sending(发送消息 msg->queue)
use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;public function send(){ //连接rabbitmq服务 主机 端口号 用户名 密码 $connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //创建一个队列 $channel->queue_declare('new_tasks',false,false,false,false); //消息内容 $data = time(); $msg = new AMQPMessage($data);
//消息进入队列
$info = $channel->basic_publish($msg,'','new_tasks'); //关闭 $channel->close(); $connection->close(); }
Receiving(接收消费 queue->msg)
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //声明队列 $channel->queue_declare('new_tasks',false,false,false,false); //回调 其中逻辑自定义 $callback = function($msg){ echo '[x] Received',$msg->body,"\n"; sleep(3); //统计消息中有多少个点,就睡几秒 echo " [x] Done\n"; Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]); }; //当有多个消费者进行循环调度时,下面一行确保公平分发队列中的信息,每个消费者每次取一条 $channel->basic_qos(null, 1, null); //从队列获取信息,并执行回调 $channel->basic_consume('new_tasks','',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); }
之后可运行生产脚本和消费脚本测试,若要进行循环调度,可运行多个消费脚本,这里用的TH5的命令行,php think Sending,php think Receiving运行。
上面代码实际上可用性不高,当消费者进行一个复杂任务时,一旦RabbitMQ向客户发送消息,它立即将其标记为删除,而消费者在执行过程中出现问题,任务并未完成,但此时消息已经丢失。我们并不想丢失这些消息,希望当一个消费者挂掉之后,任务自动由另一个消费者接管,这里需要添加---消息确认。
重写Sending(消息确认)
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //第三个参数为true时,表示队列持久化,需要和服务端一致 $channel->queue_declare('new_tasks',false,true,false,false); $data = time(); //设置delivery_mode表示设置消息持久化 $msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT)); $info = $channel->basic_publish($msg,'','new_task'); $channel->close(); $connection->close();
重写Receiving(消息确认)
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel(); //将第三个参数改为true,第三个参数为true时,表示队列持久化,需要和客户端一致 $channel->queue_declare('new_tasks',false,true,false,false); $callback = function($msg){ echo '[x] Received',$msg->body,"\n"; sleep(3); //统计消息中有多少个点,就睡几秒 echo " [x] Done\n"; Db::name('user')->update(['id'=>10236,'delete_time'=>$msg->body]); //确认消息 消费者发回ack $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); //第四个参数默认false 表示将进行消息确认,为true时不进行消息确认 $channel->basic_consume('new_tasks','',false,false,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); }
到这里,执行重写之后的脚本,到运行两个消费脚本,如果往队列中加入10条信息,理论上两个消费者将各自执行其中5条(基于$channel->basic_qos(null, 1, null);)。若其中一个消费者的确认消息代码被注释掉,无法返回ack,即下面一条被注释
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
那么这个消费者只能获得一条信息,另一个消费者获得9条,且这9条在执行完任务后,从队列中删除,而未返回确认消息的那一条仍留在队列中,这时Ctrl+C停止运行这个不返回确认消息的消费者,这条消息将重新分给正常的消费者。
持久性
上面重写后的代码还有其他地方相较于重写之前的代码有区别,
重写之后,第三个参数改为true,保证队列持久
$channel->queue_declare('new_tasks',false,true,false,false);
设置delivery_mode属性,使消息持久
$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage :: DELIVERY_MODE_PERSISTENT));
这样可以保证队列持久性,及时重启,队列仍然存在,而保证消息持久是将其写入磁盘,但当rabbitmq接收了消息,并未完成保存时,此时写入了缓存,未写入磁盘,根据官方说明,该种方式可用,但只能用于简单队列,否则,需要发布确认。