MQ|基于Swoft2.x框架实现php操作rabbitMQ

RabbitMQ由于默认是基于AMQP协议 , php需要安装一下AMQP才能对RabbitMQ进行操作
由于swoft是基于swoole实现的框架 , swoole无法在windows使用 , 所以需要安装一套linux的环境 , 不会安装的找我的博客(Linux配置RabbitMQ)或自行百度
PHP安装AMQP客户端

composer require php-amqplib/php-amqplib


处理数据必读 【MQ|基于Swoft2.x框架实现php操作rabbitMQ】使用了swoft自定义进程 , 并在bean.php中加载 , 不懂可自行百度或看我之前的文章(Swoft2.X 使用进程处理Redis队列_苗先生的PHP记录的博客-CSDN博客)
topic和fanout 都可不通过queue进行发送 , 也就是声明一个交换机,直接发送数据,详见我的topic的demo
这样子处理数据的时候只需要创建一个空名的queue,通过routingkey读取数据
  • direct:路由模式 , 如果 routing key 匹配,那么 Message 就会被传递到相应的 queue , 如果没有处理的数据是会阻塞在这里等待消费
  • fanout:订阅模式 ,会向响应的 queue 广播 , 订阅模式是没有路由键routing_key的
  • topic:主题模式 , 通过路由键# 获取所有, *获取部分, 对 key 进行模式匹配,比如 ab* 可以传递到所有 ab* 的 queue
  • dlx: 死信队列 (延时队列) ,消息变成死信有以下几种情况
消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度
direct路由模式(简单队列) 发送数据 控制器:
channel(); $channel->queue_declare('swoft_queue_test', false,false,false,false); # 消息 $msg = new AMQPMessage('swoft_queue_test'.time(),[ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); $channel->confirm_select(); // 发布确认模式//推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { echo "发送成功: " . $message->body . PHP_EOL; } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { echo "发送失败: " . $message->body . PHP_EOL; } ); # 发送 $channel->basic_publish($msg,'','swoft_queue_test'); $channel->wait_for_pending_acks(); $channel->close(); $connect->close(); }}

处理数据
channel(); $channel->queue_declare('swoft_queue_test', false,false,false,false,false); # 回调 $callback = function ($msg){echo $msg->body.PHP_EOL; }; $channel->basic_consume('swoft_queue_test','',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); } $channel->close(); $connect->close(); } }

订阅模式fanout 发送数据
/** * 订阅模式 * @RequestMapping(route="fanout") */ public function fanout() { $exchangeName = 'fanout-ex'; $queueName='fanout-test'; $connect = new AMQPStreamConnection('39.105.156.191',5672,'guest','guest'); $channel = $connect->channel(); $channel->exchange_declare($exchangeName,'fanout',false,true,false); $channel->queue_declare($queueName, false,false,false,false); $channel->queue_bind($queueName,$exchangeName); # 消息 $msg = new AMQPMessage('订阅消息: '.time(),[ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); $channel->confirm_select(); // 发布确认模式//推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { echo "发送成功: " . $message->body . PHP_EOL; } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { echo "发送失败: " . $message->body . PHP_EOL; } ); # 发送 $channel->basic_publish($msg,$exchangeName); $channel->wait_for_pending_acks(); $channel->close(); $connect->close(); }

处理数据
channel(); $channel->exchange_declare($exchangeName,'fanout',false,true,false); $channel->queue_declare($queueName, false,false,false,false); $channel->queue_bind($queueName,$exchangeName); # 回调 $callback = function ($msg){ echo '订阅'.$msg->body.PHP_EOL; }; $channel->basic_consume($queueName,'',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); } $channel->close(); $connect->close(); } }

主题模式topic 发送数据
我这里声明了三个路由键 , 分别是
A方法: a.route.queue
B方法: b.route.queue
C方法: c.routes.queue
发送主题模式数据是不需要 $channel->queue_declare声明队列的

/** * 主题模式 * @RequestMapping(route="topic-a",method={"GET"}) */ public function topica() { $exchangeName = 'topic-ex'; $routingKey = 'a.route.queue'; $connect = new AMQPStreamConnection('39.105.156.191',5672,'guest','guest'); $channel = $connect->channel(); $channel->exchange_declare($exchangeName,'topic',false,true,false); # 消息 $msg = new AMQPMessage('主题消息:a.route.queue '.time(),[ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); $channel->confirm_select(); // 发布确认模式//推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { echo "发送成功: " . $message->body . PHP_EOL; } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { echo "发送失败: " . $message->body . PHP_EOL; } ); # 发送 $channel->basic_publish($msg,$exchangeName,$routingKey); $channel->wait_for_pending_acks(); $channel->close(); $connect->close(); }/** * 主题模式 * @RequestMapping(route="topic-b",method={"GET"}) */ public function topicb() { $exchangeName = 'topic-ex'; $routingKey = 'b.route.queue'; $connect = new AMQPStreamConnection('39.105.156.191',5672,'guest','guest'); $channel = $connect->channel(); $channel->exchange_declare($exchangeName,'topic',false,true,false); # 消息 $msg = new AMQPMessage('主题消息:b.route.queue '.time(),[ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); $channel->confirm_select(); // 发布确认模式//推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { echo "发送成功: " . $message->body . PHP_EOL; } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { echo "发送失败: " . $message->body . PHP_EOL; } ); # 发送 $channel->basic_publish($msg,$exchangeName,$routingKey); $channel->wait_for_pending_acks(); $channel->close(); $connect->close(); }/** * 主题模式 * @RequestMapping(route="topic-c",method={"GET"}) */ public function topicc() { $exchangeName = 'topic-ex'; $routingKey = 'c.routes.queue'; $connect = new AMQPStreamConnection('39.105.156.191',5672,'guest','guest'); $channel = $connect->channel(); $channel->exchange_declare($exchangeName,'topic',false,true,false); # 消息 $msg = new AMQPMessage('主题消息:c.routes.queue '.time(),[ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); $channel->confirm_select(); // 发布确认模式//推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { echo "发送成功: " . $message->body . PHP_EOL; } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { echo "发送失败: " . $message->body . PHP_EOL; } ); # 发送 $channel->basic_publish($msg,$exchangeName,$routingKey); $channel->wait_for_pending_acks(); $channel->close(); $connect->close(); }

处理数据
这里可以使用通配符 * #来获取不同路由键的数据
比如我这里*.routes.* , 只能获取到c.routes.queue
而# 能获取到所有的数据
这里通过声明一个空名队列绑定不同的路由键 , 达到获取不同路由键的数据
通配符*
channel(); $channel->exchange_declare($exchangeName,'topic',false,true,false); list($queueName,,) =$channel->queue_declare('', false,true,true,false); $channel->queue_bind($queueName,$exchangeName,$routingKey); # 回调 $callback = function ($msg){ echo '接收消息topic * :'.$msg->body.PHP_EOL; }; $channel->basic_consume($queueName,'',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); } $channel->close(); $connect->close(); } }


通配符#
channel(); $channel->exchange_declare($exchangeName,'topic',false,true,false); list($queueName,,) =$channel->queue_declare('', false,true,true,false); $channel->queue_bind($queueName,$exchangeName,$routingKey); # 回调 $callback = function ($msg){ echo '接收消息topic # :'.$msg->body.PHP_EOL; }; $channel->basic_consume($queueName,'',false,true,false,false,$callback); while(count($channel->callbacks)){ $channel->wait(); } $channel->close(); $connect->close(); } }

死信队列 发送数据 处理数据

    推荐阅读