Skip to content

Commit

Permalink
Add WebsocketCompressionNegotiator
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 8, 2023
1 parent ec8a6f5 commit c035faf
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 19 deletions.
18 changes: 2 additions & 16 deletions src/Rfc6455ClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Socket\Socket;
use Amp\Websocket\Compression\WebsocketCompressionContextFactory;
use Amp\Websocket\Compression\WebsocketCompressionContext;
use Amp\Websocket\ConstantRateLimit;
use Amp\Websocket\Parser\Rfc6455ParserFactory;
use Amp\Websocket\Parser\WebsocketParserFactory;
Expand All @@ -24,12 +24,10 @@ final class Rfc6455ClientFactory implements WebsocketClientFactory
use ForbidSerialization;

/**
* @param WebsocketCompressionContextFactory|null $compressionContextFactory Use null to disable compression.
* @param WebsocketHeartbeatQueue|null $heartbeatQueue Use null to disable automatic heartbeats (pings).
* @param WebsocketRateLimit|null $rateLimit Use null to disable client rate limits.
*/
public function __construct(
private readonly ?WebsocketCompressionContextFactory $compressionContextFactory = null,
private readonly ?WebsocketHeartbeatQueue $heartbeatQueue = new PeriodicHeartbeatQueue(),
private readonly ?WebsocketRateLimit $rateLimit = new ConstantRateLimit(),
private readonly WebsocketParserFactory $parserFactory = new Rfc6455ParserFactory(),
Expand All @@ -42,6 +40,7 @@ public function createClient(
Request $request,
Response $response,
Socket $socket,
?WebsocketCompressionContext $compressionContext,
): WebsocketClient {
if ($socket instanceof ResourceStream) {
$socketResource = $socket->getResource();
Expand All @@ -64,19 +63,6 @@ public function createClient(
}
}

$compressionContext = null;
if ($this->compressionContextFactory) {
$extensions = \array_map('trim', \explode(',', (string) $request->getHeader('sec-websocket-extensions')));

foreach ($extensions as $extension) {
if ($compressionContext = $this->compressionContextFactory->fromClientHeader($extension, $headerLine)) {
/** @psalm-suppress PossiblyNullArgument */
$response->setHeader('sec-websocket-extensions', $headerLine);
break;
}
}
}

return new Rfc6455Client(
socket: $socket,
masked: false,
Expand Down
33 changes: 33 additions & 0 deletions src/Rfc6455CompressionNegotiator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Amp\Websocket\Server;

use Amp\Http;
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Websocket\Compression\WebsocketCompressionContext;
use Amp\Websocket\Compression\WebsocketCompressionContextFactory;

class Rfc6455CompressionNegotiator implements WebsocketCompressionNegotiator
{
public function __construct(private readonly WebsocketCompressionContextFactory $compressionContextFactory)
{
}

public function negotiateCompression(Request $request, Response $response): ?WebsocketCompressionContext
{
if ($this->compressionContextFactory) {
$extensions = Http\splitHeader($request, 'sec-websocket-extensions') ?? [];
foreach ($extensions as $extension) {
if ($compressionContext = $this->compressionContextFactory->fromClientHeader($extension, $headerLine)) {
/** @psalm-suppress PossiblyNullArgument */
$response->setHeader('sec-websocket-extensions', $headerLine);

return $compressionContext;
}
}
}

return null;
}
}
8 changes: 7 additions & 1 deletion src/Websocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler;
use Amp\Http\Server\Response;
use Amp\Websocket\Compression\WebsocketCompressionContext;
use Amp\Websocket\WebsocketClient;
use Amp\Websocket\WebsocketCloseCode;
use Amp\Websocket\WebsocketClosedException;
Expand All @@ -33,6 +34,7 @@ public function __construct(
private readonly WebsocketAcceptor $acceptor,
private readonly WebsocketClientHandler $clientHandler,
private readonly WebsocketClientFactory $clientFactory = new Rfc6455ClientFactory(),
private readonly ?WebsocketCompressionNegotiator $compressionNegotiator = null,
) {
/** @psalm-suppress PropertyTypeCoercion */
$this->clients = new \WeakMap();
Expand All @@ -51,10 +53,13 @@ public function handleRequest(Request $request): Response
return $response;
}

$compressionContext = $this->compressionNegotiator->negotiateCompression($request, $response);

$response->upgrade(fn (UpgradedSocket $socket) => $this->reapClient(
socket: $socket,
request: $request,
response: $response,
compressionContext: $compressionContext,
));

return $response;
Expand All @@ -64,8 +69,9 @@ private function reapClient(
UpgradedSocket $socket,
Request $request,
Response $response,
?WebsocketCompressionContext $compressionContext,
): void {
$client = $this->clientFactory->createClient($request, $response, $socket);
$client = $this->clientFactory->createClient($request, $response, $socket, $compressionContext);

/** @psalm-suppress RedundantCondition */
\assert($this->logger->debug(\sprintf(
Expand Down
10 changes: 8 additions & 2 deletions src/WebsocketClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Socket\Socket;
use Amp\Websocket\Compression\WebsocketCompressionContext;
use Amp\Websocket\WebsocketClient;

interface WebsocketClientFactory
{
/**
* Creates a Client object based on the given Request.
* Creates a {@see WebsocketClient} object based on the given Request.
*/
public function createClient(Request $request, Response $response, Socket $socket): WebsocketClient;
public function createClient(
Request $request,
Response $response,
Socket $socket,
?WebsocketCompressionContext $compressionContext,
): WebsocketClient;
}
12 changes: 12 additions & 0 deletions src/WebsocketCompressionNegotiator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Amp\Websocket\Server;

use Amp\Http\Server\Request;
use Amp\Http\Server\Response;
use Amp\Websocket\Compression\WebsocketCompressionContext;

interface WebsocketCompressionNegotiator
{
public function negotiateCompression(Request $request, Response $response): ?WebsocketCompressionContext;
}

0 comments on commit c035faf

Please sign in to comment.