laravel-swoole的扩展不兼容消息队列该怎么办()

这段时间用laravel8+laravel-swoole做项目,可发现laravel-swoole的扩展不兼容消息队列;
laravel-swoole的扩展不兼容消息队列该怎么办()
文章图片

思来想去这咋办呢,这咋办呢.咋办那就自己写咯!还好thinkphp-swoole扩展已经兼容了,那不就嘿嘿嘿!
laravel-swoole的扩展不兼容消息队列该怎么办()
文章图片

直接上修改的思路和代码!开干!
一种是增加另外启动的命令或者在swoole启动的时候一起启动消息队列进行消费,我这么懒的人一个命令能解决的,绝不写两命令.
首先重写swoole启动命令

isRunning()) { $this->error('Failed! swoole_http_server process is already running.'); return; }$host= Arr::get($this->config, 'server.host'); $port= Arr::get($this->config, 'server.port'); $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled'); $queueEnabled= Arr::get($this->config, 'queue.enabled'); $accessLogEnabled = Arr::get($this->config, 'server.access_log'); $coroutineEnable= Arr::get($this->config, 'coroutine.enable'); $this->info('Starting swoole http server...'); $this->info("Swoole http server started: "); if ($this->isDaemon()) { $this->info( '> (You can run this command to ensure the ' . 'swoole_http_server process is running: ps aux|grep "swoole")' ); }$manager = $this->laravel->make(Manager::class); $server= $this->laravel->make(Server::class); if ($accessLogEnabled) { $this->registerAccessLog(); } //热更新重写 if ($hotReloadEnabled) { $manager->addProcess($this->getHotReloadProcessNow($server)); } //启动消息队列进行消费 if ($queueEnabled) { $this->prepareQueue($manager); }if ($coroutineEnable) { Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL)); }$manager->run(); }/** * @param Server $server * @return Process|void */ protected function getHotReloadProcessNow($server) { return new Process(function () use ($server) { $watcher = new FileWatcher( Arr::get($this->config, 'hot_reload.include', []), Arr::get($this->config, 'hot_reload.exclude', []), Arr::get($this->config, 'hot_reload.name', []) ); $watcher->watch(function () use ($server) { $server->reload(); }); }, false, 0, true); }}

InteractsWithQueue 类
laravel->make(QueueManager::class); $queueManager->attachToServer($manager, $this->output); } }

Manager类
container = $container; }/** * @param ServerManager $server */ public function attachToServer(ServerManager $server, OutputStyle $output) { $this->output = $output; $this->listenForEvents(); $this->createWorkers(); foreach ($this->workers as $worker) { $server->addProcess(new Process($worker, false, 0, true)); } }/** * 运行消息队列命令 */ public function run(): void { @cli_set_process_title("swoole queue: manager process"); $this->listenForEvents(); $this->createWorkers(); $pool = new Pool(count($this->workers)); $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) { $process = $pool->getProcess($workerId); run($this->workers[$workerId], $process); }); $pool->start(); }/** * 创建执行任务 */ protected function createWorkers() { $workers = $this->getConfig('queue.workers', []); foreach ($workers as $queue => $options) {if (strpos($queue, '@') !== false) { [$queue, $connection] = explode('@', $queue); } else { $connection = null; }$this->workers[] = function (Process $process) use ($options, $connection, $queue) {@cli_set_process_title("swoole queue: worker process"); /** @var Worker $worker */ $worker = $this->container->make('queue.worker'); /** @var WorkerOptions $option */ $option = $this->container->make(WorkerOptions::class); $option->sleep = Arr::get($options, "sleep", 3); $option->maxTries = Arr::get($options, "tries", 0); $option->timeout = Arr::get($options, "timeout", 60); $timer = Timer::after($option->timeout * 1000, function () use ($process) { $process->exit(); }); $worker->runNextJob($connection, $queue, $option); Timer::clear($timer); }; } }/** * 注册事件 */ protected function listenForEvents() { $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) { $this->writeOutput($event->job); $this->logFailedJob($event); }); }/** * 记录失败任务 * @param JobFailed $event */ protected function logFailedJob(JobFailed $event) { $this->container['queue.failer']->log( $event->connection, $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); }/** * Write the status output for the queue worker. * * @param Job $job * @param$status */ protected function writeOutput(Job $job, $status) { switch ($status) { case 'starting': $this->writeStatus($job, 'Processing', 'comment'); break; case 'success': $this->writeStatus($job, 'Processed', 'info'); break; case 'failed': $this->writeStatus($job, 'Failed', 'error'); break; } }/** * Format the status output for the queue worker. * * @param Job $job * @param string $status * @param string $type * @return void */ protected function writeStatus(Job $job, $status, $type) { $this->output->writeln(sprintf( "<{$type}>[%s][%s] %s %s", date('Y-m-d H:i:s'), $job->getJobId(), str_pad("{$status}:", 11), $job->getName() )); }}

增加CrmebServiceProvider类
app->singleton(Manager::class, function ($app) { return new Manager($app, 'laravel'); }); $this->app->alias(Manager::class, 'swoole.manager'); $this->app->singleton('queue.worker', function ($app) { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); }; return new Worker( $app['queue'], $app['events'], $app[ExceptionHandler::class], $isDownForMaintenance ); }); }/** * Boot websocket routes. * * @return void */ protected function bootWebsocketRoutes() { require base_path('vendor/swooletw/laravel-swoole') . '/routes/laravel_routes.php'; }/** * Register access log middleware to container. * * @return void */ protected function pushAccessLogMiddleware() { $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class); }/** * Register commands. */ protected function registerCommands() { $this->commands([ HttpServerCommand::class, ]); }/** * Merge configurations. */ protected function mergeConfigs() { $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php', 'swoole_http'); $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php', 'swoole_websocket'); }/** * Publish files of this package. */ protected function publishFiles() { $this->publishes([ base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php' => base_path('config/swoole_http.php'), base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'), base_path('vendor/swooletw/laravel-swoole') . '/routes/websocket.php' => base_path('routes/websocket.php'), ], 'laravel-swoole'); } }

然后再把\crmeb\swoole\CrmebServiceProvider::class放入config/app.php中的providers中加载重写了swoole的命令启动方式
配置config/swoole_http.php
return [ 'queue'=> [ //是否开启自动消费队列 'enabled' => true, 'workers' => [ //队列名称 'CRMEB' => [] ] ], ];

输入命令:
php artisan crmeb:http restart
swoole启动后就可以自动消费队列了
最后给大哥们推荐个项目行行好大哥们点点就行,感谢了:
PC商城+移动端uniapp多端兼容+全新架构模式
点链接的大哥们,一夜暴富,万事顺心,春风不倒,夜夜笙歌
laravel-swoole的扩展不兼容消息队列该怎么办()
文章图片

【laravel-swoole的扩展不兼容消息队列该怎么办()】如果你觉得这篇文章对你有点用的话,麻烦请给我们的开源项目点点star: http://github.crmeb.net/u/defu 不胜感激 !

    推荐阅读