From 39c56d4b039ed98c53444026269ff5a8cbe60e41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20Kep=C3=A1k?= Date: Wed, 20 Jul 2022 08:58:05 +0200 Subject: [PATCH] - Added heartbeat callback to the configuration - Small refactoring and speed improvements --- .docs/README.md | 1 + README.md | 5 ++-- src/Connection/Client.php | 20 ++++++------- src/Connection/Connection.php | 4 ++- src/Connection/ConnectionFactory.php | 3 +- src/DI/Helpers/ConnectionsHelper.php | 1 + src/DI/Helpers/ConsumersHelper.php | 2 +- src/Producer/Producer.php | 42 ++-------------------------- 8 files changed, 22 insertions(+), 56 deletions(-) diff --git a/.docs/README.md b/.docs/README.md index f947115..51d196b 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -35,6 +35,7 @@ rabbitmq: host: localhost port: 5672 lazy: false + heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send queues: testQueue: diff --git a/README.md b/README.md index 667fc21..48c404a 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,14 @@ For details on how to use this package, check out our [documentation](.docs). | State | Version | Branch | Nette | PHP | |--------|-----------|----------|----------|---------| -| dev | `^10.x.x` | `master` | `3.0+` | `>=8.0` | -| stable | `^10.0.0` | `master` | `3.0+` | `>=8.0` | +| dev | `^10.2.x` | `master` | `3.0+` | `>=8.0` | +| stable | `^10.1.0` | `master` | `3.0+` | `>=8.0` | ## Development This package is currently maintaining by these authors. - ----- diff --git a/src/Connection/Client.php b/src/Connection/Client.php index 5bf17ae..d2b6765 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -24,9 +24,14 @@ class Client extends BunnyClient */ public function __construct(array $options = []) { - // Construct parent parent::__construct($options); - $this->options['cycle_callback'] = $options['cycle_callback']; + + $this->options['cycle_callback'] = is_callable($options['cycle_callback']) + ? $options['cycle_callback'] + : null; + $this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback']) + ? $options['heartbeat_callback'] + : null; } /** @@ -37,9 +42,7 @@ public function sendHeartbeat(): void $this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer); $this->flushWriteBuffer(); - if (is_callable($this->options['heartbeat_callback'] ?? null)) { - $this->options['heartbeat_callback']->call($this); - } + $this->options['heartbeat_callback']?->call($this); } public function syncDisconnect(): bool @@ -73,7 +76,7 @@ protected function write(): void { parent::write(); if (($last = error_get_last()) !== null) { - if (!Strings::match($last['message'], '~fwrite(): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) { + if (!Strings::match($last['message'], '~fwrite\(\): Send of \d+ bytes failed with errno=\d+ broken pipe~i')) { return; } @@ -101,10 +104,7 @@ public function run($maxSeconds = null): void } do { - if (is_callable($this->options['cycle_callback'] ?? null)) { - $this->options['cycle_callback']->call($this); - } - + $this->options['cycle_callback']?->call($this); if (!empty($this->queue)) { $frame = array_shift($this->queue); } else { diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index fafbc0b..f66daff 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -40,7 +40,8 @@ public function __construct( bool $tcpNoDelay, bool $lazy = false, ?array $ssl = null, - ?callable $cycleCallback = null + ?callable $cycleCallback = null, + ?callable $heartbeatCallback = null, ) { $this->connectionParams = [ 'host' => $host, @@ -56,6 +57,7 @@ public function __construct( 'tcp_nodelay' => $tcpNoDelay, 'ssl' => $ssl, 'cycle_callback' => $cycleCallback, + 'heartbeat_callback' => $heartbeatCallback, ]; $this->bunnyClient = $this->createNewConnection(); diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 4a5af40..b38e451 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -117,7 +117,8 @@ private function create(string $name): IConnection $connectionData['tcpNoDelay'], $connectionData['lazy'], $connectionData['ssl'], - fn () => $this->sendHeartbeat() + fn () => $this->sendHeartbeat(), + $connectionData['heartbeatCallback'] ?? null, ); } } diff --git a/src/DI/Helpers/ConnectionsHelper.php b/src/DI/Helpers/ConnectionsHelper.php index 272df5f..63e6e70 100644 --- a/src/DI/Helpers/ConnectionsHelper.php +++ b/src/DI/Helpers/ConnectionsHelper.php @@ -29,6 +29,7 @@ public function getConfigSchema(): Schema 'tcpNoDelay' => Expect::bool(false), 'lazy' => Expect::bool(true), 'ssl' => Expect::array(null)->required(false), + 'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'), 'admin' => Expect::structure([ 'port' => Expect::int(15672), 'secure' => Expect::bool(false), diff --git a/src/DI/Helpers/ConsumersHelper.php b/src/DI/Helpers/ConsumersHelper.php index f3bbb3e..eb9a2d9 100644 --- a/src/DI/Helpers/ConsumersHelper.php +++ b/src/DI/Helpers/ConsumersHelper.php @@ -18,7 +18,7 @@ public function getConfigSchema(): Schema return Expect::arrayOf( Expect::structure([ 'queue' => Expect::string()->required(true), - 'callback' => Expect::array()->required(true), + 'callback' => Expect::array()->required(true)->assert('is_callable'), 'idleTimeout' => Expect::int(30), 'bulk' => Expect::structure([ 'size' => Expect::int()->min(1), diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index c2a8c5e..e4341b9 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -8,8 +8,6 @@ use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; use Contributte\RabbitMQ\Queue\IQueue; -use Nette\Utils\Callback; -use Tracy\Dumper; final class Producer { @@ -39,11 +37,11 @@ public function publish(string $message, array $headers = [], ?string $routingKe $headers = array_merge($this->getBasicHeaders(), $headers); if ($this->queue !== null) { - $this->publishToQueue($message, $headers); + $this->tryPublish($this->queue, $message, $headers, '', $this->queue->getName()); } if ($this->exchange !== null) { - $this->publishToExchange($message, $headers, $routingKey ?? ''); + $this->tryPublish($this->exchange, $message, $headers, $this->exchange->getName(), $routingKey ?? ''); } foreach ($this->publishCallbacks as $callback) { @@ -58,18 +56,6 @@ public function addOnPublishCallback(callable $callback): void } - public function sendHeartbeat(): void - { - trigger_error(__METHOD__ . '() is deprecated, use dependency ConnectionFactory::sendHeartbeat().', E_USER_DEPRECATED); - if ($this->queue !== null) { - $this->queue->getConnection()->sendHeartbeat(); - } - if ($this->exchange !== null) { - $this->exchange->getConnection()->sendHeartbeat(); - } - } - - /** * @return array */ @@ -81,30 +67,6 @@ private function getBasicHeaders(): array ]; } - /** - * @param array $headers - */ - private function publishToQueue(string $message, array $headers = []): void - { - if (null === $queue = $this->queue) { - throw new \UnexpectedValueException('Queue is not defined'); - } - - $this->tryPublish($queue, $message, $headers, '', $queue->getName()); - } - - - /** - * @param array $headers - */ - private function publishToExchange(string $message, array $headers, string $routingKey): void - { - if (null === $exchange = $this->exchange) { - throw new \UnexpectedValueException('Exchange is not defined'); - } - - $this->tryPublish($exchange, $message, $headers, $exchange->getName(), $routingKey); - } /** * @param array $headers