Skip to content

Commit

Permalink
Adds Pusher routes (#16)
Browse files Browse the repository at this point in the history
* close with message

* ensure app exists

* handle event trigger

* scaffold routes

* implement events controller

* update phpunit config

* implement batch events endpoint

* implement channels route

* wip

* add channels test

* use correct id

* wip

* revert

* add channel endpoint

* Fix code styling

* implements users routes

* implement terminate user endpoint

* formatting
  • Loading branch information
joedixon authored Nov 19, 2023
1 parent 2300297 commit c69b7b4
Show file tree
Hide file tree
Showing 29 changed files with 835 additions and 82 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"pestphp/pest": "^2.0",
"phpstan/phpstan": "^1.10",
"ratchet/pawl": "^0.4.1",
"react/async": "^4.0"
"react/async": "^4.0",
"react/http": "^1.9"
},
"autoload": {
"psr-4": {
Expand Down
45 changes: 23 additions & 22 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" colors="true" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.1/phpunit.xsd">
<testsuites>
<testsuite name="Unit">
<directory suffix="Test.php">./tests/Unit</directory>
</testsuite>
<testsuite name="Feature">
<directory suffix="Test.php">./tests/Feature</directory>
</testsuite>
</testsuites>
<coverage>
<include>
<directory suffix=".php">./src</directory>
</include>
</coverage>
<php>
<env name="APP_KEY" value="base64:uz4B1RtFO57QGzbZX1kRYX9hIRB50+QzqFeg9zbFJlY="/>
<env name="PUSHER_APP_ID" value="123456"/>
<env name="PUSHER_APP_KEY" value="pusher-key"/>
<env name="PUSHER_APP_SECRET" value="pusher-secret"/>
<env name="REVERB_API_GATEWAY_CONNECTION_CACHE" value="redis"/>
</php>
</phpunit>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" colors="true" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.4/phpunit.xsd">
<testsuites>
<testsuite name="Unit">
<directory suffix="Test.php">./tests/Unit</directory>
</testsuite>
<testsuite name="Feature">
<directory suffix="Test.php">./tests/Feature</directory>
</testsuite>
</testsuites>
<coverage/>
<php>
<env name="APP_KEY" value="base64:uz4B1RtFO57QGzbZX1kRYX9hIRB50+QzqFeg9zbFJlY="/>
<env name="PUSHER_APP_ID" value="123456"/>
<env name="PUSHER_APP_KEY" value="pusher-key"/>
<env name="PUSHER_APP_SECRET" value="pusher-secret"/>
<env name="REVERB_API_GATEWAY_CONNECTION_CACHE" value="redis"/>
</php>
<source>
<include>
<directory suffix=".php">./src</directory>
</include>
</source>
</phpunit>
23 changes: 0 additions & 23 deletions phpunit.xml.dist.bak

This file was deleted.

4 changes: 2 additions & 2 deletions src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public function broadcast(Application $app, array $payload, Connection $except =
{
collect(App::make(ChannelManager::class)->for($app)->connections($this))
->each(function ($connection) use ($payload, $except) {
if ($except && $except->identifier() === $connection->identifier()) {
if ($except && $except->id() === $connection->id()) {
return;
}

if (isset($payload['except']) && $payload['except'] === $connection->identifier()) {
if (isset($payload['except']) && $payload['except'] === $connection->id()) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ClientEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static function whisper(Connection $connection, array $payload): void
{
Event::dispatch(
$connection->app(),
$payload + ['except' => $connection->identifier()],
$payload + ['except' => $connection->id()],
$connection
);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Concerns/ClosesConnections.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ trait ClosesConnections
/**
* Close the connection.
*/
protected function close(Connection $connection, int $statusCode = 400, array $headers = []): void
protected function close(Connection $connection, int $statusCode = 400, string $message = '', array $headers = []): void
{
$response = new Response($statusCode, $headers);
$response = new Response($statusCode, $headers, $message);

$connection->send(Message::toString($response));
$connection->close();
Expand Down
2 changes: 1 addition & 1 deletion src/Contracts/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public function disconnect(): void

App::make(ConnectionManager::class)
->for($this->app())
->disconnect($this->identifier());
->disconnect($this->id());

$this->terminate();
}
Expand Down
5 changes: 4 additions & 1 deletion src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Laravel\Reverb;

use Illuminate\Support\Arr;
use Illuminate\Support\Facades\App;
use Laravel\Reverb\Channels\ChannelBroker;
use Laravel\Reverb\Contracts\Connection;
Expand Down Expand Up @@ -33,10 +34,12 @@ public static function dispatch(Application $app, array $payload, Connection $co
*/
public static function dispatchSynchronously(Application $app, array $payload, Connection $connection = null): void
{
$channels = isset($payload['channel']) ? [$payload['channel']] : $payload['channels'];
$channels = Arr::wrap($payload['channels'] ?? $payload['channel'] ?? []);

foreach ($channels as $channel) {
unset($payload['channels']);
$channel = ChannelBroker::create($channel);
$payload['channel'] = $channel->name();

$channel->broadcast($app, $payload, $connection);
}
Expand Down
12 changes: 9 additions & 3 deletions src/Http/Router.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@ public function dispatch(RequestInterface $request, Connection $connection): mix
try {
$route = $this->matcher->match($uri->getPath());
} catch (MethodNotAllowedException $e) {
return $this->close($connection, 405, ['Allow' => $e->getAllowedMethods()]);
return $this->close($connection, 405, 'Method now allowed', ['Allow' => $e->getAllowedMethods()]);
} catch (ResourceNotFoundException $e) {
return $this->close($connection, 404);
return $this->close($connection, 404, 'Not found.');
}

return $route['_controller']($request, $connection, ...Arr::except($route, ['_controller', '_route']));
$response = $route['_controller']($request, $connection, ...Arr::except($route, ['_controller', '_route']));

if (! $this->isWebSocketRequest($request)) {
return $connection->send($response)->close();
}

return null;
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/Http/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public function start(): void
$this->loop->run();
}

/**
* Stop the Http server
*/
public function stop(): void
{
$this->loop->stop();
$this->socket->close();
}

/**
* Handle an incoming request.
*/
Expand All @@ -71,7 +80,7 @@ protected function createRequest(string $message, Connection $connection): Reque
try {
return Request::from($message, $connection);
} catch (OverflowException $e) {
$this->close($connection, 413);
$this->close($connection, 413, 'Payload too large.');
}
}
}
12 changes: 8 additions & 4 deletions src/Managers/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ public function app(): ?Application
*/
public function subscribe(Channel $channel, Connection $connection, $data = []): void
{
$this->connections[$this->application->id()][$channel->name()][$connection->identifier()] = $connection;
$this->connections[$this->application->id()][$channel->name()][$connection->id()] = $connection;
}

/**
* Unsubscribe from a channel.
*/
public function unsubscribe(Channel $channel, Connection $connection): void
{
unset($this->connections[$this->application->id()][$channel->name()][$connection->identifier()]);
unset($this->connections[$this->application->id()][$channel->name()][$connection->id()]);
}

/**
Expand Down Expand Up @@ -89,16 +89,20 @@ public function connections(Channel $channel): array
/**
* Get the given channel from the cache.
*/
protected function channel(Channel $channel): Collection
public function channel(Channel $channel): Collection
{
return $this->channels($channel);
}

/**
* Get the channels from the cache.
*/
protected function channels(Channel $channel = null): Collection
public function channels(Channel $channel = null): Collection
{
if (! isset($this->connections[$this->application->id()])) {
$this->connections[$this->application->id()] = [];
}

$channels = $this->connections[$this->application->id()];

if ($channel) {
Expand Down
2 changes: 1 addition & 1 deletion src/Managers/ConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public function all(): array
*/
public function save(Connection $connection): void
{
$this->connections[$this->application->id()][$connection->identifier()] = $connection;
$this->connections[$this->application->id()][$connection->id()] = $connection;
}

/**
Expand Down
29 changes: 29 additions & 0 deletions src/Pusher/Http/Controllers/ChannelController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Laravel\Reverb\Pusher\Http\Controllers;

use Laravel\Reverb\Channels\ChannelBroker;
use Laravel\Reverb\Http\Connection;
use Psr\Http\Message\RequestInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Response;

class ChannelController extends Controller
{
/**
* Handle the request.
*/
public function handle(RequestInterface $request, Connection $connection, ...$args): Response
{
$info = explode(',', $this->query['info'] ?? '');
$connections = $this->channels->channel(ChannelBroker::create($args['channel']));
$totalConnections = count($connections);

return new JsonResponse((object) array_filter([
'occupied' => $totalConnections > 0,
'user_count' => in_array('user_count', $info) ? $totalConnections : null,
'subscription_count' => in_array('subscription_count', $info) ? $totalConnections : null,
'cache' => in_array('cache', $info) ? '{}' : null,
], fn ($item) => $item !== null));
}
}
27 changes: 27 additions & 0 deletions src/Pusher/Http/Controllers/ChannelUsersController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Laravel\Reverb\Pusher\Http\Controllers;

use Laravel\Reverb\Channels\ChannelBroker;
use Laravel\Reverb\Channels\PresenceChannel;
use Laravel\Reverb\Http\Connection;
use Psr\Http\Message\RequestInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Response;

class ChannelUsersController extends Controller
{
/**
* Handle the request.
*/
public function handle(RequestInterface $request, Connection $connection, ...$args): Response
{
$channel = ChannelBroker::create($args['channel']);

if (! $channel instanceof PresenceChannel) {
return new JsonResponse((object) [], 400);
}

return new JsonResponse((object) []);
}
}
31 changes: 31 additions & 0 deletions src/Pusher/Http/Controllers/ChannelsController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Laravel\Reverb\Pusher\Http\Controllers;

use Illuminate\Support\Str;
use Laravel\Reverb\Http\Connection;
use Psr\Http\Message\RequestInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Response;

class ChannelsController extends Controller
{
/**
* Handle the request.
*/
public function handle(RequestInterface $request, Connection $connection, ...$args): Response
{
$channels = $this->channels->channels();
$info = explode(',', $this->query['info'] ?? '');

if (isset($this->query['filter_by_prefix'])) {
$channels = $channels->filter(fn ($connections, $name) => Str::startsWith($name, $this->query['filter_by_prefix']));
}

$channels = $channels->mapWithKeys(function ($connections, $name) use ($info) {
return [$name => array_filter(['user_count' => in_array('user_count', $info) ? count($connections) : null])];
});

return new JsonResponse((object) ['channels' => $channels]);
}
}
Loading

0 comments on commit c69b7b4

Please sign in to comment.