Php|php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码
php使用 Swoole - TaskWorker进程实现异步操 Mysql数据库示例代码
- 数据库设计
- 创建mysql_server.php
- 客户端实例代码
- cli模式下执行
- task常见问题整理
- 1、task传递数据大小
- 2、task传递对象
- 3、task的onfinish回调
数据库设计
/*
Navicat Premium Data Transfer Source Server: 20.20.24.12
Source Server Type: MySQL
Source Server Version : 50641
Source Host: 20.20.24.12:3306
Source Schema: test Target Server Type: MySQL
Target Server Version : 50641
File Encoding: 65001 Date: 16/11/2019 22:40:12
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for user
-- ----------------------------
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`(
`uid` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`username` varchar(15) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
`password` char(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
PRIMARY KEY (`uid`) USING BTREE
) ENGINE = MyISAM AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
创建mysql_server.php
serv = new swoole_server("0.0.0.0", 9501);
//设置task进程的参数
$this->serv->set(array(
'worker_num' => 8,
'daemonize' => false,
'max_request' => 10000,
'dispatch_mode' => 3,
'debug_mode' => 1,
'task_worker_num' => 8
));
//定义回调函数
$this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
//绑定task回调函数
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
}//打印连接客户端函数
public function onConnect($serv, $fd, $from_id)
{
echo "Client {$fd} connect \n";
}//打印关掉连接的客户端
public function onClose($serv, $fd, $from_id)
{
echo "Client {$fd} close connection \n";
}//mysql的pdo连接方法
public function onWorkerStart($serv, $worker_id)
{
//只有当task进程的时候,才会创建pdo连接
if ($serv->taskworker) {
//设置要连接的mysql
$this->pdo = new PDO(
"mysql:host=localhost;
port=3306;
dbname=test",
"root", "123456",
array(
PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8'",
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_PERSISTENT => true
)
);
echo "Task Worker\n";
} else {
echo "Worker Process \n";
}
}//调用插入mysql数据的函数
public function onReceive(swoole_server $serv, $fd, $from_id, $data)
{
//获取得到客户端提交的用户名称密码 json_decode解码
$user_info = json_decode($data , true);
$task = array(
'sql' => 'insert into user values(?,?,?)',
'params' => array(null, $user_info['username'], $user_info['password']),
'fd' => $fd
);
//将次任务放入到task方法中
$serv->task(json_encode($task));
}//task执行任务
public function onTask($serv, $task_id, $from_id, $data)
{
try {
//接收data 使用decode解码
$data = https://www.it610.com/article/json_decode($data, true);
//使用pdo执行sql语句
$statement = $this->pdo->prepare($data['sql']);
$statement->execute($data['params']);
$serv->send($data['fd'], 'Insert Succeed');
return "true";
} catch (PDOException $e) {
var_dump($e);
return false;
}
}public function onFinish($serv, $task_id, $data)
{
var_dump($data);
}}new MySQLPool();
客户端实例代码
client = new swoole_client(SWOOLE_SOCK_TCP);
}public function connect()
{
if (!$this->client->connect("127.0.0.1", 9501, 1)) {
echo "Error: {$fp->errMsg}[{$fp->errCode}]\n";
}
fwrite(STDOUT, "请输入用户名称:");
$username = trim(fgets(STDIN));
fwrite(STDOUT, "请输入密码:");
$password = trim(fgets(STDIN));
//组成数组
$user_info = array(
'username' => $username,
'password' => $password,
);
//由于send发送的是一个字符串,所以必须使用json_encode编码
$this->client->send(json_encode($user_info));
sleep(1);
$message = $this->client->recv();
echo "Get Message From Server:{$message}\n";
}public function test()
{
$this->client = new swoole_client(SWOOLE_SOCK_TCP);
}
}$client = new Client();
$client->connect();
cli模式下执行 首先执行 mysql的swoole服务
php mysql_server.php执行客户端文件
文章图片
$ php mysql_client.php查询数据库中显示结果如下:
请输入用户名称:qipa250
请输入密码:123890
Get Message From Server:Insert Succeed
文章图片
文章图片
mysql> select * from user;task常见问题整理 1、task传递数据大小
±----±---------±---------+
| uid | username | password |
±----±---------±---------+
| 1 | qipa250 | 123890 |
±----±---------±---------+
1 row in set (0.00 sec)
文章图片
数据小于8K,直接通过管道传递;数据大于8K,写入临时文件传递;
2、task传递对象
可以通过序列化传递一个对象的拷贝
task中对对象的改变不会反映到worker进程中
数据库连接,网络连接对象不可以传递
3、task的onfinish回调
task的onfinish回调挥发会调用task方法的worker进程
推荐阅读
- 由浅入深理解AOP
- 【译】20个更有效地使用谷歌搜索的技巧
- mybatisplus如何在xml的连表查询中使用queryWrapper
- MybatisPlus|MybatisPlus LambdaQueryWrapper使用int默认值的坑及解决
- MybatisPlus使用queryWrapper如何实现复杂查询
- iOS中的Block
- Linux下面如何查看tomcat已经使用多少线程
- thinkphp|thinkphp 3.2 如何调用第三方类库
- CGI,FastCGI,PHP-CGI与PHP-FPM
- 使用composer自动加载类文件