diff --git a/src/Client.php b/src/Client.php index abaa986..28bcd77 100644 --- a/src/Client.php +++ b/src/Client.php @@ -10,12 +10,14 @@ use Calcinai\Bolt\Protocol\ProtocolInterface; use Calcinai\Bolt\Protocol\RFC6455; use Evenement\EventEmitter; +use Ratchet\RFC6455\Messaging\Frame; use React\Dns\Resolver\Resolver; use React\EventLoop\LoopInterface; use React\Socket\ConnectionInterface; use React\Socket\Connector; -class Client extends EventEmitter { +class Client extends EventEmitter +{ /** * @var LoopInterface @@ -55,23 +57,24 @@ class Client extends EventEmitter { const PORT_DEFAULT_HTTP = 80; const PORT_DEFAULT_HTTPS = 443; - const STATE_CONNECTING = 'connecting'; - const STATE_CONNECTED = 'connected'; - const STATE_CLOSING = 'closing'; - const STATE_CLOSED = 'closed'; + const STATE_CONNECTING = 'connecting'; + const STATE_CONNECTED = 'connected'; + const STATE_CLOSING = 'closing'; + const STATE_CLOSED = 'closed'; - public function __construct($uri, LoopInterface $loop, Resolver $resolver = null, $protocol = null){ + public function __construct($uri, LoopInterface $loop, Resolver $resolver = null, $protocol = null) + { - if(false === filter_var($uri, FILTER_VALIDATE_URL)){ + if (false === filter_var($uri, FILTER_VALIDATE_URL)) { throw new \InvalidArgumentException(sprintf('Invalid URI [%s]. Must be in format ws(s)://host:port/path', $uri)); } - if($protocol !== null) { - if(!in_array(ProtocolInterface::class, class_implements($protocol))){ + if ($protocol !== null) { + if (!in_array(ProtocolInterface::class, class_implements($protocol))) { throw new \InvalidArgumentException(sprintf('%s must implement %s', $protocol, ProtocolInterface::class)); } $this->protocol = $protocol; - } else{ + } else { $this->protocol = RFC6455::class; } @@ -82,13 +85,14 @@ public function __construct($uri, LoopInterface $loop, Resolver $resolver = null $this->heartbeat_interval = null; } - public function connect() { + public function connect() + { $connector = new Connector($this->loop, ['dns' => $this->resolver, 'timeout' => 5]); - $uri = (object) parse_url($this->uri); + $uri = (object)parse_url($this->uri); - switch($uri->scheme){ + switch ($uri->scheme) { case 'ws': $scheme = 'tcp'; $port = isset($uri->port) ? $uri->port : self::PORT_DEFAULT_HTTP; @@ -105,17 +109,18 @@ public function connect() { $this->setState(self::STATE_CONNECTING); - return $connector->connect($scheme . '://' . $uri->host . ':' . $port)->then(function(ConnectionInterface $stream) use($that) { + return $connector->connect($scheme . '://' . $uri->host . ':' . $port)->then(function (ConnectionInterface $stream) use ($that) { $that->transport = new $that->protocol($that, $stream); $that->transport->upgrade(); }); } - public function setState($state){ + public function setState($state) + { $this->state = $state; - switch($state){ + switch ($state) { case self::STATE_CONNECTING: $this->emit('connecting'); break; @@ -135,28 +140,34 @@ public function setState($state){ return $this; } - public function getState(){ + public function getState() + { return $this->state; } - public function getURI(){ + public function getURI() + { return $this->uri; } - public function getLoop(){ + public function getLoop() + { return $this->loop; } - public function send($string) { - $this->transport->send($string); + public function send($string, $type = Frame::OP_TEXT) + { + $this->transport->send($string, $type); } - public function setHeartbeatInterval($interval) { + public function setHeartbeatInterval($interval) + { $this->heartbeat_interval = $interval; } - public function getHeartbeatInterval() { + public function getHeartbeatInterval() + { return $this->heartbeat_interval; } -} \ No newline at end of file +} diff --git a/src/Protocol/ProtocolInterface.php b/src/Protocol/ProtocolInterface.php index f94a85d..7a58111 100644 --- a/src/Protocol/ProtocolInterface.php +++ b/src/Protocol/ProtocolInterface.php @@ -7,9 +7,15 @@ namespace Calcinai\Bolt\Protocol; -interface ProtocolInterface { +use Ratchet\RFC6455\Messaging\Frame; + +interface ProtocolInterface +{ public function onStreamData(&$buffer); + public function upgrade(); - public function send($string); + + public function send($string, $type = Frame::OP_TEXT); + public static function getVersion(); -} \ No newline at end of file +} diff --git a/src/Protocol/RFC6455.php b/src/Protocol/RFC6455.php index e49a7a3..67e1b54 100644 --- a/src/Protocol/RFC6455.php +++ b/src/Protocol/RFC6455.php @@ -8,16 +8,16 @@ use Calcinai\Bolt\Client; use GuzzleHttp\Psr7\Uri; +use Ratchet\RFC6455\Handshake\ClientNegotiator; +use Ratchet\RFC6455\Messaging\CloseFrameChecker; use Ratchet\RFC6455\Messaging\Frame; use Ratchet\RFC6455\Messaging\FrameInterface; use Ratchet\RFC6455\Messaging\MessageBuffer; use Ratchet\RFC6455\Messaging\MessageInterface; -use Ratchet\RFC6455\Handshake\ClientNegotiator; -use Ratchet\RFC6455\Messaging\CloseFrameChecker; - use function GuzzleHttp\Psr7\parse_response; -class RFC6455 extends AbstractProtocol { +class RFC6455 extends AbstractProtocol +{ /** @var ClientNegotiator */ private $negotiator; @@ -28,7 +28,8 @@ class RFC6455 extends AbstractProtocol { /** @var MessageBuffer */ private $message_buffer; - public function upgrade() { + public function upgrade() + { $this->negotiator = new ClientNegotiator(); @@ -38,17 +39,19 @@ public function upgrade() { // If your WebSocket server uses Basic Auth this needs to be added manually as a header $uri = parse_url($this->client->getURI()); - if(isset($uri['user']) || isset($uri['pass'])){ - $this->connection_request = $this->connection_request->withAddedHeader('Authorization', 'Basic ' . base64_encode($uri['user'] . ':' . $uri['pass'])); + if (isset($uri['user']) || isset($uri['pass'])) { + $this->connection_request = $this->connection_request->withAddedHeader('Authorization', + 'Basic ' . base64_encode($uri['user'] . ':' . $uri['pass'])); } $this->stream->write(\GuzzleHttp\Psr7\str($this->connection_request)); } - public function onStreamData(&$buffer) { + public function onStreamData(&$buffer) + { - if($this->client->getState() !== Client::STATE_CONNECTED){ + if ($this->client->getState() !== Client::STATE_CONNECTED) { $response = parse_response($buffer); @@ -67,10 +70,10 @@ public function onStreamData(&$buffer) { $this->message_buffer = new MessageBuffer( new CloseFrameChecker(), - function (MessageInterface $msg) use ($that){ + function (MessageInterface $msg) use ($that) { $that->client->emit('message', [$msg->getPayload()]); }, - function(FrameInterface $frame) use ($that){ + function (FrameInterface $frame) use ($that) { $that->processControlFrame($frame); }, false @@ -88,9 +91,10 @@ function(FrameInterface $frame) use ($that){ } - public function processControlFrame(FrameInterface $frame) { + public function processControlFrame(FrameInterface $frame) + { - switch($frame->getOpcode()){ + switch ($frame->getOpcode()) { case Frame::OP_PING: $f = new Frame($frame->getPayload(), true, Frame::OP_PONG); @@ -107,19 +111,22 @@ public function processControlFrame(FrameInterface $frame) { } - public function send($string, $type = Frame::OP_TEXT) { + public function send($string, $type = Frame::OP_TEXT) + { $frame = new Frame($string, true, $type); $this->stream->write($frame->maskPayload()->getContents()); } - public function sendHeartbeat(){ + public function sendHeartbeat() + { $frame = new Frame(uniqid(), true, Frame::OP_PING); $this->stream->write($frame->maskPayload()->getContents()); } - public static function getVersion() { + public static function getVersion() + { return 10; } -} \ No newline at end of file +}