Skip to content

Commit

Permalink
Stores only used channels (#32)
Browse files Browse the repository at this point in the history
* wip

* keep channels lean

* formatting
  • Loading branch information
joedixon authored Dec 6, 2023
1 parent 1fc723b commit af6ed68
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 41 deletions.
7 changes: 6 additions & 1 deletion src/Contracts/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ public function all(): array;
/**
* Find the given channel.
*/
public function find(string $channel): Channel;
public function find(string $channel): ?Channel;

/**
* Find the given channel or create it if it doesn't exist.
*/
public function findOrCreate(string $channel): Channel;

/**
* Get all the connections for the given channels.
Expand Down
5 changes: 4 additions & 1 deletion src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ public static function dispatchSynchronously(Application $app, array $payload, ?

foreach ($channels as $channel) {
unset($payload['channels']);
$channel = app(ChannelManager::class)->for($app)->find($channel);
if (! $channel = app(ChannelManager::class)->for($app)->find($channel)) {
continue;
}

$payload['channel'] = $channel->name();

$channel->broadcast($payload, $connection);
Expand Down
26 changes: 19 additions & 7 deletions src/Managers/ArrayChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,27 @@ public function all(): array
/**
* Find the given channel
*/
public function find(string $channel): Channel
public function find(string $channel): ?Channel
{
return $this->channels($channel);
}

/**
* Find the given channel or create it if it doesn't exist.
*/
public function findOrCreate(string $channelName): Channel
{
if ($channel = $this->find($channelName)) {
return $channel;
}

$channel = ChannelBroker::create($channelName);

$this->applications[$this->application->id()][$channel->name()] = $channel;

return $channel;
}

/**
* Get all the connections for the given channels.
*
Expand Down Expand Up @@ -98,18 +114,14 @@ public function channel(string $channel): Channel
*
* @return \Laravel\Reverb\Channels\Channel|array<string, \Laravel\Reverb\Channels\Channel>
*/
public function channels(?string $channel = null): Channel|array
public function channels(?string $channel = null): Channel|array|null
{
if (! isset($this->applications[$this->application->id()])) {
$this->applications[$this->application->id()] = [];
}

if ($channel) {
if (! isset($this->applications[$this->application->id()][$channel])) {
$this->applications[$this->application->id()][$channel] = ChannelBroker::create($channel);
}

return $this->applications[$this->application->id()][$channel];
return $this->applications[$this->application->id()][$channel] ?? null;
}

return $this->applications[$this->application->id()];
Expand Down
33 changes: 22 additions & 11 deletions src/Managers/CacheChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,28 @@ public function all(): array
/**
* Find the given channel
*/
public function find(string $channel): Channel
public function find(string $channel): ?Channel
{
return $this->channels($channel);
}

/**
* Find the given channel or create it if it doesn't exist.
*/
public function findOrCreate(string $channelName): Channel
{
if ($channel = $this->channels($channelName)) {
return $channel;
}

$channels = $this->repository->get($this->prefix, []);
$channel = ChannelBroker::create($channelName);
$channels[$this->application->id()][$channel->name()] = serialize($channel);
$this->repository->forever($this->prefix, $channels);

return $channel;
}

/**
* Get all the connections for the given channels.
*
Expand Down Expand Up @@ -102,7 +119,7 @@ public function channel(string $channel): Channel
*
* @return \Laravel\Reverb\Channels\Channel|array<string, \Laravel\Reverb\Channels\Channel>
*/
public function channels(?string $channel = null): Channel|array
public function channels(?string $channel = null): Channel|array|null
{
$channels = $this->repository->get($this->prefix, []);

Expand All @@ -111,15 +128,9 @@ public function channels(?string $channel = null): Channel|array
}

if ($channel) {
if (! isset($channels[$this->application->id()][$channel])) {
$channel = ChannelBroker::create($channel);
$channels[$this->application->id()][$channel->name()] = serialize($channel);
$this->repository->forever($this->prefix, $channels);

return $channel;
}

return unserialize($channels[$this->application->id()][$channel]);
return isset($channels[$this->application->id()][$channel])
? unserialize($channels[$this->application->id()][$channel])
: null;
}

return array_map('unserialize', $channels[$this->application->id()] ?: []);
Expand Down
26 changes: 21 additions & 5 deletions src/Pusher/Concerns/InteractsWithChannelInformation.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,37 @@ protected function infoForChannels(array $channels, string $info): array
protected function info(string $channel, string $info): array
{
$info = explode(',', $info);
$channel = app(ChannelManager::class)->find($channel);

if (! $channel = app(ChannelManager::class)->find($channel)) {
return [];
}
return array_filter(
$channel ? $this->occupiedInfo($channel, $info) : $this->unoccupiedInfo($info),
fn ($item) => $item !== null
);
}

/**
* Get the channel information for the given occupied channel.
*/
protected function occupiedInfo(Channel $channel, array $info): array
{
$count = count($channel->connections());

$info = [
return [
'occupied' => in_array('occupied', $info) ? $count > 0 : null,
'user_count' => in_array('user_count', $info) && $this->isPresenceChannel($channel) ? $count : null,
'subscription_count' => in_array('subscription_count', $info) && ! $this->isPresenceChannel($channel) ? $count : null,
'cache' => in_array('cache', $info) && $this->isCacheChannel($channel) ? $channel->cachedPayload() : null,
];
}

return array_filter($info, fn ($item) => $item !== null);
/**
* Get the channel information for the given unoccupied channel.
*/
protected function unoccupiedInfo(array $info): array
{
return [
'occupied' => in_array('occupied', $info) ? false : null,
];
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/Pusher/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public function subscribe(Connection $connection, string $channel, ?string $auth
{
$channel = $this->channels
->for($connection->app())
->find($channel);
->findOrCreate($channel);

$channel->subscribe($connection, $auth, $data);

Expand All @@ -69,7 +69,7 @@ public function unsubscribe(Connection $connection, string $channel): void
$channel = $this->channels
->for($connection->app())
->find($channel)
->unsubscribe($connection);
?->unsubscribe($connection);
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/Pusher/Http/Controllers/ChannelUsersController.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public function __invoke(RequestInterface $request, Connection $connection, stri

$channel = $this->channels->find($channel);

if (! $channel) {
return new JsonResponse((object) [], 404);
}

if (! $this->isPresenceChannel($channel)) {
return new JsonResponse((object) [], 400);
}
Expand All @@ -30,6 +34,6 @@ public function __invoke(RequestInterface $request, Connection $connection, stri
->map(fn ($data) => ['id' => $data['user_id']])
->values();

return new JsonResponse((object) ['users' => $connections]);
return new JsonResponse(['users' => $connections]);
}
}
2 changes: 1 addition & 1 deletion src/Pusher/Http/Controllers/EventsBatchController.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function __invoke(RequestInterface $request, Connection $connection, stri

if ($info->some(fn ($item) => count($item) > 0)) {
return new JsonResponse(
['batch' => $info->each(fn ($item) => (object) $item)->all()]
['batch' => $info->map(fn ($item) => (object) $item)->all()]
);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Feature/Reverb/ChannelControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
$response = await($this->signedRequest('channels/test-channel-one?info=user_count,subscription_count,cache'));

expect($response->getStatusCode())->toBe(200);
expect($response->getBody()->getContents())->toBe('{"occupied":false,"subscription_count":0}');
expect($response->getBody()->getContents())->toBe('{"occupied":false}');
});

it('can return cache channel attributes', function () {
Expand Down
7 changes: 6 additions & 1 deletion tests/Feature/Reverb/ChannelUsersControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@
uses(ReverbTestCase::class);

it('returns an error when presence channel not provided', function () {
subscribe('test-channel');
await($this->signedRequest('channels/test-channel/users'));
})->throws(ResponseException::class);

it('returns an error when unoccupied channel provided', function () {
await($this->signedRequest('channels/presence-test-channel/users'));
})->throws(ResponseException::class);

it('returns the user data', function () {
$channel = app(ChannelManager::class)
->for(app()->make(ApplicationProvider::class)->findByKey('pusher-key'))
->find('presence-test-channel');
->findOrCreate('presence-test-channel');
$channel->subscribe($connection = new FakeConnection('test-connection-one'), validAuth($connection->id(), 'presence-test-channel', $data = json_encode(['user_id' => 1, 'user_info' => ['name' => 'Taylor']])), $data);
$channel->subscribe($connection = new FakeConnection('test-connection-two'), validAuth($connection->id(), 'presence-test-channel', $data = json_encode(['user_id' => 2, 'user_info' => ['name' => 'Joe']])), $data);
$channel->subscribe($connection = new FakeConnection('test-connection-three'), validAuth($connection->id(), 'presence-test-channel', $data = json_encode(['user_id' => 3, 'user_info' => ['name' => 'Jess']])), $data);
Expand Down
8 changes: 6 additions & 2 deletions tests/Feature/Reverb/EventsBatchControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
});

it('can receive an event batch trigger with multiple events and return info for each', function () {
subscribe('presence-test-channel');
subscribe('test-channel-two');
subscribe('test-channel-three');
$response = await($this->signedPostRequest('batch_events', ['batch' => [
[
'name' => 'NewEvent',
Expand All @@ -60,10 +63,11 @@
]]));

expect($response->getStatusCode())->toBe(200);
expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":0},{"subscription_count":0},{"subscription_count":0}]}');
expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":1},{"subscription_count":1},{"subscription_count":1}]}');
});

it('can receive an event batch trigger with multiple events and return info for some', function () {
subscribe('presence-test-channel');
$response = await($this->signedPostRequest('batch_events', ['batch' => [
[
'name' => 'NewEvent',
Expand All @@ -79,5 +83,5 @@
]]));

expect($response->getStatusCode())->toBe(200);
expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":0},[]]}');
expect($response->getBody()->getContents())->toBe('{"batch":[{"user_count":1},{}]}');
});
5 changes: 4 additions & 1 deletion tests/Feature/Reverb/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@

it('can receive a cached message when joining a cache channel', function () {
$connection = connect();
subscribe('cache-test-channel');

$this->triggerEvent(
'cache-test-channel',
Expand All @@ -112,6 +113,7 @@

it('can receive a cached message when joining a private cache channel', function () {
$connection = connect();
subscribe('private-cache-test-channel');

$this->triggerEvent(
'private-cache-test-channel',
Expand All @@ -126,6 +128,7 @@

it('can receive a cached message when joining a presence cache channel', function () {
$connection = connect();
subscribe('presence-cache-test-channel');

$this->triggerEvent(
'presence-cache-test-channel',
Expand Down Expand Up @@ -224,7 +227,7 @@

(new PruneStaleConnections)->handle(channels());

expect(channels()->find('test-channel')->connections())->toHaveCount(0);
expect(channels()->find('test-channel'))->toBeNull();

$connection->assertReceived('{"event":"pusher:ping"}');
$connection->assertReceived('{"event":"pusher:error","data":"{\"code\":4201,\"message\":\"Pong reply not received in time\"}"}');
Expand Down
1 change: 1 addition & 0 deletions tests/Unit/ClientEventTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
$this->channelConnectionManager->shouldReceive('for')
->andReturn($this->channelConnectionManager);
$this->app->instance(ChannelConnectionManager::class, $this->channelConnectionManager);
channels()->findOrCreate('test-channel');
});

it('can forward a client message', function () {
Expand Down
5 changes: 5 additions & 0 deletions tests/Unit/EventTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

$this->app->instance(ChannelConnectionManager::class, $channelConnectionManager);

channels()->findOrCreate('test-channel');

Event::dispatch(app(ApplicationProvider::class)->findByKey('pusher-key'), ['channel' => 'test-channel']);
});

Expand All @@ -39,5 +41,8 @@

$this->app->instance(ChannelConnectionManager::class, $channelConnectionManager);

channels()->findOrCreate('test-channel-one');
channels()->findOrCreate('test-channel-two');

Event::dispatch(app(ApplicationProvider::class)->findByKey('pusher-key'), ['channels' => ['test-channel-one', 'test-channel-two']]);
});
14 changes: 7 additions & 7 deletions tests/Unit/Managers/ChannelManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
$this->connection = new FakeConnection;
$this->channelManager = $this->app->make(ChannelManager::class)
->for($this->connection->app());
$this->channel = $this->channelManager->find('test-channel-0');
$this->channel = $this->channelManager->findOrCreate('test-channel-0');
});

it('can subscribe to a channel', function () {
Expand All @@ -29,7 +29,7 @@
it('can get all channels', function () {
$channels = collect(['test-channel-1', 'test-channel-2', 'test-channel-3']);

$channels->each(fn ($channel) => $this->channelManager->find($channel)->subscribe($this->connection));
$channels->each(fn ($channel) => $this->channelManager->findOrCreate($channel)->subscribe($this->connection));

foreach ($this->channelManager->all() as $index => $channel) {
expect($channel->name())->toBe($index);
Expand All @@ -46,10 +46,10 @@
->toBeIn(array_keys($this->channel->connections())));
});

it('can unsubscribe a connection for all channels', function () {
it('can unsubscribe a connection from all channels', function () {
$channels = collect(['test-channel-0', 'test-channel-1', 'test-channel-2']);

$channels->each(fn ($channel) => $this->channelManager->find($channel)->subscribe($this->connection));
$channels->each(fn ($channel) => $this->channelManager->findOrCreate($channel)->subscribe($this->connection));

collect($this->channelManager->all())->each(fn ($channel) => expect($channel->connections())->toHaveCount(1));

Expand All @@ -72,9 +72,9 @@
it('can get all connections for all channels', function () {
$connections = factory(12);

$channelOne = $this->channelManager->find('test-channel-0');
$channelTwo = $this->channelManager->find('test-channel-1');
$channelThree = $this->channelManager->find('test-channel-2');
$channelOne = $this->channelManager->findOrCreate('test-channel-0');
$channelTwo = $this->channelManager->findOrCreate('test-channel-1');
$channelThree = $this->channelManager->findOrCreate('test-channel-2');

$connections = collect($connections)->split(3);

Expand Down

0 comments on commit af6ed68

Please sign in to comment.