Skip to content

Commit

Permalink
handle control messages
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed Nov 21, 2023
1 parent beae7d1 commit fc6ff02
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/WebSockets/WsConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Evenement\EventEmitter;
use Laravel\Reverb\Http\Connection;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Frame;
use Ratchet\RFC6455\Messaging\FrameInterface;
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;

Expand All @@ -17,7 +19,7 @@ public function __construct(public Connection $connection)
$this->buffer = new MessageBuffer(
new CloseFrameChecker,
onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]),
onControl: fn () => $this->close(),
onControl: fn (FrameInterface $message) => $this->control($message),
sender: [$connection, 'send']
);

Expand All @@ -33,6 +35,17 @@ public function send(string $message): void
$this->buffer->sendMessage($message);
}

/**
* Handle control frames.
*/
public function control(FrameInterface $message): void
{
match ($message->getOpcode()) {
Frame::OP_PING => $this->send(new Frame('pong', opcode: Frame::OP_PONG)),
Frame::OP_CLOSE => $this->close(),
};
}

/**
* Close the connection.
*/
Expand Down
8 changes: 8 additions & 0 deletions tests/Feature/Reverb/ServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Laravel\Reverb\Jobs\PingInactiveConnections;
use Laravel\Reverb\Jobs\PruneStaleConnections;
use Laravel\Reverb\Tests\ReverbTestCase;
use Ratchet\RFC6455\Messaging\Frame;
use React\Promise\Deferred;

use function Ratchet\Client\connect;
Expand Down Expand Up @@ -314,6 +315,13 @@
$this->connect();
});

it('can receive a pong control frame', function () {
$frame = new Frame('ping', true, Frame::OP_PING);
$response = $this->sendRaw($frame);

expect($response)->toBe('pong');
});

it('clears application state between requests', function () {
$this->subscribe('test-channel');

Expand Down
18 changes: 18 additions & 0 deletions tests/ReverbTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,24 @@ public function send(array $message, WebSocket $connection = null): string
return await($promise->promise());
}

/**
* Send a message to the connected client.
*/
public function sendRaw(mixed $message, WebSocket $connection = null): string
{
$promise = new Deferred;

$connection = $connection ?: $this->connect();

$connection->on('message', function ($message) use ($promise) {
$promise->resolve((string) $message);
});

$connection->send($message);

return await($promise->promise());
}

/**
* Disconnect the connected client.
*/
Expand Down

0 comments on commit fc6ff02

Please sign in to comment.