Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joedixon committed Nov 15, 2023
1 parent b4a3b60 commit 9eec38d
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 43 deletions.
63 changes: 63 additions & 0 deletions src/Conn.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace Laravel\Reverb;

use BadMethodCallException;
use React\Socket\ConnectionInterface;

class Conn
{
protected $id;

protected $initilized = false;

protected $buffer = '';

public function __construct(protected ConnectionInterface $connection)
{
$this->id = (int) $connection->stream;
}

public function id()
{
return $this->id;
}

public function initialize()
{
$this->initilized = true;
}

public function isInitialized()
{
return $this->initilized;
}

public function hasBuffer()
{
return $this->buffer !== '';
}

public function send($data)
{
$this->connection->write($data);

return $this;
}

public function close()
{
$this->connection->end();

return $this;
}

public function __call($method, $parameters)
{
if (! method_exists($this->connection, $method)) {
throw new BadMethodCallException("Method [{$method}] does not exist on [".get_class($this->connection).'].');
}

return $this->connection->{$method}(...$parameters);
}
}
110 changes: 110 additions & 0 deletions src/HttpServer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?php

namespace Laravel\Reverb;

use GuzzleHttp\Psr7\Message;
use GuzzleHttp\Psr7\Response;
use Laravel\Reverb\Contracts\ApplicationProvider;
use Laravel\Reverb\Contracts\ConnectionManager;
use Laravel\Reverb\Servers\Reverb\Connection;
use Laravel\Reverb\WebSockets\Request as WebSocketRequest;
use Laravel\Reverb\WebSockets\WsConnection;
use Psr\Http\Message\RequestInterface;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ServerInterface;
use Symfony\Component\HttpFoundation\JsonResponse;

class HttpServer
{
protected $loop;

public function __construct(protected ServerInterface $socket, LoopInterface $loop = null)
{
$this->loop = $this->loop ?: Loop::get();

$socket->on('connection', $this);
}

public function __invoke(ConnectionInterface $connection)
{
$connection = new Conn($connection);

$connection->on('data', function ($data) use ($connection) {
$this->handleRequest($data, $connection);
});
// $conn->on('close', function () use ($conn) {
// $this->handleEnd($conn);
// });
// $conn->on('error', function (\Exception $e) use ($conn) {
// $this->handleError($e, $conn);
// });
}

public function start()
{
$this->loop->run();
}

protected function handleRequest(string $data, Conn $connection)
{
if(! $connection->isInitialized()) {
$request = Request::from($data);
$connection->initialize();

if($request->getUri()->getPath() === '/app/yysmuc8zbg4vo2hxgk9w') {
$negotiator = new ServerNegotiator(new RequestVerifier);
$response = $negotiator->handshake($request);

$connection = new WsConnection($connection);

$connection->send(Message::toString($response));

$server = app(Server::class);
$reverbConnection = $this->connection($request, $connection);
$server->open($reverbConnection);

$connection->on('message', fn (string $message) => $server->message($reverbConnection, $message));

return;
}

$payload = json_decode($request->getBody()->getContents(), true);

Event::dispatch($this->application($request), [
'event' => $payload['name'],
'channel' => $payload['channel'],
'data' => $payload['data'],
]);

return tap($connection)->send(new JsonResponse((object) []))->close();
}
}

protected function connection(RequestInterface $request, WsConnection $connection): Connection
{
return app(ConnectionManager::class)
->for($application = $this->application($request))
->resolve(
$connection->id(),
fn () => new Connection(
$connection,
$application,
$request->getHeader('Origin')[0] ?? null
)
);
}

/**
* Get the application instance for the request.
*/
protected function application(RequestInterface $request): Application
{
// parse_str($request->getUri()->getQuery(), $queryString);

return app(ApplicationProvider::class)->findById('123456');
}
}
14 changes: 14 additions & 0 deletions src/Request.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Laravel\Reverb;

use GuzzleHttp\Psr7\Message;
use Psr\Http\Message\RequestInterface;

class Request
{
public static function from(string $message): RequestInterface
{
return Message::parseRequest($message);
}
}
1 change: 1 addition & 0 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public function __construct(
*/
public function open(Connection $connection): void
{
dump('Opening connection...');
try {
$this->verifyOrigin($connection);

Expand Down
2 changes: 1 addition & 1 deletion src/Servers/Reverb/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function __construct(protected WsConnection $connection, Application $app
*/
public function identifier(): string
{
return (string) $this->connection->resourceId;
return (string) $this->connection->id();
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/Servers/Reverb/Console/Commands/StartServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ public function handle(): void
$this->subscribeToRedis($loop);
$this->scheduleCleanup($loop);

$server = ServerFactory::make($host, $port, $loop);

$this->components->info("Starting server on {$host}:{$port}");

ServerFactory::make($host, $port, $loop);
$server->start();
}

/**
Expand Down
48 changes: 23 additions & 25 deletions src/Servers/Reverb/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Laravel\Reverb\Event;
use Laravel\Reverb\Http\Controllers\EventController;
use Laravel\Reverb\Http\Controllers\StatsController;
use Laravel\Reverb\HttpServer as ReverbHttpServer;
use Laravel\Reverb\Server;
use Laravel\Reverb\WebSockets\WebSocketMiddleware;
use Psr\Http\Message\ServerRequestInterface;
Expand All @@ -17,6 +18,7 @@
use React\Http\Message\Response;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Socket\SocketServer;
use Symfony\Component\Routing\Route;
use Symfony\Component\Routing\RouteCollection;
Expand All @@ -27,39 +29,35 @@ class Factory
* Create a new WebSocket server instance.
*/
public static function make(string $host = '0.0.0.0', string $port = '8080', LoopInterface $loop = null)
{
gc_enable();
set_time_limit(0);
ob_implicit_flush();

{
$loop = $loop ?: Loop::get();

$socket = new SocketServer("{$host}:{$port}", [], $loop);

$server = new HttpServer(
$loop,
new LimitConcurrentRequestsMiddleware(10000),
new RequestBodyBufferMiddleware(2 * 1024 * 1024),
new WebSocketMiddleware(App::make(Server::class)),
function (ServerRequestInterface $request) {
$payload = json_decode($request->getBody()->getContents(), true);
$appId = Str::beforeLast($request->getUri()->getPath(), '/');
$appId = Str::afterLast($appId, '/');
return new ReverbHttpServer($socket, $loop);

$app = app(ApplicationProvider::class)->findById($appId);
// dump('Starting server...');
// $server = new HttpServer(
// $loop,
// new LimitConcurrentRequestsMiddleware(10000),
// new StreamingRequestMiddleware(),
// new WebSocketMiddleware(App::make(Server::class)),
// function (ServerRequestInterface $request) {
// $payload = json_decode($request->getBody()->getContents(), true);
// $appId = Str::beforeLast($request->getUri()->getPath(), '/');
// $appId = Str::afterLast($appId, '/');

Event::dispatch($app, [
'event' => $payload['name'],
'channel' => $payload['channel'],
'data' => $payload['data'],
]);
// $app = app(ApplicationProvider::class)->findById($appId);

return Response::json(['status' => 'success']);
}
);
// Event::dispatch($app, [
// 'event' => $payload['name'],
// 'channel' => $payload['channel'],
// 'data' => $payload['data'],
// ]);

$server->listen($socket);
$loop->run();
// return Response::json(['status' => 'success']);
// }
// );
}

/**
Expand Down
16 changes: 11 additions & 5 deletions src/WebSockets/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Laravel\Reverb\WebSockets;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ServerRequestInterface;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
Expand All @@ -21,13 +22,15 @@ class Request

protected $response;

public function __construct(protected ServerRequestInterface $request)
protected $startMemory;

public function __construct(protected RequestInterface $request)
{
$this->startMemory = memory_get_usage();
$negotiator = new ServerNegotiator(new RequestVerifier);
dump(memory_get_usage() - $this->startMemory);
$this->response = $negotiator->handshake($this->request);
$this->input = new ThroughStream;
$this->output = new ThroughStream;
$this->stream = new CompositeStream($this->input, $this->output);
dump(memory_get_usage() - $this->startMemory);
}

/**
Expand Down Expand Up @@ -55,6 +58,9 @@ public function respond(): Response
*/
public function connect(): WsConnection
{
return new WsConnection($this->stream);
$connection = new WsConnection($this->stream);
dump(memory_get_usage() - $this->startMemory);

return $connection;
}
}
22 changes: 11 additions & 11 deletions src/WebSockets/WsConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,26 @@
namespace Laravel\Reverb\WebSockets;

use Evenement\EventEmitter;
use Illuminate\Support\Str;
use Laravel\Reverb\Conn;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use React\Stream\DuplexStreamInterface;

class WsConnection extends EventEmitter
{
public string $resourceId;

protected $buffer;

public function __construct(public DuplexStreamInterface $stream)
public function __construct(public Conn $connection)
{
$this->resourceId = Str::random();

$this->buffer = new MessageBuffer(
new CloseFrameChecker,
onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]),
onControl: fn () => $this->close(),
sender: [$stream, 'write']
sender: [$connection, 'write']
);

$stream->on('data', [$this->buffer, 'onData']);
$stream->on('close', fn () => $this->emit('close'));
$connection->on('data', [$this->buffer, 'onData']);
$connection->on('close', fn () => $this->emit('close'));
}

/**
Expand All @@ -43,6 +38,11 @@ public function send(string $message): void
*/
public function close(): void
{
$this->stream->close();
$this->connection->close();
}

public function id()
{
return $this->connection->id();
}
}

0 comments on commit 9eec38d

Please sign in to comment.