Skip to content

Commit

Permalink
emit disconnect event on ping timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitro-N committed Nov 29, 2021
1 parent 1395020 commit 1673f69
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 24 deletions.
21 changes: 21 additions & 0 deletions src/MgKurentoClient/Exceptions/ConnectionClosedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace MgKurentoClient\Exceptions;

/**
* Class ConnectionClosedException
*
* @package src\MgKurentoClient\Exceptions
*
* @author Sergey Zvyagintsev <[email protected]>
*/
class ConnectionClosedException extends KurentoClientException
{
/**
* @inheritDoc
*/
public function __construct($message = 'Connection closed', $code = 0, $data = null)
{
parent::__construct($message, $code, $data);
}
}
21 changes: 21 additions & 0 deletions src/MgKurentoClient/Exceptions/DeferredNotFoundException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace MgKurentoClient\Exceptions;

/**
* Class DeferredNotFoundException
*
* @package src\MgKurentoClient\Exceptions
*
* @author Sergey Zvyagintsev <[email protected]>
*/
class DeferredNotFoundException extends KurentoClientException
{
/**
* @inheritDoc
*/
public function __construct($message = 'Json deferred not found', $code = 0, $data = null)
{
parent::__construct($message, $code, $data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* file that was distributed with this source code.
*/

namespace MgKurentoClient\JsonRpc;
namespace MgKurentoClient\Exceptions;

use Exception;

Expand All @@ -30,7 +30,7 @@ public function __construct($message, $code = 0, $data = null)
{
$this->data = $data;

return parent::__construct($message, $code);
parent::__construct($message, $code);
}


Expand Down
14 changes: 14 additions & 0 deletions src/MgKurentoClient/Exceptions/RpcErrorException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace MgKurentoClient\Exceptions;

/**
* Class RpcErrorException
*
* @package src\MgKurentoClient\Exceptions
*
* @author Sergey Zvyagintsev <[email protected]>
*/
class RpcErrorException extends KurentoClientException
{
}
21 changes: 21 additions & 0 deletions src/MgKurentoClient/Exceptions/RpcTimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace MgKurentoClient\Exceptions;

/**
* Class RpcTimeoutException
*
* @package src\MgKurentoClient\Exceptions
*
* @author Sergey Zvyagintsev <[email protected]>
*/
class RpcTimeoutException extends KurentoClientException
{
/**
* @inheritDoc
*/
public function __construct($message = 'RPC timeout', $code = 0, $data = null)
{
parent::__construct($message, $code, $data);
}
}
5 changes: 4 additions & 1 deletion src/MgKurentoClient/Interfaces/MediaObject.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@

namespace MgKurentoClient\Interfaces;

use Evenement\EventEmitterInterface;
use React\Promise\Promise;

interface MediaObject
interface MediaObject extends EventEmitterInterface
{
public function build(array $params = []): Promise;

Expand All @@ -23,4 +24,6 @@ public function getId();
public function getMediaPipeline();

public function getParent();

public function destroy();
}
26 changes: 17 additions & 9 deletions src/MgKurentoClient/JsonRpc/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
namespace MgKurentoClient\JsonRpc;

use Evenement\EventEmitter;
use MgKurentoClient\Exceptions\ConnectionClosedException;
use MgKurentoClient\Exceptions\DeferredNotFoundException;
use MgKurentoClient\Exceptions\RpcErrorException;
use MgKurentoClient\Exceptions\RpcTimeoutException;
use MgKurentoClient\WebRtc\Client as WsClient;
use Psr\Log\LoggerInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
Expand Down Expand Up @@ -76,8 +80,8 @@ class Client extends EventEmitter
public function __construct($websocketUrl, $loop, $logger, $timeout)
{
$this->websocketUrl = $websocketUrl;
$this->loop = $loop;
$this->logger = $logger;
$this->loop = $loop;
$this->logger = $logger;
$this->timeout = $timeout;

$this->wsClient = new WsClient($websocketUrl, $loop, $this->logger);
Expand Down Expand Up @@ -115,7 +119,7 @@ public function connect()
*
* @param array $data
* @return mixed
* @throws KurentoClientException
* @throws DeferredNotFoundException
*/
public function receive($data)
{
Expand All @@ -124,7 +128,7 @@ public function receive($data)
$this->sessionId = $data['result']['sessionId'];
}
//onEvent?
if (isset($data['method']) && $data['method'] == 'onEvent') {
if (isset($data['method']) && $data['method'] === 'onEvent') {
$value = $data['params']['value'];
$key = $value['object'] . '__' . $value['type'];
if (isset($this->subscriptions[$key])) {
Expand All @@ -145,13 +149,13 @@ public function receive($data)
if (isset($data['error']) && isset($data['id']) && isset($this->deferred[$data['id']])) {
$deferred = $this->deferred[$data['id']];
$error = $data['error'];
$deferred->reject(new KurentoClientException($error['message'] ?? '', $error['code'] ?? 0, $error['data'] ?? null));
$deferred->reject(new RpcErrorException($error['message'] ?? '', $error['code'] ?? 0, $error['data'] ?? null));
unset($this->deferred[$data['id']]);

return;
}

throw new KurentoClientException('Json deferred not found');
throw new DeferredNotFoundException();
}

/**
Expand Down Expand Up @@ -281,7 +285,7 @@ protected function send($method, $params): Promise
$this->deferred[$this->id] = $deferred;

$timer = $this->loop->addTimer($this->timeout, function () use ($deferred) {
$deferred->reject(new KurentoClientException('RPC timeout'));
$deferred->reject(new RpcTimeoutException());
});

return $deferred->promise()
Expand All @@ -294,7 +298,11 @@ private function startPing()
{
$this->pingTimer = $this->loop->addPeriodicTimer(60, function () {
if ($this->wsClient->isConnected()) {
$this->send('ping', []);
$this->send('ping', [])->otherwise(function($exception) {
if ($exception instanceof RpcTimeoutException) {
$this->wsClient->onPingTimeout();
}
});
}
});
}
Expand All @@ -308,7 +316,7 @@ private function rejectAllPromises()
{
/** @var Deferred $deferredItem */
foreach ($this->deferred as $deferredItem) {
$deferredItem->reject(new KurentoClientException('Connection closed'));
$deferredItem->reject(new ConnectionClosedException());
}
$this->deferred = [];
}
Expand Down
19 changes: 18 additions & 1 deletion src/MgKurentoClient/MediaObject.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,28 @@

namespace MgKurentoClient;

use Evenement\EventEmitter;
use React\Promise\Promise;

class MediaObject implements Interfaces\MediaObject
class MediaObject extends EventEmitter implements Interfaces\MediaObject
{
public const EVENT_DESTROYED = 'destroyed';

protected $id = null;

protected $pipeline = null;

protected $destroyed = false;

public function __construct(Interfaces\MediaPipeline $pipeline)
{
$this->pipeline = $pipeline;

if ($pipeline !== $this) {
$pipeline->on(self::EVENT_DESTROYED, function() {
$this->destroy();
});
}
}

public function release(): Promise
Expand Down Expand Up @@ -63,6 +74,12 @@ public function remoteRelease(): Promise
->sendRelease($this->getId());
}

public function destroy()
{
$this->destroyed = true;
$this->emit(self::EVENT_DESTROYED);
}

public function remoteUnsubscribe($subscriptionId): Promise
{
return $this->pipeline->getJsonRpc()
Expand Down
7 changes: 6 additions & 1 deletion src/MgKurentoClient/MediaPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ class MediaPipeline extends MediaObject implements Interfaces\MediaPipeline

public function __construct(JsonRpc\Client $jsonRpc)
{
$this->jsonRpc = $jsonRpc;
parent::__construct($this);

$this->jsonRpc = $jsonRpc;

$jsonRpc->on(WebRtc\Client::EVENT_CONNECTION_CLOSED, function() {
$this->destroy();
});
}

public function getJsonRpc()
Expand Down
34 changes: 24 additions & 10 deletions src/MgKurentoClient/WebRtc/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class Client extends EventEmitter
*/
private $connection = null;


/**
* @var bool
*/
Expand Down Expand Up @@ -101,35 +100,50 @@ public function connect()
});

$connection->on('closed', function ($code = null, $reason = null) {
$this->logger->info("Connection closed ({$code} - {$reason})");
$this->emit(self::EVENT_CONNECTION_CLOSED, [$code, $reason]);
if ($code === Frame::CLOSE_ABNORMAL) {
$this->logger->error("Connection closed abnormally({$code} - {$reason})");
$this->emit(self::EVENT_CONNECTION_CLOSED_ABNORMALLY, [$code, $reason]);
$this->reconnect();
}
$this->onConnectionClosed($code, $reason);
});

$connection->on('error', function ($error) {
$this->logger->error("Error: {$error}");
$this->emit(self::EVENT_STREAM_ERROR, [$error]);
});
}, function (Exception $e) {
$this->connection = null;
$this->logger->error("Could not connect: {$e->getMessage()}");
$this->emit(self::EVENT_CONNECTION_ERROR, [$e]);
$this->connection = null;
$this->reconnect();
});
}

public function onPingTimeout()
{
$this->onConnectionClosed(Frame::CLOSE_ABNORMAL, 'Ping timeout');
}

/**
* @param int $code
* @param string $reason
*/
private function onConnectionClosed($code, $reason)
{
$this->connection = null;
$this->logger->info("Connection closed ({$code} - {$reason})");
$this->emit(self::EVENT_CONNECTION_CLOSED, [$code, $reason]);
if ($code === Frame::CLOSE_ABNORMAL) {
$this->logger->error("Connection closed abnormally({$code} - {$reason})");
$this->emit(self::EVENT_CONNECTION_CLOSED_ABNORMALLY, [$code, $reason]);
$this->reconnect();
}
}

/**
* Send message
*
* @param string $message
*/
public function send($message)
{
if (!$this->connection) {
if (!$this->isConnected()) {
if (!$this->connecting) {
$this->connect();
}
Expand Down

0 comments on commit 1673f69

Please sign in to comment.