From c69b7b45da2ccb21da6734dc703aa05a5ae825f9 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Sun, 19 Nov 2023 21:22:34 +0000 Subject: [PATCH] Adds Pusher routes (#16) * close with message * ensure app exists * handle event trigger * scaffold routes * implement events controller * update phpunit config * implement batch events endpoint * implement channels route * wip * add channels test * use correct id * wip * revert * add channel endpoint * Fix code styling * implements users routes * implement terminate user endpoint * formatting --- composer.json | 3 +- phpunit.xml.dist | 45 +++--- phpunit.xml.dist.bak | 23 --- src/Channels/Channel.php | 4 +- src/ClientEvent.php | 2 +- src/Concerns/ClosesConnections.php | 4 +- src/Contracts/Connection.php | 2 +- src/Event.php | 5 +- src/Http/Router.php | 12 +- src/Http/Server.php | 11 +- src/Managers/ChannelManager.php | 12 +- src/Managers/ConnectionManager.php | 2 +- .../Http/Controllers/ChannelController.php | 29 ++++ .../Controllers/ChannelUsersController.php | 27 ++++ .../Http/Controllers/ChannelsController.php | 31 ++++ src/Pusher/Http/Controllers/Controller.php | 138 ++++++++++++++++++ .../Controllers/EventsBatchController.php | 57 ++++++++ .../Http/Controllers/EventsController.php | 64 ++++++++ .../Controllers/UsersTerminateController.php | 25 ++++ src/Servers/Reverb/Controller.php | 4 +- src/Servers/Reverb/Factory.php | 14 +- .../Feature/Ratchet/ChannelControllerTest.php | 40 +++++ .../Ratchet/ChannelUsersControllerTest.php | 16 ++ .../Ratchet/ChannelsControllerTest.php | 37 +++++ .../Ratchet/EventsBatchControllerTest.php | 81 ++++++++++ .../Feature/Ratchet/EventsControllerTest.php | 96 ++++++++++++ .../Ratchet/UsersTerminateControllerTest.php | 37 +++++ tests/RatchetTestCase.php | 81 ++++++++-- tests/Unit/EventTest.php | 15 +- 29 files changed, 835 insertions(+), 82 deletions(-) delete mode 100644 phpunit.xml.dist.bak create mode 100644 src/Pusher/Http/Controllers/ChannelController.php create mode 100644 src/Pusher/Http/Controllers/ChannelUsersController.php create mode 100644 src/Pusher/Http/Controllers/ChannelsController.php create mode 100644 src/Pusher/Http/Controllers/Controller.php create mode 100644 src/Pusher/Http/Controllers/EventsBatchController.php create mode 100644 src/Pusher/Http/Controllers/EventsController.php create mode 100644 src/Pusher/Http/Controllers/UsersTerminateController.php create mode 100644 tests/Feature/Ratchet/ChannelControllerTest.php create mode 100644 tests/Feature/Ratchet/ChannelUsersControllerTest.php create mode 100644 tests/Feature/Ratchet/ChannelsControllerTest.php create mode 100644 tests/Feature/Ratchet/EventsBatchControllerTest.php create mode 100644 tests/Feature/Ratchet/EventsControllerTest.php create mode 100644 tests/Feature/Ratchet/UsersTerminateControllerTest.php diff --git a/composer.json b/composer.json index 6122cb67..a419ab1a 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,8 @@ "pestphp/pest": "^2.0", "phpstan/phpstan": "^1.10", "ratchet/pawl": "^0.4.1", - "react/async": "^4.0" + "react/async": "^4.0", + "react/http": "^1.9" }, "autoload": { "psr-4": { diff --git a/phpunit.xml.dist b/phpunit.xml.dist index ee49a779..a453a02e 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,23 +1,24 @@ - - - - ./tests/Unit - - - ./tests/Feature - - - - - ./src - - - - - - - - - - \ No newline at end of file + + + + ./tests/Unit + + + ./tests/Feature + + + + + + + + + + + + + ./src + + + diff --git a/phpunit.xml.dist.bak b/phpunit.xml.dist.bak deleted file mode 100644 index d9c7d277..00000000 --- a/phpunit.xml.dist.bak +++ /dev/null @@ -1,23 +0,0 @@ - - - - - ./tests/Unit - - - ./tests/Feature - - - - - ./src - - - - - - - - - - diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index a247f3a6..aa9d80cb 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -51,11 +51,11 @@ public function broadcast(Application $app, array $payload, Connection $except = { collect(App::make(ChannelManager::class)->for($app)->connections($this)) ->each(function ($connection) use ($payload, $except) { - if ($except && $except->identifier() === $connection->identifier()) { + if ($except && $except->id() === $connection->id()) { return; } - if (isset($payload['except']) && $payload['except'] === $connection->identifier()) { + if (isset($payload['except']) && $payload['except'] === $connection->id()) { return; } diff --git a/src/ClientEvent.php b/src/ClientEvent.php index 52caebf5..f00f733d 100644 --- a/src/ClientEvent.php +++ b/src/ClientEvent.php @@ -33,7 +33,7 @@ public static function whisper(Connection $connection, array $payload): void { Event::dispatch( $connection->app(), - $payload + ['except' => $connection->identifier()], + $payload + ['except' => $connection->id()], $connection ); } diff --git a/src/Concerns/ClosesConnections.php b/src/Concerns/ClosesConnections.php index e51a4f08..9b55a9a6 100644 --- a/src/Concerns/ClosesConnections.php +++ b/src/Concerns/ClosesConnections.php @@ -11,9 +11,9 @@ trait ClosesConnections /** * Close the connection. */ - protected function close(Connection $connection, int $statusCode = 400, array $headers = []): void + protected function close(Connection $connection, int $statusCode = 400, string $message = '', array $headers = []): void { - $response = new Response($statusCode, $headers); + $response = new Response($statusCode, $headers, $message); $connection->send(Message::toString($response)); $connection->close(); diff --git a/src/Contracts/Connection.php b/src/Contracts/Connection.php index ff73ba5b..8ad2d19f 100644 --- a/src/Contracts/Connection.php +++ b/src/Contracts/Connection.php @@ -97,7 +97,7 @@ public function disconnect(): void App::make(ConnectionManager::class) ->for($this->app()) - ->disconnect($this->identifier()); + ->disconnect($this->id()); $this->terminate(); } diff --git a/src/Event.php b/src/Event.php index 1f83213a..78d71cad 100644 --- a/src/Event.php +++ b/src/Event.php @@ -2,6 +2,7 @@ namespace Laravel\Reverb; +use Illuminate\Support\Arr; use Illuminate\Support\Facades\App; use Laravel\Reverb\Channels\ChannelBroker; use Laravel\Reverb\Contracts\Connection; @@ -33,10 +34,12 @@ public static function dispatch(Application $app, array $payload, Connection $co */ public static function dispatchSynchronously(Application $app, array $payload, Connection $connection = null): void { - $channels = isset($payload['channel']) ? [$payload['channel']] : $payload['channels']; + $channels = Arr::wrap($payload['channels'] ?? $payload['channel'] ?? []); foreach ($channels as $channel) { + unset($payload['channels']); $channel = ChannelBroker::create($channel); + $payload['channel'] = $channel->name(); $channel->broadcast($app, $payload, $connection); } diff --git a/src/Http/Router.php b/src/Http/Router.php index 2fa7160d..aaafec3f 100644 --- a/src/Http/Router.php +++ b/src/Http/Router.php @@ -38,12 +38,18 @@ public function dispatch(RequestInterface $request, Connection $connection): mix try { $route = $this->matcher->match($uri->getPath()); } catch (MethodNotAllowedException $e) { - return $this->close($connection, 405, ['Allow' => $e->getAllowedMethods()]); + return $this->close($connection, 405, 'Method now allowed', ['Allow' => $e->getAllowedMethods()]); } catch (ResourceNotFoundException $e) { - return $this->close($connection, 404); + return $this->close($connection, 404, 'Not found.'); } - return $route['_controller']($request, $connection, ...Arr::except($route, ['_controller', '_route'])); + $response = $route['_controller']($request, $connection, ...Arr::except($route, ['_controller', '_route'])); + + if (! $this->isWebSocketRequest($request)) { + return $connection->send($response)->close(); + } + + return null; } /** diff --git a/src/Http/Server.php b/src/Http/Server.php index efa1666b..c7665c68 100644 --- a/src/Http/Server.php +++ b/src/Http/Server.php @@ -45,6 +45,15 @@ public function start(): void $this->loop->run(); } + /** + * Stop the Http server + */ + public function stop(): void + { + $this->loop->stop(); + $this->socket->close(); + } + /** * Handle an incoming request. */ @@ -71,7 +80,7 @@ protected function createRequest(string $message, Connection $connection): Reque try { return Request::from($message, $connection); } catch (OverflowException $e) { - $this->close($connection, 413); + $this->close($connection, 413, 'Payload too large.'); } } } diff --git a/src/Managers/ChannelManager.php b/src/Managers/ChannelManager.php index 5e1d9bc0..6b57a03b 100644 --- a/src/Managers/ChannelManager.php +++ b/src/Managers/ChannelManager.php @@ -45,7 +45,7 @@ public function app(): ?Application */ public function subscribe(Channel $channel, Connection $connection, $data = []): void { - $this->connections[$this->application->id()][$channel->name()][$connection->identifier()] = $connection; + $this->connections[$this->application->id()][$channel->name()][$connection->id()] = $connection; } /** @@ -53,7 +53,7 @@ public function subscribe(Channel $channel, Connection $connection, $data = []): */ public function unsubscribe(Channel $channel, Connection $connection): void { - unset($this->connections[$this->application->id()][$channel->name()][$connection->identifier()]); + unset($this->connections[$this->application->id()][$channel->name()][$connection->id()]); } /** @@ -89,7 +89,7 @@ public function connections(Channel $channel): array /** * Get the given channel from the cache. */ - protected function channel(Channel $channel): Collection + public function channel(Channel $channel): Collection { return $this->channels($channel); } @@ -97,8 +97,12 @@ protected function channel(Channel $channel): Collection /** * Get the channels from the cache. */ - protected function channels(Channel $channel = null): Collection + public function channels(Channel $channel = null): Collection { + if (! isset($this->connections[$this->application->id()])) { + $this->connections[$this->application->id()] = []; + } + $channels = $this->connections[$this->application->id()]; if ($channel) { diff --git a/src/Managers/ConnectionManager.php b/src/Managers/ConnectionManager.php index 429c97a7..e20d57de 100644 --- a/src/Managers/ConnectionManager.php +++ b/src/Managers/ConnectionManager.php @@ -104,7 +104,7 @@ public function all(): array */ public function save(Connection $connection): void { - $this->connections[$this->application->id()][$connection->identifier()] = $connection; + $this->connections[$this->application->id()][$connection->id()] = $connection; } /** diff --git a/src/Pusher/Http/Controllers/ChannelController.php b/src/Pusher/Http/Controllers/ChannelController.php new file mode 100644 index 00000000..b9ff126b --- /dev/null +++ b/src/Pusher/Http/Controllers/ChannelController.php @@ -0,0 +1,29 @@ +query['info'] ?? ''); + $connections = $this->channels->channel(ChannelBroker::create($args['channel'])); + $totalConnections = count($connections); + + return new JsonResponse((object) array_filter([ + 'occupied' => $totalConnections > 0, + 'user_count' => in_array('user_count', $info) ? $totalConnections : null, + 'subscription_count' => in_array('subscription_count', $info) ? $totalConnections : null, + 'cache' => in_array('cache', $info) ? '{}' : null, + ], fn ($item) => $item !== null)); + } +} diff --git a/src/Pusher/Http/Controllers/ChannelUsersController.php b/src/Pusher/Http/Controllers/ChannelUsersController.php new file mode 100644 index 00000000..d0db13cf --- /dev/null +++ b/src/Pusher/Http/Controllers/ChannelUsersController.php @@ -0,0 +1,27 @@ +channels->channels(); + $info = explode(',', $this->query['info'] ?? ''); + + if (isset($this->query['filter_by_prefix'])) { + $channels = $channels->filter(fn ($connections, $name) => Str::startsWith($name, $this->query['filter_by_prefix'])); + } + + $channels = $channels->mapWithKeys(function ($connections, $name) use ($info) { + return [$name => array_filter(['user_count' => in_array('user_count', $info) ? count($connections) : null])]; + }); + + return new JsonResponse((object) ['channels' => $channels]); + } +} diff --git a/src/Pusher/Http/Controllers/Controller.php b/src/Pusher/Http/Controllers/Controller.php new file mode 100644 index 00000000..a2b5b62a --- /dev/null +++ b/src/Pusher/Http/Controllers/Controller.php @@ -0,0 +1,138 @@ +getUri()->getQuery(), $query); + $this->body = $request->getBody()->getContents(); + $this->query = $query; + + try { + $this->setApplication($args['appId'] ?? null); + $this->setConnections(); + $this->setChannels(); + } catch (HttpException $e) { + return $this->close($connection, $e->getStatusCode(), $e->getMessage()); + } + + return $this->handle($request, $connection, ...$args); + } + + /** + * Handle the incoming request. + */ + abstract public function handle(RequestInterface $request, Connection $connection, ...$args): Response; + + /** + * Set the Reverb application instance. + * + * @throws \Symfony\Component\HttpKernel\Exception\HttpException + */ + protected function setApplication(?string $appId): Application + { + if ($this->application) { + return $this->application; + } + + if (! $appId) { + throw new HttpException(400, 'Application ID not provided.'); + } + + try { + return $this->application = app(ApplicationProvider::class)->findById($appId); + } catch (InvalidApplication $e) { + throw new HttpException(404, 'No matching application for ID ['.$appId.'] found.'); + } + } + + /** + * Set the Reverb connection manager instance. + */ + protected function setConnections() + { + $this->connections = app(ConnectionManager::class)->for($this->application); + } + + /** + * Set the Reverb channel manager instance. + */ + protected function setChannels() + { + $this->channels = app(ChannelManager::class)->for($this->application); + } + + /** + * Verify the Pusher signature. + * + * @throws \Symfony\Component\HttpKernel\Exception\HttpException + */ + protected function verifySignature(RequestInterface $request): void + { + $params = Arr::except($this->query, [ + 'auth_signature', 'body_md5', 'appId', 'appKey', 'channelName', + ]); + + if ($this->body !== '') { + $params['body_md5'] = md5($this->body); + } + + ksort($params); + + $signature = implode("\n", [ + $request->getMethod(), + $request->getUri()->getPath(), + $this->formatParams($params), + ]); + + $signature = hash_hmac('sha256', $signature, $this->application->secret()); + + if ($signature !== $this->query['auth_signature']) { + throw new HttpException(401, 'Authentication signature invalid.'); + } + } + + /** + * Format the given parameters into the correct format for signature verification. + */ + protected static function formatParams(array $params): string + { + if (! is_array($params)) { + return $params; + } + + return collect($params)->map(function ($value, $key) { + if (is_array($value)) { + $value = implode(',', $value); + } + + return "{$key}={$value}"; + })->implode('&'); + } +} diff --git a/src/Pusher/Http/Controllers/EventsBatchController.php b/src/Pusher/Http/Controllers/EventsBatchController.php new file mode 100644 index 00000000..523a89d2 --- /dev/null +++ b/src/Pusher/Http/Controllers/EventsBatchController.php @@ -0,0 +1,57 @@ +body, true)); + + $info = $items->map(function ($item) { + Event::dispatch( + $this->application, + [ + 'event' => $item['name'], + 'channel' => $item['channel'], + 'data' => $item['data'], + ], + isset($item['socket_id']) ? $this->connections->find($item['socket_id']) : null + ); + + return isset($item['info']) ? $this->getInfo($item['channel'], $item['info']) : []; + }); + + return $info->some(fn ($item) => count($item) > 0) ? new JsonResponse((object) ['batch' => $info->all()]) : new JsonResponse((object) []); + } + + /** + * Get the info for the given channels. + * + * @param array $channels + * @return array> + */ + protected function getInfo(string $channel, string $info): array + { + $info = explode(',', $info); + $count = count($this->channels->connections(ChannelBroker::create($channel))); + $info = [ + 'user_count' => in_array('user_count', $info) ? $count : null, + 'subscription_count' => in_array('subscription_count', $info) ? $count : null, + ]; + + return array_filter($info, fn ($item) => $item !== null); + } +} diff --git a/src/Pusher/Http/Controllers/EventsController.php b/src/Pusher/Http/Controllers/EventsController.php new file mode 100644 index 00000000..79474e30 --- /dev/null +++ b/src/Pusher/Http/Controllers/EventsController.php @@ -0,0 +1,64 @@ +body, true); + $channels = Arr::wrap($payload['channels'] ?? $payload['channel'] ?? []); + + Event::dispatch( + $this->application, + [ + 'event' => $payload['name'], + 'channels' => $channels, + 'data' => $payload['data'], + ], + isset($payload['socket_id']) ? $this->connections->find($payload['socket_id']) : null + ); + + if (isset($payload['info'])) { + return new JsonResponse((object) $this->getInfo($channels, $payload['info'])); + } + + return new JsonResponse((object) []); + } + + /** + * Get the info for the given channels. + * + * @param array $channels + * @return array> + */ + protected function getInfo(array $channels, string $info): array + { + $info = explode(',', $info); + + $channels = collect($channels)->mapWithKeys(function ($channel) use ($info) { + $count = count($this->channels->connections(ChannelBroker::create($channel))); + $info = [ + 'user_count' => in_array('user_count', $info) ? $count : null, + 'subscription_count' => in_array('subscription_count', $info) ? $count : null, + ]; + + return [$channel => array_filter($info, fn ($item) => $item !== null)]; + })->all(); + + return ['channels' => $channels]; + } +} diff --git a/src/Pusher/Http/Controllers/UsersTerminateController.php b/src/Pusher/Http/Controllers/UsersTerminateController.php new file mode 100644 index 00000000..aa797403 --- /dev/null +++ b/src/Pusher/Http/Controllers/UsersTerminateController.php @@ -0,0 +1,25 @@ +connections->find($args['user'])) { + return new JsonResponse((object) [], 400); + } + + $connection->disconnect(); + + return new JsonResponse((object) []); + } +} diff --git a/src/Servers/Reverb/Controller.php b/src/Servers/Reverb/Controller.php index 4b5f03e8..89b207fc 100644 --- a/src/Servers/Reverb/Controller.php +++ b/src/Servers/Reverb/Controller.php @@ -14,9 +14,9 @@ class Controller /** * Invoke the Reverb WebSocket server. */ - public function __invoke(RequestInterface $request, WsConnection $connection, string $key): void + public function __invoke(RequestInterface $request, WsConnection $connection, string $appKey): void { - $reverbConnection = $this->connection($request, $connection, $key); + $reverbConnection = $this->connection($request, $connection, $appKey); $server = app(Server::class); $server->open($reverbConnection); diff --git a/src/Servers/Reverb/Factory.php b/src/Servers/Reverb/Factory.php index eca76a45..c70ef2be 100644 --- a/src/Servers/Reverb/Factory.php +++ b/src/Servers/Reverb/Factory.php @@ -5,6 +5,12 @@ use Laravel\Reverb\Http\Route; use Laravel\Reverb\Http\Router; use Laravel\Reverb\Http\Server as HttpServer; +use Laravel\Reverb\Pusher\Http\Controllers\ChannelController; +use Laravel\Reverb\Pusher\Http\Controllers\ChannelsController; +use Laravel\Reverb\Pusher\Http\Controllers\ChannelUsersController; +use Laravel\Reverb\Pusher\Http\Controllers\EventsBatchController; +use Laravel\Reverb\Pusher\Http\Controllers\EventsController; +use Laravel\Reverb\Pusher\Http\Controllers\UsersTerminateController; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Socket\SocketServer; @@ -33,7 +39,13 @@ protected static function routes(): RouteCollection { $routes = new RouteCollection; - $routes->add('sockets', Route::get('/app/{key}', new Controller)); + $routes->add('sockets', Route::get('/app/{appKey}', new Controller)); + $routes->add('events', Route::post('/apps/{appId}/events', new EventsController)); + $routes->add('events_batch', Route::post('/apps/{appId}/batch_events', new EventsBatchController)); + $routes->add('channels', Route::get('/apps/{appId}/channels', new ChannelsController)); + $routes->add('channel', Route::get('/apps/{appId}/channels/{channel}', new ChannelController)); + $routes->add('channel_users', Route::get('/apps/{appId}/channels/{channel}/users', new ChannelUsersController)); + $routes->add('users_terminate', Route::post('/apps/{appId}/users/{user}/terminate_connections', new UsersTerminateController)); return $routes; } diff --git a/tests/Feature/Ratchet/ChannelControllerTest.php b/tests/Feature/Ratchet/ChannelControllerTest.php new file mode 100644 index 00000000..ec100454 --- /dev/null +++ b/tests/Feature/Ratchet/ChannelControllerTest.php @@ -0,0 +1,40 @@ +subscribe('test-channel-one'); + $this->subscribe('test-channel-one'); + + $response = await($this->signedRequest('channels/test-channel-one?info=user_count,subscription_count,cache')); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"occupied":true,"user_count":2,"subscription_count":2,"cache":"{}"}', $response->getBody()->getContents()); +}); + +it('returns unoccupied when no connections', function () { + $response = await($this->signedRequest('channels/test-channel-one?info=user_count,subscription_count,cache')); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"occupied":false,"user_count":0,"subscription_count":0,"cache":"{}"}', $response->getBody()->getContents()); +}); + +it('can return only the requested attributes', function () { + $this->subscribe('test-channel-one'); + + $response = await($this->signedRequest('channels/test-channel-one?info=user_count,subscription_count,cache')); + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"occupied":true,"user_count":1,"subscription_count":1,"cache":"{}"}', $response->getBody()->getContents()); + + $response = await($this->signedRequest('channels/test-channel-one?info=cache')); + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"occupied":true,"cache":"{}"}', $response->getBody()->getContents()); + + $response = await($this->signedRequest('channels/test-channel-one?info=subscription_count,user_count')); + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"occupied":true,"user_count":1,"subscription_count":1}', $response->getBody()->getContents()); +}); diff --git a/tests/Feature/Ratchet/ChannelUsersControllerTest.php b/tests/Feature/Ratchet/ChannelUsersControllerTest.php new file mode 100644 index 00000000..38e37c30 --- /dev/null +++ b/tests/Feature/Ratchet/ChannelUsersControllerTest.php @@ -0,0 +1,16 @@ +signedRequest('channels/test-channel/users')); +})->throws(ResponseException::class); + +it('returns the user data', function () { + // +})->todo(); diff --git a/tests/Feature/Ratchet/ChannelsControllerTest.php b/tests/Feature/Ratchet/ChannelsControllerTest.php new file mode 100644 index 00000000..cdb9340d --- /dev/null +++ b/tests/Feature/Ratchet/ChannelsControllerTest.php @@ -0,0 +1,37 @@ +subscribe('test-channel-one'); + $this->subscribe('test-channel-two'); + + $response = await($this->signedRequest('channels?info=user_count')); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"channels":{"test-channel-one":{"user_count":1},"test-channel-two":{"user_count":1}}}', $response->getBody()->getContents()); +}); + +it('can return filtered channels', function () { + $this->subscribe('test-channel-one'); + $this->subscribe('test-channel-two'); + + $response = await($this->signedRequest('channels?filter_by_prefix=test-channel-t&info=user_count')); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"channels":{"test-channel-two":{"user_count":1}}}', $response->getBody()->getContents()); +}); + +it('returns empty results if no metrics requested', function () { + $this->subscribe('test-channel-one'); + $this->subscribe('test-channel-two'); + + $response = await($this->signedRequest('channels')); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"channels":{"test-channel-one":[],"test-channel-two":[]}}', $response->getBody()->getContents()); +}); diff --git a/tests/Feature/Ratchet/EventsBatchControllerTest.php b/tests/Feature/Ratchet/EventsBatchControllerTest.php new file mode 100644 index 00000000..44fb2cbc --- /dev/null +++ b/tests/Feature/Ratchet/EventsBatchControllerTest.php @@ -0,0 +1,81 @@ +signedPostRequest('batch_events', [[ + 'name' => 'NewEvent', + 'channel' => 'test-channel', + 'data' => ['some' => 'data'], + ]])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{}', $response->getBody()->getContents()); +}); + +it('can receive an event batch trigger with multiple events', function () { + $response = await($this->signedPostRequest('batch_events', [ + [ + 'name' => 'NewEvent', + 'channel' => 'test-channel', + 'data' => ['some' => 'data'], + ], + [ + 'name' => 'AnotherNewEvent', + 'channel' => 'test-channel-two', + 'data' => ['some' => ['more' => 'data']], + ], + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{}', $response->getBody()->getContents()); +}); + +it('can receive an event batch trigger with multiple events and return info for each', function () { + $response = await($this->signedPostRequest('batch_events', [ + [ + 'name' => 'NewEvent', + 'channel' => 'test-channel', + 'data' => ['some' => 'data'], + 'info' => 'user_count', + ], + [ + 'name' => 'AnotherNewEvent', + 'channel' => 'test-channel-two', + 'data' => ['some' => ['more' => 'data']], + 'info' => 'subscription_count', + ], + [ + 'name' => 'YetAnotherNewEvent', + 'channel' => 'test-channel-three', + 'data' => ['some' => ['more' => 'data']], + 'info' => 'subscription_count,user_count', + ], + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"batch":[{"user_count":0},{"subscription_count":0},{"user_count":0,"subscription_count":0}]}', $response->getBody()->getContents()); +}); + +it('can receive an event batch trigger with multiple events and return info for some', function () { + $response = await($this->signedPostRequest('batch_events', [ + [ + 'name' => 'NewEvent', + 'channel' => 'test-channel', + 'data' => ['some' => 'data'], + 'info' => 'user_count', + ], + [ + 'name' => 'AnotherNewEvent', + 'channel' => 'test-channel-two', + 'data' => ['some' => ['more' => 'data']], + ], + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"batch":[{"user_count":0},[]]}', $response->getBody()->getContents()); +}); diff --git a/tests/Feature/Ratchet/EventsControllerTest.php b/tests/Feature/Ratchet/EventsControllerTest.php new file mode 100644 index 00000000..2be12c83 --- /dev/null +++ b/tests/Feature/Ratchet/EventsControllerTest.php @@ -0,0 +1,96 @@ +signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channel' => 'test-channel', + 'data' => ['some' => 'data'], + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{}', $response->getBody()->getContents()); +}); + +it('can receive and event trigger for multiple channels', function () { + $response = await($this->signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channels' => ['test-channel-one', 'test-channel-two'], + 'data' => ['some' => 'data'], + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{}', $response->getBody()->getContents()); +}); + +it('can return user counts when requested', function () { + $this->subscribe('test-channel-one'); + + $response = await($this->signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channels' => ['test-channel-one', 'test-channel-two'], + 'data' => ['some' => 'data'], + 'info' => 'user_count', + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"channels":{"test-channel-one":{"user_count":1},"test-channel-two":{"user_count":0}}}', $response->getBody()->getContents()); +}); + +it('can return subscription counts when requested', function () { + $this->subscribe('test-channel-two'); + + $response = await($this->signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channels' => ['test-channel-one', 'test-channel-two'], + 'data' => ['some' => 'data'], + 'info' => 'subscription_count', + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"channels":{"test-channel-one":{"subscription_count":0},"test-channel-two":{"subscription_count":1}}}', $response->getBody()->getContents()); +}); + +it('can return user and subscription counts when requested', function () { + $this->subscribe('test-channel-two'); + + $response = await($this->signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channels' => ['test-channel-one', 'test-channel-two'], + 'data' => ['some' => 'data'], + 'info' => 'subscription_count,user_count', + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{"channels":{"test-channel-one":{"user_count":0,"subscription_count":0},"test-channel-two":{"user_count":1,"subscription_count":1}}}', $response->getBody()->getContents()); +}); + +it('can ignore a subscriber', function () { + $connection = $this->connect(); + $this->subscribe('test-channel-two', connection: $connection); + + $promiseOne = $this->messagePromise($connection); + $response = await($this->signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channels' => ['test-channel-one', 'test-channel-two'], + 'data' => ['some' => 'data'], + ])); + + $promiseTwo = $this->messagePromise($connection); + $response = await($this->signedPostRequest('events', [ + 'name' => 'NewEvent', + 'channels' => ['test-channel-one', 'test-channel-two'], + 'data' => ['some' => 'data'], + 'socket_id' => $this->managedConnection()->id(), + ])); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{}', $response->getBody()->getContents()); + expect(await($promiseOne))->toBe('{"event":"NewEvent","data":{"some":"data"},"channel":"test-channel-two"}'); + expect(await($promiseTwo))->toBeFalse(); +}); diff --git a/tests/Feature/Ratchet/UsersTerminateControllerTest.php b/tests/Feature/Ratchet/UsersTerminateControllerTest.php new file mode 100644 index 00000000..c1a8ab59 --- /dev/null +++ b/tests/Feature/Ratchet/UsersTerminateControllerTest.php @@ -0,0 +1,37 @@ +signedPostRequest('channels/users/not-a-user/terminate_connections')); +})->throws(ResponseException::class); + +it('unsubscribes from all channels and terminates a user', function () { + $connection = $this->connect(); + $this->subscribe('test-channel-one', connection: $connection); + $this->subscribe('test-channel-two', connection: $connection); + + $this->subscribe('test-channel-one'); + $this->subscribe('test-channel-two'); + + expect($connections = connectionManager()->all())->toHaveCount(3); + expect(channelManager()->connections(ChannelBroker::create('test-channel-one')))->toHaveCount(2); + expect(channelManager()->connections(ChannelBroker::create('test-channel-two')))->toHaveCount(2); + + $connection = Arr::first($connections); + + $response = await($this->signedPostRequest("users/{$connection->id()}/terminate_connections")); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame('{}', $response->getBody()->getContents()); + expect($connections = connectionManager()->all())->toHaveCount(2); + expect(channelManager()->connections(ChannelBroker::create('test-channel-one')))->toHaveCount(1); + expect(channelManager()->connections(ChannelBroker::create('test-channel-two')))->toHaveCount(1); +}); diff --git a/tests/RatchetTestCase.php b/tests/RatchetTestCase.php index 2bf162bc..96b2adef 100644 --- a/tests/RatchetTestCase.php +++ b/tests/RatchetTestCase.php @@ -3,6 +3,7 @@ namespace Laravel\Reverb\Tests; use Clue\React\Redis\Client; +use Illuminate\Support\Arr; use Illuminate\Support\Str; use Laravel\Reverb\Concerns\InteractsWithAsyncRedis; use Laravel\Reverb\Contracts\Connection; @@ -10,17 +11,19 @@ use Laravel\Reverb\Contracts\ServerProvider; use Laravel\Reverb\Event; use Laravel\Reverb\Loggers\NullLogger; -use Laravel\Reverb\Servers\Ratchet\Factory; +use Laravel\Reverb\Servers\Reverb\Factory; use Ratchet\Client\WebSocket; use React\Async\SimpleFiber; -use React\EventLoop\Factory as LoopFactory; +use React\EventLoop\Loop; use React\Http\Browser; use React\Promise\Deferred; use React\Promise\PromiseInterface; +use React\Promise\Timer\TimeoutException; use ReflectionObject; use function Ratchet\Client\connect; use function React\Async\await; +use function React\Promise\Timer\timeout; class RatchetTestCase extends TestCase { @@ -35,7 +38,7 @@ protected function setUp(): void parent::setUp(); $this->app->instance(Logger::class, new NullLogger); - $this->loop = LoopFactory::create(); + $this->loop = Loop::get(); $this->startServer(); } @@ -121,8 +124,7 @@ protected function resetFiber() public function stopServer() { if ($this->server) { - $this->loop->stop(); - $this->server->socket->close(); + $this->server->stop(); } } @@ -218,7 +220,7 @@ public function subscribe(string $channel, ?array $data = [], string $auth = nul */ public function managedConnection(): ?Connection { - return connectionManager()->all()->last(); + return Arr::last(connectionManager()->all()); } /** @@ -226,7 +228,7 @@ public function managedConnection(): ?Connection * * @param \Ratchet\Client\WebSocketWebSocket $connection */ - public function messagePromise(WebSocket $connection): PromiseInterface + public function messagePromise(WebSocket $connection) { $promise = new Deferred; @@ -234,7 +236,11 @@ public function messagePromise(WebSocket $connection): PromiseInterface $promise->resolve((string) $message); }); - return $promise->promise(); + return timeout($promise->promise(), 0.1, $this->loop) + ->then( + fn ($message) => $message, + fn (TimeoutException $error) => false + ); } /** @@ -268,21 +274,66 @@ public function triggerEvent(string $channel, string $event, array $data = []): $this->assertSame('{}', $response->getBody()->getContents()); } + public function request(string $path, string $method = 'GET', mixed $data = '', string $host = '0.0.0.0', string $port = '8080', string $appId = '123456'): PromiseInterface + { + return (new Browser($this->loop)) + ->request( + $method, + "http://{$host}:{$port}/apps/{$appId}/{$path}", + [], + ($data) ? json_encode($data) : '' + ); + } + + public function signedRequest(string $path, string $method = 'GET', mixed $data = '', string $host = '0.0.0.0', string $port = '8080', string $appId = '123456'): PromiseInterface + { + $hash = md5(json_encode($data)); + $timestamp = time(); + $query = "auth_key=pusher-key&auth_timestamp={$timestamp}&auth_version=1.0&body_md5={$hash}"; + $string = "POST\n/apps/{$appId}/{$path}\n$query"; + $signature = hash_hmac('sha256', $string, 'pusher-secret'); + $path = Str::contains($path, '?') ? "{$path}&{$query}" : "{$path}?{$query}"; + + return $this->request("{$path}&auth_signature={$signature}", $method, $data, $host, $port, $appId); + } + /** * Post a request to the server. */ - public function postToServer( + public function postReqeust(string $path, array $data = [], string $host = '0.0.0.0', string $port = '8080', string $appId = '123456'): PromiseInterface + { + return $this->request($path, 'POST', $data, $host, $port, $appId); + } + + /** + * Post a signed request to the server. + */ + public function signedPostRequest(string $path, array $data = [], string $host = '0.0.0.0', string $port = '8080', string $appId = '123456'): PromiseInterface + { + $hash = md5(json_encode($data)); + $timestamp = time(); + $query = "auth_key=pusher-key&auth_timestamp={$timestamp}&auth_version=1.0&body_md5={$hash}"; + $string = "POST\n/apps/{$appId}/{$path}\n$query"; + $signature = hash_hmac('sha256', $string, 'pusher-secret'); + + return $this->postReqeust("{$path}?{$query}&auth_signature={$signature}", $data, $host, $port, $appId); + } + + public function getWithSignature( string $path, array $data = [], string $host = '0.0.0.0', string $port = '8080', string $appId = '123456' ): PromiseInterface { - return (new Browser($this->loop)) - ->post( - "http://{$host}:{$port}/apps/{$appId}/{$path}", - [], - json_encode($data) - ); + $hash = md5(json_encode($data)); + $timestamp = time(); + $query = "auth_key=pusher-key&auth_timestamp={$timestamp}&auth_version=1.0&body_md5={$hash}"; + $string = "POST\n/apps/{$appId}/{$path}\n$query"; + $signature = hash_hmac('sha256', $string, 'pusher-secret'); + + $path = Str::contains($path, '?') ? "{$path}&{$query}" : "{$path}?{$query}"; + + return $this->request("{$path}&auth_signature={$signature}", 'GET', '', $host, $port, $appId); } } diff --git a/tests/Unit/EventTest.php b/tests/Unit/EventTest.php index c812959a..401836b0 100644 --- a/tests/Unit/EventTest.php +++ b/tests/Unit/EventTest.php @@ -6,7 +6,6 @@ use Laravel\Reverb\Contracts\ChannelManager; use Laravel\Reverb\Contracts\ServerProvider; use Laravel\Reverb\Event; -use Laravel\Reverb\Managers\Connections; it('can publish an event when enabled', function () { $app = app(ApplicationProvider::class)->findByKey('pusher-key'); @@ -25,9 +24,21 @@ $channelManager->shouldReceive('for') ->andReturn($channelManager); $channelManager->shouldReceive('connections')->once() - ->andReturn(Connections::make()); + ->andReturn([]); $this->app->bind(ChannelManager::class, fn () => $channelManager); Event::dispatch(app(ApplicationProvider::class)->findByKey('pusher-key'), ['channel' => 'test-channel']); }); + +it('can broadcast an event for multiple channels', function () { + $channelManager = Mockery::mock(ChannelManager::class); + $channelManager->shouldReceive('for') + ->andReturn($channelManager); + $channelManager->shouldReceive('connections')->twice() + ->andReturn([]); + + $this->app->bind(ChannelManager::class, fn () => $channelManager); + + Event::dispatch(app(ApplicationProvider::class)->findByKey('pusher-key'), ['channels' => ['test-channel-one', 'test-channel-two']]); +});