yurunsoft / workerman-gateway-sdk
一个支持在 Swoole 或其它非 Workerman 环境,开发 Gateway Worker 的组件。
Installs: 65 814
Dependents: 2
Suggesters: 0
Security: 0
Stars: 7
Watchers: 3
Forks: 1
Open Issues: 0
Requires
- php: >=7.1
- workerman/gateway-worker: ^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: 2.18.3
- phpstan/phpstan: 0.12.82
- phpunit/phpunit: ^7.5|^8.0|^9.0
- swoole/ide-helper: ^4.6
This package is auto-updated.
Last update: 2024-10-24 16:17:54 UTC
README
一个支持在 Swoole 或其它非 Workerman 环境,开发 Gateway Worker 的组件。
支持用 Workerman Gateway 做网关,Swoole 编写业务代码。
安装
composer require yurunsoft/workerman-gateway-sdk
Swoole Demo
<?php

declare(strict_types=1);

use GatewayWorker\Lib\Context;
use GatewayWorker\Lib\Gateway;
use GatewayWorker\Protocols\GatewayProtocol;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Workerman\Gateway\Config\GatewayWorkerConfig;
use Workerman\Gateway\Gateway\Contract\IGatewayClient;
use Workerman\Gateway\Gateway\GatewayWorkerClient;

use function Swoole\Coroutine\parallel;

require dirname(__DIR__) . '/vendor/autoload.php';

/*
 * Swoole demo: a Gateway Worker client pushes every gateway event into a
 * Channel, and a pool of consumer coroutines pops and handles them.
 */
Co\run(function () {
    // Queue (capacity 1024) shared by the event callbacks and the consumers.
    $channel = new Channel(1024);

    Coroutine::create(function () use ($channel) {
        // Use the Channel to process tasks with multiple coroutines inside a
        // single process; parallel() is asked for swoole_cpu_num() consumers.
        parallel(swoole_cpu_num(), function () use ($channel) {
            // A consumer stops as soon as pop() yields false.
            while (false !== ($task = $channel->pop())) {
                switch ($task['type']) {
                    case 'onException':
                        /** @var Throwable $th */
                        $th = $task['data']['th'];
                        // Exception handling: dump the message and the stack trace.
                        var_dump($th->getMessage(), $th->getTraceAsString());
                        break;

                    case 'onGatewayMessage':
                        /** @var IGatewayClient $client */
                        $client = $task['data']['client'];
                        $message = $task['data']['message'];
                        var_dump($message);

                        // Build the client id from the gateway address and connection id.
                        $clientId = Context::addressToClientId(
                            $message['local_ip'],
                            $message['local_port'],
                            $message['connection_id']
                        );

                        switch ($message['cmd']) {
                            case GatewayProtocol::CMD_ON_CONNECT:
                                // Client connected.
                                var_dump('connect:' . $clientId);
                                break;

                            case GatewayProtocol::CMD_ON_MESSAGE:
                                var_dump('message:' . $clientId, 'body:' . $message['body']);
                                $payload = json_decode($message['body'], true);
                                switch ($payload['action'] ?? '') {
                                    case 'send':
                                        // Expected body: {"action":"send", "content":"test content"}
                                        // Broadcast the content to all users.
                                        $reply = [
                                            'action'  => 'receive',
                                            'content' => $payload['content'] ?? '',
                                        ];
                                        Gateway::sendToAll(json_encode($reply));
                                        break;
                                }
                                break;

                            case GatewayProtocol::CMD_ON_CLOSE:
                                var_dump('close:' . $clientId);
                                break;

                            case GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT:
                                var_dump('websocket connect:' . $clientId, 'body:', $message['body']);
                                break;
                        }
                        break;
                }
            }
        });
    });

    $config = new GatewayWorkerConfig();
    $config->setRegisterAddress('127.0.0.1:1238');

    // Gateway Client configuration: reuse the register address set above.
    Gateway::$registerAddress = $config->getRegisterAddress();

    // Worker key is built from the process id and the current coroutine id.
    $workerKey = getmypid() . '-' . Coroutine::getuid();

    // Gateway Worker
    $client = new GatewayWorkerClient($workerKey, $config);

    // Wraps an event in the {type, data} envelope the consumers switch on.
    $enqueue = function (string $type, array $data) use ($channel) {
        $channel->push([
            'type' => $type,
            'data' => $data,
        ]);
    };

    // Exception handling: forward the throwable to the consumers.
    $client->onException = function (Throwable $th) use ($enqueue) {
        $enqueue('onException', ['th' => $th]);
    };

    // Gateway messages: forward the client and the raw message to the consumers.
    $client->onGatewayMessage = function (IGatewayClient $client, array $message) use ($enqueue) {
        $enqueue('onGatewayMessage', [
            'client'  => $client,
            'message' => $message,
        ]);
    };

    $client->run();
});