Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码


php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码

    • 数据库设计
    • 创建mysql_server.php
    • 客户端实例代码
    • cli模式下执行
    • task常见问题整理
      • 1、task传递数据大小
      • 2、task传递对象
      • 3、task的onfinish回调
【Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码】
数据库设计
/* Navicat Premium Data Transfer Source Server: 20.20.24.12 Source Server Type: MySQL Source Server Version : 50641 Source Host: 20.20.24.12:3306 Source Schema: test Target Server Type: MySQL Target Server Version : 50641 File Encoding: 65001 Date: 16/11/2019 22:40:12 */SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for user -- ---------------------------- DROP TABLE IF EXISTS `user`; CREATE TABLE `user`( `uid` int(10) UNSIGNED NOT NULL AUTO_INCREMENT, `username` varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '', `password` char(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '', PRIMARY KEY (`uid`) USING BTREE ) ENGINE = MyISAM AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;

创建mysql_server.php
serv = new swoole_server("0.0.0.0", 9501); //设置task进程的参数 $this->serv->set(array( 'worker_num' => 8, 'daemonize' => false, 'max_request' => 10000, 'dispatch_mode' => 3, 'debug_mode' => 1, 'task_worker_num' => 8 )); //定义回调函数 $this->serv->on('WorkerStart', array($this, 'onWorkerStart')); $this->serv->on('Connect', array($this, 'onConnect')); $this->serv->on('Receive', array($this, 'onReceive')); $this->serv->on('Close', array($this, 'onClose')); //绑定task回调函数 $this->serv->on('Task', array($this, 'onTask')); $this->serv->on('Finish', array($this, 'onFinish')); $this->serv->start(); }//打印连接客户端函数 public function onConnect($serv, $fd, $from_id) { echo "Client {$fd} connect \n"; }//打印关掉连接的客户端 public function onClose($serv, $fd, $from_id) { echo "Client {$fd} close connection \n"; }//mysql的pdo连接方法 public function onWorkerStart($serv, $worker_id) { //只有当task进程的时候,才会创建pdo连接 if ($serv->taskworker) { //设置要连接的mysql $this->pdo = new PDO( "mysql:host=localhost; port=3306; dbname=test", "root", "123456", array( PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8'", PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, PDO::ATTR_PERSISTENT => true ) ); echo "Task Worker\n"; } else { echo "Worker Process \n"; } }//调用插入mysql数据的函数 public function onReceive(swoole_server $serv, $fd, $from_id, $data) { //获取得到客户端提交的用户名称密码 json_decode解码 $user_info = json_decode($data , true); $task = array( 'sql' => 'insert into user values(?,?,?)', 'params' => array(null, $user_info['username'], $user_info['password']), 'fd' => $fd ); //将次任务放入到task方法中 $serv->task(json_encode($task)); }//task执行任务 public function onTask($serv, $task_id, $from_id, $data) { try { //接收data 使用decode解码 $data = https://www.it610.com/article/json_decode($data, true); //使用pdo执行sql语句 $statement = $this->pdo->prepare($data['sql']); $statement->execute($data['params']); $serv->send($data['fd'], 'Insert Succeed'); return "true"; } catch (PDOException $e) { var_dump($e); return false; } }public function onFinish($serv, $task_id, $data) { var_dump($data); }}new MySQLPool();

客户端实例代码
client = new swoole_client(SWOOLE_SOCK_TCP); }public function connect() { if (!$this->client->connect("127.0.0.1", 9501, 1)) { echo "Error: {$fp->errMsg}[{$fp->errCode}]\n"; } fwrite(STDOUT, "请输入用户名称:"); $username = trim(fgets(STDIN)); fwrite(STDOUT, "请输入密码:"); $password = trim(fgets(STDIN)); //组成数组 $user_info = array( 'username' => $username, 'password' => $password, ); //由于send发送的是一个字符串,所以必须使用json_encode编码 $this->client->send(json_encode($user_info)); sleep(1); $message = $this->client->recv(); echo "Get Message From Server:{$message}\n"; }public function test() { $this->client = new swoole_client(SWOOLE_SOCK_TCP); } }$client = new Client(); $client->connect();

cli模式下执行 首先执行 mysql的swoole服务
php mysql_server.php
Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码
文章图片

执行客户端文件
$ php mysql_client.php
请输入用户名称:qipa250
请输入密码:123890
Get Message From Server:Insert Succeed
Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码
文章图片

查询数据库中显示结果如下:
Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码
文章图片

mysql> select * from user;
±----±---------±---------+
| uid | username | password |
±----±---------±---------+
| 1 | qipa250 | 123890 |
±----±---------±---------+
1 row in set (0.00 sec)
Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码
文章图片

task常见问题整理 1、task传递数据大小
数据小于8K,直接通过管道传递;数据大于8K,写入临时文件传递;
2、task传递对象
可以通过序列化传递一个对象的拷贝
task中对对象的改变不会反映到worker进程中
数据库连接,网络连接对象不可以传递
3、task的onfinish回调
task的onfinish回调挥发会调用task方法的worker进程

    推荐阅读