laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

                                                                               

下面由laravel教程栏目给大家介绍laravel-swoole消息队列,希望对需要的朋友有所帮助!

这段时间用laravel8+laravel-swoole做项目,却发现laravel-swoole扩展不兼容消息队列。
laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

思来想去这咋办呢,这咋办呢.咋办那就自己写咯!还好thinkphp-swoole扩展已经兼容了,那不就嘿嘿嘿!
laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

直接上修改的思路和代码!开干!

思路有两种:一种是另外增加一个启动命令,另一种是在swoole启动的时候一起启动消息队列进行消费。我这么懒的人,一个命令能解决的,绝不写两个命令。

首先重写swoole启动命令

<?php

namespace crmeb\swoole\command;

use Illuminate\Support\Arr;
use Swoole\Process;
use SwooleTW\Http\Server\Facades\Server;
use SwooleTW\Http\Server\Manager;
use crmeb\swoole\server\InteractsWithQueue;
use crmeb\swoole\server\FileWatcher;
use Swoole\Runtime;

/**
 * Replacement for the laravel-swoole HTTP server command.
 *
 * Behaves like the parent command, but `start` additionally attaches queue
 * worker processes to the server when `queue.enabled` is set in the swoole
 * configuration, so a single command both serves HTTP and consumes the queue.
 */
class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand
{
    use InteractsWithQueue;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}';

    /**
     * Run swoole_http_server.
     *
     * Reads the feature switches from the command configuration, registers the
     * optional helper processes (hot reload, queue workers) on the server
     * manager and then starts the server.
     *
     * @return void
     */
    protected function start()
    {
        if ($this->isRunning()) {
            $this->error('Failed! swoole_http_server process is already running.');

            return;
        }

        $host             = Arr::get($this->config, 'server.host');
        $port             = Arr::get($this->config, 'server.port');
        $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled');
        $queueEnabled     = Arr::get($this->config, 'queue.enabled');
        $accessLogEnabled = Arr::get($this->config, 'server.access_log');
        $coroutineEnable  = Arr::get($this->config, 'coroutine.enable');

        $this->info('Starting swoole http server...');
        $this->info("Swoole http server started: <http://{$host}:{$port}>");
        if ($this->isDaemon()) {
            $this->info(
                '> (You can run this command to ensure the ' .
                'swoole_http_server process is running: ps aux|grep "swoole")'
            );
        }

        $manager = $this->laravel->make(Manager::class);
        $server  = $this->laravel->make(Server::class);

        if ($accessLogEnabled) {
            $this->registerAccessLog();
        }

        // Hot reload: use this package's own watcher process.
        if ($hotReloadEnabled) {
            $manager->addProcess($this->getHotReloadProcessNow($server));
        }

        // Attach the queue worker processes so jobs are consumed by the server.
        if ($queueEnabled) {
            $this->prepareQueue($manager);
        }

        if ($coroutineEnable) {
            Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL));
        }

        $manager->run();
    }

    /**
     * Build the process that watches files and reloads the server on change.
     *
     * The watcher is configured from the `hot_reload.include`,
     * `hot_reload.exclude` and `hot_reload.name` configuration entries.
     *
     * @param Server $server Server instance whose `reload()` is called by the watcher callback.
     * @return Process
     */
    protected function getHotReloadProcessNow($server)
    {
        return new Process(function () use ($server) {
            $watcher = new FileWatcher(
                Arr::get($this->config, 'hot_reload.include', []),
                Arr::get($this->config, 'hot_reload.exclude', []),
                Arr::get($this->config, 'hot_reload.name', [])
            );

            $watcher->watch(function () use ($server) {
                $server->reload();
            });
        }, false, 0, true);
    }
}

InteractsWithQueue trait

<?php

namespace crmeb\swoole\server;

use crmeb\swoole\queue\Manager as QueueManager;
use SwooleTW\Http\Server\Manager;

/**
 * Trait InteractsWithQueue
 *
 * Lets a console command hand its swoole server manager to the queue manager.
 * The using class must provide `$this->laravel` (the container) and
 * `$this->output` (the console output), both of which are read below.
 *
 * @package crmeb\swoole\server
 */
trait InteractsWithQueue
{
    /**
     * Resolve the queue manager and attach its worker processes to the server.
     *
     * @param Manager $manager Swoole server manager the workers are attached to.
     * @return void
     */
    public function prepareQueue(Manager $manager)
    {
        /** @var QueueManager $queueManager */
        $queueManager = $this->laravel->make(QueueManager::class);

        $queueManager->attachToServer($manager, $this->output);
    }
}

Manager类

<?php

namespace crmeb\swoole\queue;

use Illuminate\Contracts\Container\Container;
use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Timer;
use Illuminate\Support\Arr;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use crmeb\swoole\server\WithContainer;
use Illuminate\Queue\Jobs\Job;
use function Swoole\Coroutine\run;
use Illuminate\Queue\WorkerOptions;
use SwooleTW\Http\Server\Manager as ServerManager;
use Illuminate\Console\OutputStyle;

/**
 * Builds queue worker callbacks from the `queue.workers` configuration and
 * runs them either as extra processes of a swoole server or in a standalone
 * process pool.
 */
class Manager
{
    use WithContainer;

    /**
     * Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * Console output used for job status lines.
     *
     * @var OutputStyle
     */
    protected $output;

    /**
     * One callback per configured queue; each receives its Swoole process.
     *
     * @var \Closure[]
     */
    protected $workers = [];

    /**
     * Manager constructor.
     *
     * @param Container $container
     */
    public function __construct(Container $container)
    {
        $this->container = $container;
    }

    /**
     * Register every configured queue worker as an extra server process.
     *
     * @param ServerManager $server Server manager the processes are added to.
     * @param OutputStyle   $output Console output used for job status lines.
     * @return void
     */
    public function attachToServer(ServerManager $server, OutputStyle $output)
    {
        $this->output = $output;
        $this->listenForEvents();
        $this->createWorkers();

        foreach ($this->workers as $worker) {
            $server->addProcess(new Process($worker, false, 0, true));
        }
    }

    /**
     * Run the queue workers in a standalone process pool, one process per
     * configured worker.
     *
     * @return void
     */
    public function run(): void
    {
        @cli_set_process_title("swoole queue: manager process");

        $this->listenForEvents();
        $this->createWorkers();

        $pool = new Pool(count($this->workers));

        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {
            $process = $pool->getProcess($workerId);
            run($this->workers[$workerId], $process);
        });

        $pool->start();
    }

    /**
     * Create one worker callback per entry of the `queue.workers` config.
     *
     * A config key of the form `name@connection` is split at `@` into the
     * queue name and the connection; otherwise the connection is null.
     * Recognised per-worker options: `sleep` (default 3), `tries` (default 0)
     * and `timeout` (default 60).
     *
     * @return void
     */
    protected function createWorkers()
    {
        $workers = $this->getConfig('queue.workers', []);

        foreach ($workers as $queue => $options) {
            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            } else {
                $connection = null;
            }

            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {
                @cli_set_process_title("swoole queue: worker process");

                /** @var Worker $worker */
                $worker = $this->container->make('queue.worker');
                /** @var WorkerOptions $option */
                $option = $this->container->make(WorkerOptions::class);

                $option->sleep    = Arr::get($options, "sleep", 3);
                $option->maxTries = Arr::get($options, "tries", 0);
                $option->timeout  = Arr::get($options, "timeout", 60);

                // Watchdog: exit the process if the job runs past the timeout.
                $timer = Timer::after($option->timeout * 1000, function () use ($process) {
                    $process->exit();
                });

                // NOTE(review): only one job is processed per invocation; this
                // presumably relies on the process being restarted after it
                // returns - confirm against the server/pool configuration.
                $worker->runNextJob($connection, $queue, $option);

                Timer::clear($timer);
            };
        }
    }

    /**
     * Listen for failed jobs so they are reported and recorded.
     *
     * @return void
     */
    protected function listenForEvents()
    {
        $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) {
            // writeOutput() requires the status; omitting it raised an
            // ArgumentCountError whenever a job failed.
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

    /**
     * Store a failed job through the `queue.failer` service.
     *
     * @param JobFailed $event
     * @return void
     */
    protected function logFailedJob(JobFailed $event)
    {
        $this->container['queue.failer']->log(
            $event->connection,
            $event->job->getQueue(),
            $event->job->getRawBody(),
            $event->exception
        );
    }

    /**
     * Write the status output for the queue worker.
     *
     * Statuses other than `starting`, `success` and `failed` are ignored.
     *
     * @param Job    $job
     * @param string $status
     * @return void
     */
    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case 'starting':
                $this->writeStatus($job, 'Processing', 'comment');
                break;
            case 'success':
                $this->writeStatus($job, 'Processed', 'info');
                break;
            case 'failed':
                $this->writeStatus($job, 'Failed', 'error');
                break;
        }
    }

    /**
     * Format the status output for the queue worker.
     *
     * @param Job    $job
     * @param string $status Human readable status label.
     * @param string $type   Console style tag name (e.g. comment, info, error).
     * @return void
     */
    protected function writeStatus(Job $job, $status, $type)
    {
        // The <type>...</type> wrapper is console style markup, so the prefix
        // is rendered in the given style.
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            date('Y-m-d H:i:s'),
            $job->getJobId(),
            str_pad("{$status}:", 11),
            $job->getName()
        ));
    }
}

增加CrmebServiceProvider类

<?php

namespace crmeb\swoole;

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Http\Kernel;
use crmeb\swoole\command\HttpServerCommand;
use Illuminate\Queue\Worker;
use SwooleTW\Http\HttpServiceProvider;
use SwooleTW\Http\Middleware\AccessLog;
use SwooleTW\Http\Server\Manager;

/**
 * Class CrmebServiceProvider
 *
 * Service provider that registers the swoole server manager, a `queue.worker`
 * binding and the `crmeb:http` command, reusing the configuration and route
 * files shipped in vendor/swooletw/laravel-swoole.
 *
 * @package crmeb\swoole
 */
class CrmebServiceProvider extends HttpServiceProvider
{
    /**
     * Register manager.
     *
     * Also binds `queue.worker` as a singleton so the queue manager can
     * resolve a worker from the container.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton(Manager::class, function ($app) {
            return new Manager($app, 'laravel');
        });

        $this->app->alias(Manager::class, 'swoole.manager');

        $this->app->singleton('queue.worker', function ($app) {
            $isDownForMaintenance = function () {
                return $this->app->isDownForMaintenance();
            };

            return new Worker(
                $app['queue'],
                $app['events'],
                $app[ExceptionHandler::class],
                $isDownForMaintenance
            );
        });
    }

    /**
     * Boot websocket routes.
     *
     * @return void
     */
    protected function bootWebsocketRoutes()
    {
        require base_path('vendor/swooletw/laravel-swoole') . '/routes/laravel_routes.php';
    }

    /**
     * Register access log middleware to container.
     *
     * @return void
     */
    protected function pushAccessLogMiddleware()
    {
        $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class);
    }

    /**
     * Register commands.
     *
     * @return void
     */
    protected function registerCommands()
    {
        $this->commands([
            HttpServerCommand::class,
        ]);
    }

    /**
     * Merge configurations.
     *
     * @return void
     */
    protected function mergeConfigs()
    {
        $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php', 'swoole_http');
        $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php', 'swoole_websocket');
    }

    /**
     * Publish files of this package.
     *
     * @return void
     */
    protected function publishFiles()
    {
        $this->publishes([
            base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php' => base_path('config/swoole_http.php'),
            base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'),
            base_path('vendor/swooletw/laravel-swoole') . '/routes/websocket.php' => base_path('routes/websocket.php'),
        ], 'laravel-swoole');
    }
}

然后把 crmeb\swoole\CrmebServiceProvider::class 加入 config/app.php 的 providers 数组中,这样就会加载重写后的swoole启动命令。

配置config/swoole_http.php

return [
    'queue' => [
        // Whether queue workers are started together with the swoole server.
        'enabled' => true,
        'workers' => [
            // Queue name => worker options (an empty array uses the defaults).
            'CRMEB' => [],
        ],
    ],
];

输入命令:
php artisan crmeb:http restart

swoole启动后就可以自动消费队列了。

相关推荐:最新的五个Laravel视频教程

以上就是laravel8中laravel-swoole的扩展不兼容消息队列怎么办的详细内容。

© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享