diff --git a/src/Conn.php b/src/Conn.php new file mode 100644 index 00000000..65b25f13 --- /dev/null +++ b/src/Conn.php @@ -0,0 +1,63 @@ +id = (int) $connection->stream; + } + + public function id() + { + return $this->id; + } + + public function initialize() + { + $this->initilized = true; + } + + public function isInitialized() + { + return $this->initilized; + } + + public function hasBuffer() + { + return $this->buffer !== ''; + } + + public function send($data) + { + $this->connection->write($data); + + return $this; + } + + public function close() + { + $this->connection->end(); + + return $this; + } + + public function __call($method, $parameters) + { + if (! method_exists($this->connection, $method)) { + throw new BadMethodCallException("Method [{$method}] does not exist on [".get_class($this->connection).'].'); + } + + return $this->connection->{$method}(...$parameters); + } +} \ No newline at end of file diff --git a/src/HttpServer.php b/src/HttpServer.php new file mode 100644 index 00000000..7eae09ba --- /dev/null +++ b/src/HttpServer.php @@ -0,0 +1,110 @@ +loop = $this->loop ?: Loop::get(); + + $socket->on('connection', $this); + } + + public function __invoke(ConnectionInterface $connection) + { + $connection = new Conn($connection); + + $connection->on('data', function ($data) use ($connection) { + $this->handleRequest($data, $connection); + }); + // $conn->on('close', function () use ($conn) { + // $this->handleEnd($conn); + // }); + // $conn->on('error', function (\Exception $e) use ($conn) { + // $this->handleError($e, $conn); + // }); + } + + public function start() + { + $this->loop->run(); + } + + protected function handleRequest(string $data, Conn $connection) + { + if(! $connection->isInitialized()) { + $request = Request::from($data); + $connection->initialize(); + + if($request->getUri()->getPath() === '/app/yysmuc8zbg4vo2hxgk9w') { + $negotiator = new ServerNegotiator(new RequestVerifier); + $response = $negotiator->handshake($request); + + $connection = new WsConnection($connection); + + $connection->send(Message::toString($response)); + + $server = app(Server::class); + $reverbConnection = $this->connection($request, $connection); + $server->open($reverbConnection); + + $connection->on('message', fn (string $message) => $server->message($reverbConnection, $message)); + + return; + } + + $payload = json_decode($request->getBody()->getContents(), true); + + Event::dispatch($this->application($request), [ + 'event' => $payload['name'], + 'channel' => $payload['channel'], + 'data' => $payload['data'], + ]); + + return tap($connection)->send(new JsonResponse((object) []))->close(); + } + } + + protected function connection(RequestInterface $request, WsConnection $connection): Connection + { + return app(ConnectionManager::class) + ->for($application = $this->application($request)) + ->resolve( + $connection->id(), + fn () => new Connection( + $connection, + $application, + $request->getHeader('Origin')[0] ?? null + ) + ); + } + + /** + * Get the application instance for the request. + */ + protected function application(RequestInterface $request): Application + { + // parse_str($request->getUri()->getQuery(), $queryString); + + return app(ApplicationProvider::class)->findById('123456'); + } +} diff --git a/src/Request.php b/src/Request.php new file mode 100644 index 00000000..981f8aab --- /dev/null +++ b/src/Request.php @@ -0,0 +1,14 @@ +verifyOrigin($connection); diff --git a/src/Servers/Reverb/Connection.php b/src/Servers/Reverb/Connection.php index 7592cc65..bf73804c 100644 --- a/src/Servers/Reverb/Connection.php +++ b/src/Servers/Reverb/Connection.php @@ -30,7 +30,7 @@ public function __construct(protected WsConnection $connection, Application $app */ public function identifier(): string { - return (string) $this->connection->resourceId; + return (string) $this->connection->id(); } /** diff --git a/src/Servers/Reverb/Console/Commands/StartServer.php b/src/Servers/Reverb/Console/Commands/StartServer.php index a3b3e26d..0029afe6 100644 --- a/src/Servers/Reverb/Console/Commands/StartServer.php +++ b/src/Servers/Reverb/Console/Commands/StartServer.php @@ -48,9 +48,11 @@ public function handle(): void $this->subscribeToRedis($loop); $this->scheduleCleanup($loop); + $server = ServerFactory::make($host, $port, $loop); + $this->components->info("Starting server on {$host}:{$port}"); - ServerFactory::make($host, $port, $loop); + $server->start(); } /** diff --git a/src/Servers/Reverb/Factory.php b/src/Servers/Reverb/Factory.php index f88bafa2..5b465686 100644 --- a/src/Servers/Reverb/Factory.php +++ b/src/Servers/Reverb/Factory.php @@ -8,6 +8,7 @@ use Laravel\Reverb\Event; use Laravel\Reverb\Http\Controllers\EventController; use Laravel\Reverb\Http\Controllers\StatsController; +use Laravel\Reverb\HttpServer as ReverbHttpServer; use Laravel\Reverb\Server; use Laravel\Reverb\WebSockets\WebSocketMiddleware; use Psr\Http\Message\ServerRequestInterface; @@ -17,6 +18,7 @@ use React\Http\Message\Response; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; use React\Http\Middleware\RequestBodyBufferMiddleware; +use React\Http\Middleware\StreamingRequestMiddleware; use React\Socket\SocketServer; use Symfony\Component\Routing\Route; use Symfony\Component\Routing\RouteCollection; @@ -27,39 +29,35 @@ class Factory * Create a new WebSocket server instance. */ public static function make(string $host = '0.0.0.0', string $port = '8080', LoopInterface $loop = null) - { - gc_enable(); - set_time_limit(0); - ob_implicit_flush(); - + { $loop = $loop ?: Loop::get(); $socket = new SocketServer("{$host}:{$port}", [], $loop); - $server = new HttpServer( - $loop, - new LimitConcurrentRequestsMiddleware(10000), - new RequestBodyBufferMiddleware(2 * 1024 * 1024), - new WebSocketMiddleware(App::make(Server::class)), - function (ServerRequestInterface $request) { - $payload = json_decode($request->getBody()->getContents(), true); - $appId = Str::beforeLast($request->getUri()->getPath(), '/'); - $appId = Str::afterLast($appId, '/'); + return new ReverbHttpServer($socket, $loop); - $app = app(ApplicationProvider::class)->findById($appId); + // dump('Starting server...'); + // $server = new HttpServer( + // $loop, + // new LimitConcurrentRequestsMiddleware(10000), + // new StreamingRequestMiddleware(), + // new WebSocketMiddleware(App::make(Server::class)), + // function (ServerRequestInterface $request) { + // $payload = json_decode($request->getBody()->getContents(), true); + // $appId = Str::beforeLast($request->getUri()->getPath(), '/'); + // $appId = Str::afterLast($appId, '/'); - Event::dispatch($app, [ - 'event' => $payload['name'], - 'channel' => $payload['channel'], - 'data' => $payload['data'], - ]); + // $app = app(ApplicationProvider::class)->findById($appId); - return Response::json(['status' => 'success']); - } - ); + // Event::dispatch($app, [ + // 'event' => $payload['name'], + // 'channel' => $payload['channel'], + // 'data' => $payload['data'], + // ]); - $server->listen($socket); - $loop->run(); + // return Response::json(['status' => 'success']); + // } + // ); } /** diff --git a/src/WebSockets/Request.php b/src/WebSockets/Request.php index bb29d563..d0d8ba3c 100644 --- a/src/WebSockets/Request.php +++ b/src/WebSockets/Request.php @@ -2,6 +2,7 @@ namespace Laravel\Reverb\WebSockets; +use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ServerRequestInterface; use Ratchet\RFC6455\Handshake\RequestVerifier; use Ratchet\RFC6455\Handshake\ServerNegotiator; @@ -21,13 +22,15 @@ class Request protected $response; - public function __construct(protected ServerRequestInterface $request) + protected $startMemory; + + public function __construct(protected RequestInterface $request) { + $this->startMemory = memory_get_usage(); $negotiator = new ServerNegotiator(new RequestVerifier); + dump(memory_get_usage() - $this->startMemory); $this->response = $negotiator->handshake($this->request); - $this->input = new ThroughStream; - $this->output = new ThroughStream; - $this->stream = new CompositeStream($this->input, $this->output); + dump(memory_get_usage() - $this->startMemory); } /** @@ -55,6 +58,9 @@ public function respond(): Response */ public function connect(): WsConnection { - return new WsConnection($this->stream); + $connection = new WsConnection($this->stream); + dump(memory_get_usage() - $this->startMemory); + + return $connection; } } diff --git a/src/WebSockets/WsConnection.php b/src/WebSockets/WsConnection.php index b74f4b55..c8bf066d 100644 --- a/src/WebSockets/WsConnection.php +++ b/src/WebSockets/WsConnection.php @@ -3,31 +3,26 @@ namespace Laravel\Reverb\WebSockets; use Evenement\EventEmitter; -use Illuminate\Support\Str; +use Laravel\Reverb\Conn; use Ratchet\RFC6455\Messaging\CloseFrameChecker; use Ratchet\RFC6455\Messaging\Message; use Ratchet\RFC6455\Messaging\MessageBuffer; -use React\Stream\DuplexStreamInterface; class WsConnection extends EventEmitter { - public string $resourceId; - protected $buffer; - public function __construct(public DuplexStreamInterface $stream) + public function __construct(public Conn $connection) { - $this->resourceId = Str::random(); - $this->buffer = new MessageBuffer( new CloseFrameChecker, onMessage: fn (Message $message) => $this->emit('message', [$message->getPayload()]), onControl: fn () => $this->close(), - sender: [$stream, 'write'] + sender: [$connection, 'write'] ); - $stream->on('data', [$this->buffer, 'onData']); - $stream->on('close', fn () => $this->emit('close')); + $connection->on('data', [$this->buffer, 'onData']); + $connection->on('close', fn () => $this->emit('close')); } /** @@ -43,6 +38,11 @@ public function send(string $message): void */ public function close(): void { - $this->stream->close(); + $this->connection->close(); + } + + public function id() + { + return $this->connection->id(); } }