Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #16 from mallgroup/feature-auto-heartbeat
Browse files Browse the repository at this point in the history
Feature: added auto heartbeat for every opened connection
  • Loading branch information
bckp authored Jul 20, 2022
2 parents 0fc02e9 + 39c56d4 commit f1cce84
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 53 deletions.
1 change: 1 addition & 0 deletions .docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rabbitmq:
host: localhost
port: 5672
lazy: false
heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send
queues:
testQueue:
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ For details on how to use this package, check out our [documentation](.docs).

| State | Version | Branch | Nette | PHP |
|--------|-----------|----------|----------|---------|
| dev | `^10.x.x` | `master` | `3.0+` | `>=8.0` |
| stable | `^10.0.0` | `master` | `3.0+` | `>=8.0` |
| dev | `^10.2.x` | `master` | `3.0+` | `>=8.0` |
| stable | `^10.1.0` | `master` | `3.0+` | `>=8.0` |

## Development

This package is currently maintaining by these authors.

<a href="https://github.com/bckp"><img width="80" height="80" src="https://avatars.githubusercontent.com/u/179652?v=4&s=80"></a>
<a href="https://github.com/kucix"><img width="80" height="80" src="https://avatars.githubusercontent.com/u/444459?v=4&s=80"></a>
<a href="https://github.com/mallgroup"><img width="80" height="80" src="https://avatars.githubusercontent.com/u/23184995?v=4&s=80"></a>

-----
Expand Down
121 changes: 120 additions & 1 deletion src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,41 @@
use Bunny\ClientStateEnum;
use Bunny\Exception\BunnyException;
use Bunny\Exception\ClientException;
use Bunny\Protocol\AbstractFrame;
use Bunny\Protocol\HeartbeatFrame;
use Nette\Utils\Strings;

/**
* @codeCoverageIgnore
*/
class Client extends BunnyClient
{
/**
* Constructor.
*
* @param array<string|mixed> $options
*/
public function __construct(array $options = [])
{
parent::__construct($options);

$this->options['cycle_callback'] = is_callable($options['cycle_callback'])
? $options['cycle_callback']
: null;
$this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback'])
? $options['heartbeat_callback']
: null;
}

/**
* @throws BunnyException
*/
public function sendHeartbeat(): void
{
$this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();

$this->options['heartbeat_callback']?->call($this);
}

public function syncDisconnect(): bool
Expand All @@ -44,7 +68,102 @@ public function syncDisconnect(): bool
}

$this->init();

return true;
}

protected function write(): void
{
parent::write();
if (($last = error_get_last()) !== null) {
if (!Strings::match($last['message'], '~fwrite\(\): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) {
return;
}

error_clear_last();
throw new ClientException('Broken pipe or closed connection.');
}
}

/**
* Runs it's own event loop, processes frames as they arrive. Processes messages for at most $maxSeconds.
*
* @param float $maxSeconds
*/
public function run($maxSeconds = null): void
{
if (!$this->isConnected()) {
throw new ClientException('Client has to be connected.');
}

$this->running = true;
$startTime = microtime(true);
$stopTime = null;
if ($maxSeconds !== null) {
$stopTime = $startTime + $maxSeconds;
}

do {
$this->options['cycle_callback']?->call($this);
if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
/** @var AbstractFrame|null $frame */
$frame = $this->reader->consumeFrame($this->readBuffer);
if ($frame === null) {
$now = microtime(true);
$nextStreamSelectTimeout = ($this->lastWrite ?: $now) + $this->options["heartbeat"];
if ($stopTime !== null && $stopTime < $nextStreamSelectTimeout) {
$nextStreamSelectTimeout = $stopTime;
}
$tvSec = max((int)($nextStreamSelectTimeout - $now), 0);
$tvUsec = max((int)(($nextStreamSelectTimeout - $now - $tvSec) * 1000000), 0);

$r = [$this->getStream()];
$w = null;
$e = null;

if (($n = @stream_select($r, $w, $e, $tvSec, $tvUsec)) === false) {
$lastError = error_get_last();
if ($lastError !== null &&
preg_match('/^stream_select\\(\\): unable to select \\[(\\d+)\\]:/', $lastError['message'], $m) &&
(int)$m[1] === PCNTL_EINTR
) {
// got interrupted by signal, dispatch signals & continue
pcntl_signal_dispatch();
$n = 0;
} else {
throw new ClientException(sprintf(
'stream_select() failed: %s',
$lastError ? $lastError['message'] : 'Unknown error.'
));
}
}
$now = microtime(true);
if ($stopTime !== null && $now >= $stopTime) {
break;
}

if ($n > 0) {
$this->feedReadBuffer();
}

continue;
}
}

/** @var AbstractFrame $frame */
if ($frame->channel === 0) {
$this->onFrameReceived($frame);
} else {
if (!isset($this->channels[$frame->channel])) {
throw new ClientException(
"Received frame #{$frame->type} on closed channel #{$frame->channel}."
);
}

$this->channels[$frame->channel]->onFrameReceived($frame);
}
} while ($this->running);
}
}
14 changes: 7 additions & 7 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
use Bunny\Channel;
use Bunny\Exception\ClientException;
use Contributte\RabbitMQ\Connection\Exception\ConnectionException;
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\Queue\IQueue;

final class Connection implements IConnection
{

private const HEARTBEAT_INTERVAL = 1;

private Client $bunnyClient;

/**
* @var array<string, mixed>
*/
Expand All @@ -25,8 +24,8 @@ final class Connection implements IConnection
private ?Channel $channel = null;

/**
* @throws \Exception
* @param array<string, mixed> $ssl
* @throws \Exception
*/
public function __construct(
string $host,
Expand All @@ -40,7 +39,9 @@ public function __construct(
string $path,
bool $tcpNoDelay,
bool $lazy = false,
?array $ssl = null
?array $ssl = null,
?callable $cycleCallback = null,
?callable $heartbeatCallback = null,
) {
$this->connectionParams = [
'host' => $host,
Expand All @@ -55,6 +56,8 @@ public function __construct(
'path' => $path,
'tcp_nodelay' => $tcpNoDelay,
'ssl' => $ssl,
'cycle_callback' => $cycleCallback,
'heartbeat_callback' => $heartbeatCallback,
];

$this->bunnyClient = $this->createNewConnection();
Expand Down Expand Up @@ -117,7 +120,6 @@ public function getChannel(): Channel
return $this->channel;
}


/**
* @throws \Exception
*/
Expand All @@ -130,7 +132,6 @@ public function connectIfNeeded(): void
$this->bunnyClient->connect();
}


public function sendHeartbeat(): void
{
$now = time();
Expand All @@ -140,7 +141,6 @@ public function sendHeartbeat(): void
}
}


public function getVhost(): string
{
return $this->connectionParams['vhost'];
Expand Down
12 changes: 11 additions & 1 deletion src/Connection/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public function getApi(string $name): IApi
return $this->requests[$name];
}

public function sendHeartbeat(): bool
{
foreach ($this->connections as $connection) {
$connection->sendHeartbeat();
}
return true;
}

/**
* @throws ConnectionFactoryException
*/
Expand Down Expand Up @@ -108,7 +116,9 @@ private function create(string $name): IConnection
$connectionData['path'],
$connectionData['tcpNoDelay'],
$connectionData['lazy'],
$connectionData['ssl']
$connectionData['ssl'],
fn () => $this->sendHeartbeat(),
$connectionData['heartbeatCallback'] ?? null,
);
}
}
1 change: 1 addition & 0 deletions src/DI/Helpers/ConnectionsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public function getConfigSchema(): Schema
'tcpNoDelay' => Expect::bool(false),
'lazy' => Expect::bool(true),
'ssl' => Expect::array(null)->required(false),
'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'),
'admin' => Expect::structure([
'port' => Expect::int(15672),
'secure' => Expect::bool(false),
Expand Down
2 changes: 1 addition & 1 deletion src/DI/Helpers/ConsumersHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function getConfigSchema(): Schema
return Expect::arrayOf(
Expect::structure([
'queue' => Expect::string()->required(true),
'callback' => Expect::array()->required(true),
'callback' => Expect::array()->required(true)->assert('is_callable'),
'idleTimeout' => Expect::int(30),
'bulk' => Expect::structure([
'size' => Expect::int()->min(1),
Expand Down
42 changes: 2 additions & 40 deletions src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;
use Nette\Utils\Callback;
use Tracy\Dumper;

final class Producer
{
Expand Down Expand Up @@ -39,11 +37,11 @@ public function publish(string $message, array $headers = [], ?string $routingKe
$headers = array_merge($this->getBasicHeaders(), $headers);

if ($this->queue !== null) {
$this->publishToQueue($message, $headers);
$this->tryPublish($this->queue, $message, $headers, '', $this->queue->getName());
}

if ($this->exchange !== null) {
$this->publishToExchange($message, $headers, $routingKey ?? '');
$this->tryPublish($this->exchange, $message, $headers, $this->exchange->getName(), $routingKey ?? '');
}

foreach ($this->publishCallbacks as $callback) {
Expand All @@ -58,18 +56,6 @@ public function addOnPublishCallback(callable $callback): void
}


public function sendHeartbeat(): void
{
trigger_error(__METHOD__ . '() is deprecated, use dependency ConnectionFactory::sendHeartbeat().', E_USER_DEPRECATED);
if ($this->queue !== null) {
$this->queue->getConnection()->sendHeartbeat();
}
if ($this->exchange !== null) {
$this->exchange->getConnection()->sendHeartbeat();
}
}


/**
* @return array<string, string|int>
*/
Expand All @@ -81,30 +67,6 @@ private function getBasicHeaders(): array
];
}

/**
* @param array<string, string|int> $headers
*/
private function publishToQueue(string $message, array $headers = []): void
{
if (null === $queue = $this->queue) {
throw new \UnexpectedValueException('Queue is not defined');
}

$this->tryPublish($queue, $message, $headers, '', $queue->getName());
}


/**
* @param array<string, string|int> $headers
*/
private function publishToExchange(string $message, array $headers, string $routingKey): void
{
if (null === $exchange = $this->exchange) {
throw new \UnexpectedValueException('Exchange is not defined');
}

$this->tryPublish($exchange, $message, $headers, $exchange->getName(), $routingKey);
}

/**
* @param array<string, string|int> $headers
Expand Down

0 comments on commit f1cce84

Please sign in to comment.