Skip to content

Commit

Permalink
Add binary support
Browse files Browse the repository at this point in the history
  • Loading branch information
calcinai committed May 31, 2019
1 parent 5eef2e8 commit 02746f5
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 44 deletions.
59 changes: 35 additions & 24 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
use Calcinai\Bolt\Protocol\ProtocolInterface;
use Calcinai\Bolt\Protocol\RFC6455;
use Evenement\EventEmitter;
use Ratchet\RFC6455\Messaging\Frame;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;

class Client extends EventEmitter {
class Client extends EventEmitter
{

/**
* @var LoopInterface
Expand Down Expand Up @@ -55,23 +57,24 @@ class Client extends EventEmitter {
const PORT_DEFAULT_HTTP = 80;
const PORT_DEFAULT_HTTPS = 443;

const STATE_CONNECTING = 'connecting';
const STATE_CONNECTED = 'connected';
const STATE_CLOSING = 'closing';
const STATE_CLOSED = 'closed';
const STATE_CONNECTING = 'connecting';
const STATE_CONNECTED = 'connected';
const STATE_CLOSING = 'closing';
const STATE_CLOSED = 'closed';

public function __construct($uri, LoopInterface $loop, Resolver $resolver = null, $protocol = null){
public function __construct($uri, LoopInterface $loop, Resolver $resolver = null, $protocol = null)
{

if(false === filter_var($uri, FILTER_VALIDATE_URL)){
if (false === filter_var($uri, FILTER_VALIDATE_URL)) {
throw new \InvalidArgumentException(sprintf('Invalid URI [%s]. Must be in format ws(s)://host:port/path', $uri));
}

if($protocol !== null) {
if(!in_array(ProtocolInterface::class, class_implements($protocol))){
if ($protocol !== null) {
if (!in_array(ProtocolInterface::class, class_implements($protocol))) {
throw new \InvalidArgumentException(sprintf('%s must implement %s', $protocol, ProtocolInterface::class));
}
$this->protocol = $protocol;
} else{
} else {
$this->protocol = RFC6455::class;
}

Expand All @@ -82,13 +85,14 @@ public function __construct($uri, LoopInterface $loop, Resolver $resolver = null
$this->heartbeat_interval = null;
}

public function connect() {
public function connect()
{

$connector = new Connector($this->loop, ['dns' => $this->resolver, 'timeout' => 5]);

$uri = (object) parse_url($this->uri);
$uri = (object)parse_url($this->uri);

switch($uri->scheme){
switch ($uri->scheme) {
case 'ws':
$scheme = 'tcp';
$port = isset($uri->port) ? $uri->port : self::PORT_DEFAULT_HTTP;
Expand All @@ -105,17 +109,18 @@ public function connect() {

$this->setState(self::STATE_CONNECTING);

return $connector->connect($scheme . '://' . $uri->host . ':' . $port)->then(function(ConnectionInterface $stream) use($that) {
return $connector->connect($scheme . '://' . $uri->host . ':' . $port)->then(function (ConnectionInterface $stream) use ($that) {
$that->transport = new $that->protocol($that, $stream);
$that->transport->upgrade();
});

}

public function setState($state){
public function setState($state)
{
$this->state = $state;

switch($state){
switch ($state) {
case self::STATE_CONNECTING:
$this->emit('connecting');
break;
Expand All @@ -135,28 +140,34 @@ public function setState($state){
return $this;
}

public function getState(){
public function getState()
{
return $this->state;
}

public function getURI(){
public function getURI()
{
return $this->uri;
}

public function getLoop(){
public function getLoop()
{
return $this->loop;
}

public function send($string) {
$this->transport->send($string);
public function send($string, $type = Frame::OP_TEXT)
{
$this->transport->send($string, $type);
}

public function setHeartbeatInterval($interval) {
public function setHeartbeatInterval($interval)
{
$this->heartbeat_interval = $interval;
}

public function getHeartbeatInterval() {
public function getHeartbeatInterval()
{
return $this->heartbeat_interval;
}

}
}
12 changes: 9 additions & 3 deletions src/Protocol/ProtocolInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@

namespace Calcinai\Bolt\Protocol;

interface ProtocolInterface {
use Ratchet\RFC6455\Messaging\Frame;

interface ProtocolInterface
{
public function onStreamData(&$buffer);

public function upgrade();
public function send($string);

public function send($string, $type = Frame::OP_TEXT);

public static function getVersion();
}
}
41 changes: 24 additions & 17 deletions src/Protocol/RFC6455.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@

use Calcinai\Bolt\Client;
use GuzzleHttp\Psr7\Uri;
use Ratchet\RFC6455\Handshake\ClientNegotiator;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Frame;
use Ratchet\RFC6455\Messaging\FrameInterface;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\RFC6455\Handshake\ClientNegotiator;
use Ratchet\RFC6455\Messaging\CloseFrameChecker;

use function GuzzleHttp\Psr7\parse_response;

class RFC6455 extends AbstractProtocol {
class RFC6455 extends AbstractProtocol
{

/** @var ClientNegotiator */
private $negotiator;
Expand All @@ -28,7 +28,8 @@ class RFC6455 extends AbstractProtocol {
/** @var MessageBuffer */
private $message_buffer;

public function upgrade() {
public function upgrade()
{

$this->negotiator = new ClientNegotiator();

Expand All @@ -38,17 +39,19 @@ public function upgrade() {
// If your WebSocket server uses Basic Auth this needs to be added manually as a header
$uri = parse_url($this->client->getURI());

if(isset($uri['user']) || isset($uri['pass'])){
$this->connection_request = $this->connection_request->withAddedHeader('Authorization', 'Basic ' . base64_encode($uri['user'] . ':' . $uri['pass']));
if (isset($uri['user']) || isset($uri['pass'])) {
$this->connection_request = $this->connection_request->withAddedHeader('Authorization',
'Basic ' . base64_encode($uri['user'] . ':' . $uri['pass']));
}

$this->stream->write(\GuzzleHttp\Psr7\str($this->connection_request));

}

public function onStreamData(&$buffer) {
public function onStreamData(&$buffer)
{

if($this->client->getState() !== Client::STATE_CONNECTED){
if ($this->client->getState() !== Client::STATE_CONNECTED) {

$response = parse_response($buffer);

Expand All @@ -67,10 +70,10 @@ public function onStreamData(&$buffer) {

$this->message_buffer = new MessageBuffer(
new CloseFrameChecker(),
function (MessageInterface $msg) use ($that){
function (MessageInterface $msg) use ($that) {
$that->client->emit('message', [$msg->getPayload()]);
},
function(FrameInterface $frame) use ($that){
function (FrameInterface $frame) use ($that) {
$that->processControlFrame($frame);
},
false
Expand All @@ -88,9 +91,10 @@ function(FrameInterface $frame) use ($that){

}

public function processControlFrame(FrameInterface $frame) {
public function processControlFrame(FrameInterface $frame)
{

switch($frame->getOpcode()){
switch ($frame->getOpcode()) {

case Frame::OP_PING:
$f = new Frame($frame->getPayload(), true, Frame::OP_PONG);
Expand All @@ -107,19 +111,22 @@ public function processControlFrame(FrameInterface $frame) {

}

public function send($string, $type = Frame::OP_TEXT) {
public function send($string, $type = Frame::OP_TEXT)
{
$frame = new Frame($string, true, $type);
$this->stream->write($frame->maskPayload()->getContents());
}

public function sendHeartbeat(){
public function sendHeartbeat()
{
$frame = new Frame(uniqid(), true, Frame::OP_PING);
$this->stream->write($frame->maskPayload()->getContents());

}

public static function getVersion() {
public static function getVersion()
{
return 10;
}

}
}

0 comments on commit 02746f5

Please sign in to comment.