基于swoole、redis做一个消息通知功能
利用swoole开启常驻进程,需要几个按自己的情况来定,swoole进程数最好是和服务器cpu核数相等 (推荐学习: swoole视频教程)
利用swoole启动的常驻进程不断的去探测redis队列里面的值,可以按键值来做一个快中慢这样的权重处理,需要急需处理,数据量大的可以用多几个进程,一般的可以分配不同的进程数来执行。
下面上代码:
swoole启动代码
function run() { try { $swoole = new swoole_server(127.0.0.1, 9999); $swoole->set([ 'daemonize' => 1, //是否开启守护进程 'worker_num' => 8, //实际需要去设定 'log_file' => __APP_LOGS_PATH__ . '/swoole.log' ]); $swoole->on('WorkerStart', 'onWorkerStart'); $swoole->on('Receive', 'onReceive'); $swoole->start(); } catch (Exception $e) { logs(['err_code' => $e->getCode(), 'err_msg' => $e->getMessage()], 'error'); } }
swoole实时监测redis队列里的数据,根据键值进行权重排比
代码
function onWorkerStart(swoole_server $swoole, $worker_id) { $chQuick = [0, 1, 2, 3]; $chNormal = [4, 5]; $chSlow = [6]; for ($i = 1; $i llen(QUEUE_QUICK)) $keys[] = QUEUE_QUICK; if ($keys) $queueData = $redis->brpop(QUEUE_QUICK, 5); } elseif (in_array($worker_id, $chNormal)) { if ($redis->llen(QUEUE_NORMAL)) $keys[] = QUEUE_NORMAL; if ($redis->llen(QUEUE_QUICK)) $keys[] = QUEUE_QUICK; if ($keys) $queueData = $redis->brpop(QUEUE_NORMAL, QUEUE_QUICK, 5); } elseif (in_array($worker_id, $chSlow)) { if ($redis->llen(QUEUE_SLOW)) $keys[] = QUEUE_SLOW; if ($redis->llen(QUEUE_NORMAL)) $keys[] = QUEUE_NORMAL; if ($redis->llen(QUEUE_QUICK)) $keys[] = QUEUE_QUICK; if ($keys) $queueData = $redis->brpop(QUEUE_SLOW, QUEUE_QUICK, QUEUE_NORMAL, 5); } else { if ($redis->llen(QUEUE_FAIL)) $keys[] = QUEUE_FAIL; if ($redis->llen(QUEUE_SLOW)) $keys[] = QUEUE_SLOW; if ($redis->llen(QUEUE_NORMAL)) $keys[] = QUEUE_NORMAL; if ($redis->llen(QUEUE_QUICK)) $keys[] = QUEUE_QUICK; if ($keys) $queueData = $redis->brpop(QUEUE_FAIL, QUEUE_QUICK, QUEUE_NORMAL, QUEUE_SLOW, 5); } logs('test'.$keys.'%%'.$queueData); if ($queueData) { $queueName = $queueData[0]; $message = $queueData[1]; if ($worker_id == QUEUE_FAIL_WORKER_ID && $queueName == QUEUE_FAIL) { call_user_func_array('retryPostMessage', [&$message, &$redis]); } else { call_user_func_array('postMessage', [&$message, &$redis]); } } else { sleep(5); } } sleep(10); $redis->close(); unset($redis); method_exists($swoole, 'stop') ? $swoole->stop() : @exit; }
里面的for循环是为了配合sleep函数来使用,三次失败的可以记入失败,可以手动去处理。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END