This article explains how to use php-amqplib to work with RabbitMQ.
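> Environment for this article: Ubuntu 20.04, nginx 1.8, PHP 7.3, RabbitMQ 3.9
>
> If anything is unclear, leave a comment or email me: owen@owenzhang.com
>
> Copyright belongs to OwenZhang. For commercial reproduction, contact OwenZhang for authorization; for non-commercial reproduction, credit the source.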
Project code: https://gitee.com/owenzhang24/tp5
Queue notes
1: Listing queues
To see which queues RabbitMQ has and how many messages are in them, you can (as a privileged user) use the rabbitmqctl tool:
$ sudo rabbitmqctl list_queues
On Windows, omit sudo:
$ rabbitmqctl.bat list_queues
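2: Work queues
With manual acknowledgments, no message is lost even if you kill a worker process with CTRL+C. After the worker dies, all of its unacknowledged messages are redelivered.
A common mistake is forgetting basic_ack, and the consequences are serious. The messages are redelivered when your client exits, and RabbitMQ uses more and more memory because it cannot release unacknowledged messages.
To debug this kind of mistake, use rabbitmqctl to print the messages_unacknowledged field:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
On Windows, omit sudo:
$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged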
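One way to make a forgotten basic_ack less likely is to wrap the message handler so that every delivery is either acknowledged or rejected. The following is a minimal sketch and not part of the project code: makeAckingCallback and $handler are hypothetical names, and it assumes the consumer was registered with no_ack = false and that each message exposes delivery_info['channel'] and delivery_info['delivery_tag'], as the callbacks in this article do.

```php
<?php
/**
 * Wrap a handler so that each delivery is acknowledged exactly once.
 * Hypothetical helper: $handler receives the message body and may throw.
 */
function makeAckingCallback(callable $handler): Closure
{
    return function ($msg) use ($handler) {
        $channel = $msg->delivery_info['channel'];
        $tag = $msg->delivery_info['delivery_tag'];
        try {
            $handler($msg->body);
            // Success: tell RabbitMQ it may discard the message.
            $channel->basic_ack($tag);
        } catch (\Throwable $e) {
            // Failure: reject this single delivery (multiple = false)
            // and ask RabbitMQ to requeue it (requeue = true).
            $channel->basic_nack($tag, false, true);
        }
    };
}
```

The returned closure could be passed as the $callback argument of workTask(). Requeuing on failure means a message that always fails is redelivered again and again, so a real consumer may prefer requeue = false or a retry limit.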
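3: rabbitmqctl can list all exchanges on the server:
$ sudo rabbitmqctl list_exchanges
The list contains some exchanges named amq.*. They are created by default, but you do not need to use them at this point.
4: List all existing bindings:
$ rabbitmqctl list_bindings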
5: To save the logs to a file, open a console and run (see the receive_logs.php source):
$ php receive_logs.php > logs_from_rabbit.log
To see all log messages on the screen instead, open a new terminal and run:
$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
To emit an error-level log message, run:
$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent error:Run. Run. Or it will explode.
Queue installation
Step 1: Install RabbitMQ. See "Installing and starting RabbitMQ on Windows":
https://my.oschina.net/owenzhang24/blog/5051652
Step 2: Install php-amqplib with Composer:
composer require php-amqplib/php-amqplib
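Step 3: Code classes
- Base class implementing the RabbitMQ operations: application/common/lib/classes/rabbitmq/RabbitMq.php
- RabbitMQ class for external callers: application/common/lib/classes/RabbitMqWork.php
- Test methods that send messages to RabbitMQ: application/index/controller/Index.php
- php think commands that receive messages from RabbitMQ: application/common/command/*.php
- To send a message, call one of the send methods of the RabbitMqWork class directly from your own method.
- The classes under application/common/command/ each add a php think command. The command name is set with setName() inside configure(), and execute() receives the messages from RabbitMQ. The command name and its class must also be added to the array returned by application/command.php.
- Reference documentation
- RabbitMQ Chinese documentation (PHP edition): https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/
- Official RabbitMQ documentation: https://www.rabbitmq.com/getstarted.html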
application/common/lib/classes/rabbitmq/RabbitMq.php
<?php
// Base class implementing the RabbitMQ operations
namespace app\common\lib\classes\rabbitmq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMq
{
    private static $instance;
    private static $connection;
    private static $channel;

    const DIRECT = 'direct';
    const TOPIC = 'topic';
    const HEADERS = 'headers';
    const FANOUT = 'fanout';

    private static $exchangeNames = [
        self::DIRECT => 'direct_exchange',
        self::TOPIC => 'topic_exchange',
        self::HEADERS => 'headers_exchange',
        self::FANOUT => 'fanout_exchange',
    ];

    const SEVERITYS = [
        'info',
        'warning',
        'error'
    ];

    private static $exchangeName = '';

    /**
     * RabbitMq constructor.
     * @param $exchangeType
     */
    private function __construct($exchangeType)
    {
        self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        self::$channel = self::$connection->channel();
        if (!empty($exchangeType)) {
            self::$exchangeName = self::$exchangeNames[$exchangeType];
            self::$channel->exchange_declare(
                self::$exchangeName, // exchange name
                $exchangeType,       // exchange type
                false,               // passive: false creates the exchange if it does not exist
                true,                // durable: the exchange survives a broker restart
                false                // auto_delete: the exchange is not deleted automatically
            );
        }
    }

    /**
     * Get the shared instance
     * @param string $exchangeType
     * @return RabbitMq
     */
    public static function instance($exchangeType = '')
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self($exchangeType);
        }
        return self::$instance;
    }

    /**
     * Prevent cloning from outside
     */
    private function __clone()
    {
    }

    /**
     * Simple send
     */
    public function send()
    {
        self::$channel->queue_declare('hello', false, false, false);
        $msg = new AMQPMessage('Hello World!');
        self::$channel->basic_publish($msg, '', 'hello');
        echo "[X] Sent Hello World!\n";
    }

    /**
     * Simple receive
     * @param $callback
     */
    public function receive($callback)
    {
        self::$channel->queue_declare('hello', false, false, false, true);
        echo "[*] Waiting for messages. To exit press CTRL+C\n";
        // no_ack (4th argument) is false because the SimpleWork callback calls basic_ack;
        // acknowledging a delivery consumed with no_ack = true makes the broker close the channel.
        self::$channel->basic_consume('hello', '', false, false, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Add a task to the work queue
     * @param string $data
     */
    public function addTask($data = '')
    {
        // durable queue (3rd argument) so the queue definition survives a broker restart
        self::$channel->queue_declare('task_queue', false, true, false, true);
        if (empty($data)) $data = 'Hello World!';
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        self::$channel->basic_publish($msg, '', 'task_queue');
        echo "[x] Sent $data \n";
    }

    /**
     * Process the work queue
     * @param $callback
     */
    public function workTask($callback)
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        echo '[*] Waiting for messages. To exit press CTRL+C', "\n";
        // Deliver at most one unacknowledged message to this worker at a time
        self::$channel->basic_qos(null, 1, null);
        // no_ack = false: the callback must acknowledge each message
        self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Publish
     * @param string $data
     */
    public function sendQueue($data = '')
    {
        if (empty($data)) $data = 'info:Hello World!';
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName);
        echo "[x] Sent $data \n";
    }

    /**
     * Subscribe
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        list($queueName, ,) = self::$channel->queue_declare(
            "",    // queue name: empty, so the server generates one
            false, // passive: false creates the queue if it does not exist
            true,  // durable: the queue survives a broker restart
            true,  // exclusive: the queue is used only by this connection
            false  // auto_delete: the queue is not deleted when the last consumer unsubscribes
        );
        self::$channel->queue_bind($queueName, self::$exchangeName);
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Send (direct exchange)
     * @param $routingKey
     * @param string $data
     */
    public function sendDirect($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo "[x] Sent $routingKey:$data \n";
    }

    /**
     * Receive (direct exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare('', false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Send (topic exchange)
     * @param $routingKey
     * @param string $data
     */
    public function sendTopic($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo " [x] Sent ", $routingKey, ':', $data, " \n";
    }

    /**
     * Receive (topic exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }
        echo '[*] Waiting for logs. To exit press CTRL+C', "\n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * Close the channel and the connection
     */
    public function __destruct()
    {
        self::$channel->close();
        self::$connection->close();
    }
}
application/common/lib/classes/RabbitMqWork.php
<?php
// RabbitMQ class for external callers
namespace app\common\lib\classes;

use app\common\lib\classes\rabbitmq\RabbitMq;

class RabbitMqWork
{
    private $RabbitMq;

    public function __construct($exchangeType = '')
    {
        $this->RabbitMq = RabbitMq::instance($exchangeType);
    }

    /**
     * Send (simple)
     */
    public function send()
    {
        $this->RabbitMq->send();
    }

    /**
     * Receive (simple)
     * @param $callback
     */
    public function receive($callback)
    {
        $this->RabbitMq->receive($callback);
    }

    /**
     * Send (work queue)
     * @param $data
     */
    public function addTask($data)
    {
        $this->RabbitMq->addTask($data);
    }

    /**
     * Receive (work queue)
     * @param $callback
     */
    public function workTask($callback)
    {
        $this->RabbitMq->workTask($callback);
    }

    /**
     * Publish (fanout exchange)
     * @param $data
     */
    public function sendQueue($data)
    {
        $this->RabbitMq->sendQueue($data);
    }

    /**
     * Subscribe (fanout exchange)
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        $this->RabbitMq->subscribeQueue($callback);
    }

    /**
     * Send (direct exchange)
     * @param $routingKey
     * @param $data
     */
    public function sendDirect($routingKey, $data)
    {
        $this->RabbitMq->sendDirect($routingKey, $data);
    }

    /**
     * Receive (direct exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveDirect($callback, $bindingKeys);
    }

    /**
     * Send (topic exchange)
     * @param $routingKey
     * @param $data
     */
    public function sendTopic($routingKey, $data)
    {
        $this->RabbitMq->sendTopic($routingKey, $data);
    }

    /**
     * Receive (topic exchange)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveTopic($callback, $bindingKeys);
    }
}
application/index/controller/Index.php
<?php
namespace app\index\controller;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use app\polymerize\tool\module\es\SearchBlog;
use app\polymerize\tool\module\es\SyncBlog;
use think\Collection;

class Index extends Collection
{
    public function index()
    {
        // Uncomment the send method you want to test
        //$this->send();
        //$this->addTask();
        //$this->sendQueue();
        //$this->sendDirect();
        $this->sendTopic();
        var_dump(11);
        die();
    }

    public function searchBlog()
    {
        //$id=1;
        //$res = SyncBlog::getInstance()->syncBlog($id);
        $search = 11;
        $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
        var_dump($res);
        die();
        var_dump(1111);
        die();
    }

    /**
     * Send (simple)
     */
    public function send()
    {
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->send();
    }

    /**
     * Send (work queue)
     */
    public function addTask()
    {
        $data = input('data', 'This is work task!');
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->addTask($data);
    }

    /**
     * Send (fanout exchange)
     */
    public function sendQueue()
    {
        $data = input('data', 'This is send queue1');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $RabbitMqWork->sendQueue($data);
    }

    /**
     * Send (direct exchange)
     */
    public function sendDirect()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'info');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $RabbitMqWork->sendDirect($routingKey, $data);
    }

    /**
     * Send (topic exchange)
     */
    public function sendTopic()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'lazy.boy');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $RabbitMqWork->sendTopic($routingKey, $data);
    }
}
application/command.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
// Command name => command class.
// Note: the command classes in this article declare the namespace app\command,
// so these class names must resolve to them under your autoload configuration.
return [
    'simpleMq' => 'application\command\SimpleWork',
    'workQueue' => 'application\command\WorkQueue',
    'sendQueue' => 'application\command\SendQueue',
    'directQueue' => 'application\command\DirectQueue',
    'topicQueue' => 'application\command\TopicQueue',
];
application/common/command/*.php
application/command/DirectQueue.php
<?php
/**
 * Receive (direct exchange)
 */
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class DirectQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        // Command name used as: php think directQueue
        $this->setName('directQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $callback = function ($msg) {
            echo "[x] " . $msg->delivery_info['routing_key'] . ":$msg->body \n";
        };
        // Bind the queue once per severity: info, warning, error
        $RabbitMqWork->receiveDirect($callback, RabbitMq::SEVERITYS);
    }
}
application/command/SendQueue.php
<?php
/**
 * Subscribe (fanout exchange)
 */
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SendQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        // Command name used as: php think sendQueue
        $this->setName('sendQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $callback = function ($msg) {
            echo 'Receive:';
            echo "Msg:$msg->body \n";
            \Log::error("Msg:$msg->body");
        };
        $RabbitMqWork->subscribeQueue($callback);
    }
}
application/command/SimpleWork.php
<?php
/**
 * Receive (simple)
 */
namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Log;

class SimpleWork extends Command
{
    protected function configure()
    {
        parent::configure();
        // Command name used as: php think simpleMq
        $this->setName('simpleMq');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo 'Receive:';
            $queueName = $msg->delivery_info['routing_key'];
            $msgData = $msg->body;
            $isAck = true;
            echo 'Msg:' . $msgData . "\n";
            echo 'QueueName:' . $queueName . "\n";
            if ($isAck) {
                // Manual acknowledgment; requires the consumer to be registered with no_ack = false
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $RabbitMqWork->receive($callback);
    }
}
application/command/TopicQueue.php
<?php
/**
 * Receive (topic exchange)
 */
namespace app\command;

use app\common\lib\classes\rabbitmq\RabbitMq;
use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class TopicQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        // Command name used as: php think topicQueue
        $this->setName('topicQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $callback = function ($msg) {
            echo '[x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
        };
        // "*" matches exactly one word, "#" matches zero or more words
        $bindingKeys = [
            '*.orange.*',
            '*.*.rabbit',
            'lazy.#'
        ];
        $RabbitMqWork->receiveTopic($callback, $bindingKeys);
    }
}
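To see why the default routing key lazy.boy used by Index::sendTopic() reaches this consumer, it helps to spell out the topic matching rules: words are separated by dots, "*" stands for exactly one word and "#" for zero or more words. The sketch below imitates those rules in plain PHP for illustration only. topicMatches and matchWords are hypothetical helper names, the matching is really done by the RabbitMQ broker, and treating an empty key as zero words is a simplifying assumption.

```php
<?php
/**
 * Illustrative re-implementation of topic exchange matching.
 * Not part of php-amqplib; the broker performs the real matching.
 */
function topicMatches(string $bindingKey, string $routingKey): bool
{
    // Assumption: an empty key is treated as zero words.
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words = $routingKey === '' ? [] : explode('.', $routingKey);
    return matchWords($pattern, $words);
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // Pattern used up: match only if no words are left.
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // "#" may absorb zero or more words: try every possible split.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($pattern, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    // "*" matches any single word; otherwise the words must be equal.
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

var_dump(topicMatches('lazy.#', 'lazy.boy'));                // bool(true)
var_dump(topicMatches('*.orange.*', 'quick.orange.rabbit')); // bool(true)
var_dump(topicMatches('*.*.rabbit', 'lazy.boy'));            // bool(false)
```

With the three binding keys above, lazy.boy is delivered because it matches lazy.#, even though it matches neither of the two three-word patterns.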
application/command/WorkQueue.php
<?php
/**
 * Receive (work queue)
 */
namespace app\command;

use app\common\lib\classes\RabbitMqWork;
use think\console\Command;
use think\console\Input;
use think\console\Output;

class WorkQueue extends Command
{
    protected function configure()
    {
        parent::configure();
        // Command name used as: php think workQueue
        $this->setName('workQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg) {
            echo " [x] Received ", $msg->body, "\n";
            // Simulate work: sleep one second for every dot in the message body
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done", "\n";
            // Acknowledge only after the work is finished
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $RabbitMqWork->workTask($callback);
    }
}
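The worker above fakes a long-running job by sleeping one second for each dot in the message body, so the task text itself controls how long the worker stays busy. As a small illustration, with simulatedWorkSeconds being a hypothetical helper name that is not part of the project:

```php
<?php
/**
 * Number of seconds the WorkQueue callback would sleep for a given body.
 * Hypothetical helper that mirrors sleep(substr_count($msg->body, '.')).
 */
function simulatedWorkSeconds(string $body): int
{
    return substr_count($body, '.');
}

echo simulatedWorkSeconds('This is work task!'), "\n"; // 0
echo simulatedWorkSeconds('First message.'), "\n";     // 1
echo simulatedWorkSeconds('Third message...'), "\n";   // 3
```

Sending bodies with different numbers of dots to addTask() while two workers are running is a simple way to watch basic_qos(null, 1, null) hand the next task to whichever worker is free.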