Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Feature: added auto heartbeat for every opened connection
Browse files Browse the repository at this point in the history
  • Loading branch information
bckp committed Jul 19, 2022
1 parent 0fc02e9 commit b1f263e
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 9 deletions.
121 changes: 120 additions & 1 deletion src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,38 @@
use Bunny\ClientStateEnum;
use Bunny\Exception\BunnyException;
use Bunny\Exception\ClientException;
use Bunny\Protocol\AbstractFrame;
use Bunny\Protocol\HeartbeatFrame;
use Nette\Utils\Strings;

/**
* @codeCoverageIgnore
*/
class Client extends BunnyClient
{
/**
* Constructor.
*
* @param array<string|mixed> $options
*/
public function __construct(array $options = [])
{
// Construct parent
parent::__construct($options);
$this->options['cycle_callback'] = $options['cycle_callback'];
}

/**
* @throws BunnyException
*/
public function sendHeartbeat(): void
{
$this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();

if (is_callable($this->options['heartbeat_callback'] ?? null)) {
$this->options['heartbeat_callback']->call($this);
}
}

public function syncDisconnect(): bool
Expand All @@ -44,7 +65,105 @@ public function syncDisconnect(): bool
}

$this->init();

return true;
}

protected function write(): void
{
parent::write();
if (($last = error_get_last()) !== null) {
if (!Strings::match($last['message'], '~fwrite(): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) {
return;
}

error_clear_last();
throw new ClientException('Broken pipe or closed connection.');
}
}

/**
* Runs it's own event loop, processes frames as they arrive. Processes messages for at most $maxSeconds.
*
* @param float $maxSeconds
*/
public function run($maxSeconds = null): void
{
if (!$this->isConnected()) {
throw new ClientException('Client has to be connected.');
}

$this->running = true;
$startTime = microtime(true);
$stopTime = null;
if ($maxSeconds !== null) {
$stopTime = $startTime + $maxSeconds;
}

do {
if (is_callable($this->options['cycle_callback'] ?? null)) {
$this->options['cycle_callback']->call($this);
}

if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
/** @var AbstractFrame|null $frame */
$frame = $this->reader->consumeFrame($this->readBuffer);
if ($frame === null) {
$now = microtime(true);
$nextStreamSelectTimeout = ($this->lastWrite ?: $now) + $this->options["heartbeat"];
if ($stopTime !== null && $stopTime < $nextStreamSelectTimeout) {
$nextStreamSelectTimeout = $stopTime;
}
$tvSec = max((int)($nextStreamSelectTimeout - $now), 0);
$tvUsec = max((int)(($nextStreamSelectTimeout - $now - $tvSec) * 1000000), 0);

$r = [$this->getStream()];
$w = null;
$e = null;

if (($n = @stream_select($r, $w, $e, $tvSec, $tvUsec)) === false) {
$lastError = error_get_last();
if ($lastError !== null &&
preg_match('/^stream_select\\(\\): unable to select \\[(\\d+)\\]:/', $lastError['message'], $m) &&
(int)$m[1] === PCNTL_EINTR
) {
// got interrupted by signal, dispatch signals & continue
pcntl_signal_dispatch();
$n = 0;
} else {
throw new ClientException(sprintf(
'stream_select() failed: %s',
$lastError ? $lastError['message'] : 'Unknown error.'
));
}
}
$now = microtime(true);
if ($stopTime !== null && $now >= $stopTime) {
break;
}

if ($n > 0) {
$this->feedReadBuffer();
}

continue;
}
}

/** @var AbstractFrame $frame */
if ($frame->channel === 0) {
$this->onFrameReceived($frame);
} else {
if (!isset($this->channels[$frame->channel])) {
throw new ClientException(
"Received frame #{$frame->type} on closed channel #{$frame->channel}."
);
}

$this->channels[$frame->channel]->onFrameReceived($frame);
}
} while ($this->running);
}
}
12 changes: 5 additions & 7 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
use Bunny\Channel;
use Bunny\Exception\ClientException;
use Contributte\RabbitMQ\Connection\Exception\ConnectionException;
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\Queue\IQueue;

final class Connection implements IConnection
{

private const HEARTBEAT_INTERVAL = 1;

private Client $bunnyClient;

/**
* @var array<string, mixed>
*/
Expand All @@ -25,8 +24,8 @@ final class Connection implements IConnection
private ?Channel $channel = null;

/**
* @throws \Exception
* @param array<string, mixed> $ssl
* @throws \Exception
*/
public function __construct(
string $host,
Expand All @@ -40,7 +39,8 @@ public function __construct(
string $path,
bool $tcpNoDelay,
bool $lazy = false,
?array $ssl = null
?array $ssl = null,
?callable $cycleCallback = null
) {
$this->connectionParams = [
'host' => $host,
Expand All @@ -55,6 +55,7 @@ public function __construct(
'path' => $path,
'tcp_nodelay' => $tcpNoDelay,
'ssl' => $ssl,
'cycle_callback' => $cycleCallback,
];

$this->bunnyClient = $this->createNewConnection();
Expand Down Expand Up @@ -117,7 +118,6 @@ public function getChannel(): Channel
return $this->channel;
}


/**
* @throws \Exception
*/
Expand All @@ -130,7 +130,6 @@ public function connectIfNeeded(): void
$this->bunnyClient->connect();
}


public function sendHeartbeat(): void
{
$now = time();
Expand All @@ -140,7 +139,6 @@ public function sendHeartbeat(): void
}
}


public function getVhost(): string
{
return $this->connectionParams['vhost'];
Expand Down
11 changes: 10 additions & 1 deletion src/Connection/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public function getApi(string $name): IApi
return $this->requests[$name];
}

public function sendHeartbeat(): bool
{
foreach ($this->connections as $connection) {
$connection->sendHeartbeat();
}
return true;
}

/**
* @throws ConnectionFactoryException
*/
Expand Down Expand Up @@ -108,7 +116,8 @@ private function create(string $name): IConnection
$connectionData['path'],
$connectionData['tcpNoDelay'],
$connectionData['lazy'],
$connectionData['ssl']
$connectionData['ssl'],
fn () => $this->sendHeartbeat()
);
}
}

0 comments on commit b1f263e

Please sign in to comment.