From aa017112417d2406a40728937b4003af1ac4a4df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20Kep=C3=A1k?= Date: Tue, 19 Jul 2022 16:42:51 +0200 Subject: [PATCH 1/3] Feature: added publish confirm option --- src/Connection/Connection.php | 15 ++++++++++++ src/Connection/Exception/PublishException.php | 9 +++++++ src/Connection/IConnection.php | 2 ++ src/Connection/PublishConfirm.php | 24 +++++++++++++++++++ src/DI/Helpers/ConnectionsHelper.php | 1 + src/Producer/Producer.php | 15 ++++++++---- tests/Cases/ProducerTest.phpt | 16 +++++++------ 7 files changed, 70 insertions(+), 12 deletions(-) create mode 100644 src/Connection/Exception/PublishException.php create mode 100644 src/Connection/PublishConfirm.php diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index f66daff..c4c58e5 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -6,6 +6,8 @@ use Bunny\Channel; use Bunny\Exception\ClientException; +use Bunny\Protocol\MethodBasicAckFrame; +use Bunny\Protocol\MethodBasicNackFrame; use Contributte\RabbitMQ\Connection\Exception\ConnectionException; final class Connection implements IConnection @@ -22,6 +24,7 @@ final class Connection implements IConnection private float $heartbeat; private int $lastBeat = 0; private ?Channel $channel = null; + private ?PublishConfirm $publishConfirm = null; /** * @param array $ssl @@ -42,6 +45,7 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, + private bool $confirmMode = false, ) { $this->connectionParams = [ 'host' => $host, @@ -116,6 +120,12 @@ public function getChannel(): Channel $this->channel = $channel; } + + if ($this->confirmMode) { + $this->channel->confirmSelect(function (MethodBasicAckFrame|MethodBasicNackFrame $frame) { + $this->publishConfirm = new PublishConfirm($frame instanceof MethodBasicAckFrame, $frame->deliveryTag); + }); + } return $this->channel; } @@ -132,6 +142,11 @@ public function connectIfNeeded(): void $this->bunnyClient->connect(); } + public function getPublishConfirm(): ?PublishConfirm + { + return $this->publishConfirm; + } + public function sendHeartbeat(): void { $now = time(); diff --git a/src/Connection/Exception/PublishException.php b/src/Connection/Exception/PublishException.php new file mode 100644 index 0000000..7c75c5a --- /dev/null +++ b/src/Connection/Exception/PublishException.php @@ -0,0 +1,9 @@ +isAck; + } + + public function deliveryTag(): int + { + return $this->deliveryTag; + } +} diff --git a/src/DI/Helpers/ConnectionsHelper.php b/src/DI/Helpers/ConnectionsHelper.php index 63e6e70..9dc46e2 100644 --- a/src/DI/Helpers/ConnectionsHelper.php +++ b/src/DI/Helpers/ConnectionsHelper.php @@ -30,6 +30,7 @@ public function getConfigSchema(): Schema 'lazy' => Expect::bool(true), 'ssl' => Expect::array(null)->required(false), 'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'), + 'publishConfirm' => Expect::bool(false), 'admin' => Expect::structure([ 'port' => Expect::int(15672), 'secure' => Expect::bool(false), diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index e4341b9..b62da07 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -5,6 +5,7 @@ namespace Contributte\RabbitMQ\Producer; use Bunny\Exception\ClientException; +use Contributte\RabbitMQ\Connection\Exception\PublishException; use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; use Contributte\RabbitMQ\Queue\IQueue; @@ -21,10 +22,10 @@ final class Producer private array $publishCallbacks = []; public function __construct( - private ?IExchange $exchange, - private ?IQueue $queue, - private string $contentType, - private int $deliveryMode, + private ?IExchange $exchange, + private ?IQueue $queue, + private string $contentType, + private int $deliveryMode, private LazyDeclarator $lazyDeclarator ) { } @@ -74,12 +75,16 @@ private function getBasicHeaders(): array private function tryPublish(IQueue|IExchange $target, string $message, array $headers, string $exchange, string $routingKey, int $try = 0): void { try { - $target->getConnection()->getChannel()->publish( + $deliveryTag = $target->getConnection()->getChannel()->publish( $message, $headers, $exchange, $routingKey ); + $confirm = $target->getConnection()->getPublishConfirm(); + if ($confirm !== null && $confirm->deliveryTag() === $deliveryTag && !$confirm->isAck()) { + throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); + } } catch (ClientException $e) { if ($try >= 2) { throw $e; diff --git a/tests/Cases/ProducerTest.phpt b/tests/Cases/ProducerTest.phpt index bd6e3a9..c983677 100644 --- a/tests/Cases/ProducerTest.phpt +++ b/tests/Cases/ProducerTest.phpt @@ -301,11 +301,12 @@ final class ProducerTest extends TestCase $channelMock = new ChannelMock(); $connectionMock = \Mockery::mock(IConnection::class) - ->shouldReceive('getChannel')->andReturn($channelMock)->getMock(); + ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() + ->shouldReceive('getPublishConfirm')->andReturnNull()->getMock(); $queueMock = \Mockery::mock(IQueue::class) - ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() - ->shouldReceive('getName')->andReturn($testQueueName)->getMock(); + ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() + ->shouldReceive('getName')->andReturn($testQueueName)->getMock(); $producer = new Producer( null, @@ -324,11 +325,12 @@ final class ProducerTest extends TestCase $channelMock = new ChannelMock(); $connectionMock = \Mockery::mock(IConnection::class) - ->shouldReceive('getChannel')->andReturn($channelMock)->getMock(); + ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() + ->shouldReceive('getPublishConfirm')->andReturnNull()->getMock(); $exchangeMock = \Mockery::mock(IExchange::class) - ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() - ->shouldReceive('getName')->andReturn($testExchange)->getMock(); + ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() + ->shouldReceive('getName')->andReturn($testExchange)->getMock(); $producer = new Producer( $exchangeMock, @@ -344,7 +346,7 @@ final class ProducerTest extends TestCase protected function createLazyDeclarator(): LazyDeclarator { - return new class extends LazyDeclarator{ + return new class extends LazyDeclarator { public function __construct() { $this->queuesDataBag = \Mockery::spy(QueuesDataBag::class); From 14de7f117e892b281e0f71c4bc7ecd8c6e67a62c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20Kep=C3=A1k?= Date: Wed, 20 Jul 2022 09:05:41 +0200 Subject: [PATCH 2/3] Small changes --- .docs/README.md | 1 + composer.json | 2 +- src/Connection/Connection.php | 6 +++--- src/Connection/ConnectionFactory.php | 29 ++++++++++++++-------------- src/DI/Helpers/ConnectionsHelper.php | 2 +- src/DI/Helpers/ConsumersHelper.php | 4 ++-- 6 files changed, 23 insertions(+), 21 deletions(-) diff --git a/.docs/README.md b/.docs/README.md index 51d196b..8c31e26 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -35,6 +35,7 @@ rabbitmq: host: localhost port: 5672 lazy: false + publishConfirm: true # enables confirm mode on rabbitmq publish (this inform you about ack/nack due policy, this function is experimental) heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send queues: diff --git a/composer.json b/composer.json index 6807eed..cc90be2 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,7 @@ }, "require": { "php": "^8.0", - "bunny/bunny": "^0.2.4 || ^0.3 || ^0.4 || ^0.5", + "bunny/bunny": "^0.5", "symfony/console": "~3.3 || ^4.0 || ^5.0 || ^6.0", "nette/di": "^3.0.7", "nette/utils": "^3.2.0", diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index c4c58e5..b0475cc 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -45,7 +45,7 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, - private bool $confirmMode = false, + private bool $usePublishConfirm = false, ) { $this->connectionParams = [ 'host' => $host, @@ -120,8 +120,8 @@ public function getChannel(): Channel $this->channel = $channel; } - - if ($this->confirmMode) { + + if ($this->usePublishConfirm) { $this->channel->confirmSelect(function (MethodBasicAckFrame|MethodBasicNackFrame $frame) { $this->publishConfirm = new PublishConfirm($frame instanceof MethodBasicAckFrame, $frame->deliveryTag); }); diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index b38e451..362de0a 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -105,20 +105,21 @@ private function create(string $name): IConnection } return new Connection( - $connectionData['host'], - $connectionData['port'], - $connectionData['user'], - $connectionData['password'], - $connectionData['vhost'], - $connectionData['heartbeat'], - $connectionData['timeout'], - $connectionData['persistent'], - $connectionData['path'], - $connectionData['tcpNoDelay'], - $connectionData['lazy'], - $connectionData['ssl'], - fn () => $this->sendHeartbeat(), - $connectionData['heartbeatCallback'] ?? null, + host: $connectionData['host'], + port: $connectionData['port'], + user: $connectionData['user'], + password: $connectionData['password'], + vhost: $connectionData['vhost'], + heartbeat: $connectionData['heartbeat'], + timeout: $connectionData['timeout'], + persistent: $connectionData['persistent'], + path: $connectionData['path'], + tcpNoDelay: $connectionData['tcpNoDelay'], + lazy: $connectionData['lazy'], + ssl: $connectionData['ssl'], + cycleCallback: fn () => $this->sendHeartbeat(), + heartbeatCallback: $connectionData['heartbeatCallback'] ?? null, + usePublishConfirm: $connectionData['publishConfirm'], ); } } diff --git a/src/DI/Helpers/ConnectionsHelper.php b/src/DI/Helpers/ConnectionsHelper.php index 9dc46e2..5cf0e4a 100644 --- a/src/DI/Helpers/ConnectionsHelper.php +++ b/src/DI/Helpers/ConnectionsHelper.php @@ -29,7 +29,7 @@ public function getConfigSchema(): Schema 'tcpNoDelay' => Expect::bool(false), 'lazy' => Expect::bool(true), 'ssl' => Expect::array(null)->required(false), - 'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'), + 'heartbeatCallback' => Expect::array(null)->required(false), 'publishConfirm' => Expect::bool(false), 'admin' => Expect::structure([ 'port' => Expect::int(15672), diff --git a/src/DI/Helpers/ConsumersHelper.php b/src/DI/Helpers/ConsumersHelper.php index eb9a2d9..e48d907 100644 --- a/src/DI/Helpers/ConsumersHelper.php +++ b/src/DI/Helpers/ConsumersHelper.php @@ -17,8 +17,8 @@ public function getConfigSchema(): Schema { return Expect::arrayOf( Expect::structure([ - 'queue' => Expect::string()->required(true), - 'callback' => Expect::array()->required(true)->assert('is_callable'), + 'queue' => Expect::string()->required(), + 'callback' => Expect::array()->required(), 'idleTimeout' => Expect::int(30), 'bulk' => Expect::structure([ 'size' => Expect::int()->min(1), From dfb6cffc2d73da0d8ba15733357f62a4eaf24a40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20Kep=C3=A1k?= Date: Thu, 21 Jul 2022 12:46:18 +0200 Subject: [PATCH 3/3] Feature: fixes --- composer.json | 1 + src/Connection/Client.php | 37 ++++++++++++++++++++++++---- src/Connection/Connection.php | 13 +++------- src/Connection/ConnectionFactory.php | 2 +- src/Connection/IConnection.php | 2 +- src/Connection/PublishConfirm.php | 24 ------------------ src/Producer/Producer.php | 16 +++++++++--- tests/Cases/ProducerTest.phpt | 4 +-- 8 files changed, 54 insertions(+), 45 deletions(-) delete mode 100644 src/Connection/PublishConfirm.php diff --git a/composer.json b/composer.json index cc90be2..0c4db13 100644 --- a/composer.json +++ b/composer.json @@ -36,6 +36,7 @@ }, "require": { "php": "^8.0", + "ext-pcntl": "*", "bunny/bunny": "^0.5", "symfony/console": "~3.3 || ^4.0 || ^5.0 || ^6.0", "nette/di": "^3.0.7", diff --git a/src/Connection/Client.php b/src/Connection/Client.php index d2b6765..8b1c7bb 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -8,8 +8,7 @@ use Bunny\ClientStateEnum; use Bunny\Exception\BunnyException; use Bunny\Exception\ClientException; -use Bunny\Protocol\AbstractFrame; -use Bunny\Protocol\HeartbeatFrame; +use Bunny\Protocol; use Nette\Utils\Strings; /** @@ -39,7 +38,7 @@ public function __construct(array $options = []) */ public function sendHeartbeat(): void { - $this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer); + $this->getWriter()->appendFrame(new Protocol\HeartbeatFrame(), $this->writeBuffer); $this->flushWriteBuffer(); $this->options['heartbeat_callback']?->call($this); @@ -108,7 +107,7 @@ public function run($maxSeconds = null): void if (!empty($this->queue)) { $frame = array_shift($this->queue); } else { - /** @var AbstractFrame|null $frame */ + /** @var Protocol\AbstractFrame|null $frame */ $frame = $this->reader->consumeFrame($this->readBuffer); if ($frame === null) { $now = microtime(true); @@ -152,7 +151,6 @@ public function run($maxSeconds = null): void } } - /** @var AbstractFrame $frame */ if ($frame->channel === 0) { $this->onFrameReceived($frame); } else { @@ -166,4 +164,33 @@ public function run($maxSeconds = null): void } } while ($this->running); } + + public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame + { + $frame = null; // psalm bug + while (true) { + /** + * @phpstan-ignore-next-line + */ + while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) { + $this->feedReadBuffer(); + } + + if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) { + return $frame; + } + + if ($frame instanceof Protocol\MethodChannelCloseFrame && $frame->channel === $channel) { + $this->channelCloseOk($channel); + throw new ClientException($frame->replyText, $frame->replyCode); + } + + if ($frame instanceof Protocol\MethodConnectionCloseFrame) { + $this->connectionCloseOk(); + throw new ClientException($frame->replyText, $frame->replyCode); + } + + $this->enqueue($frame); + } + } } diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index b0475cc..0f56c22 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -6,8 +6,6 @@ use Bunny\Channel; use Bunny\Exception\ClientException; -use Bunny\Protocol\MethodBasicAckFrame; -use Bunny\Protocol\MethodBasicNackFrame; use Contributte\RabbitMQ\Connection\Exception\ConnectionException; final class Connection implements IConnection @@ -24,7 +22,6 @@ final class Connection implements IConnection private float $heartbeat; private int $lastBeat = 0; private ?Channel $channel = null; - private ?PublishConfirm $publishConfirm = null; /** * @param array $ssl @@ -45,7 +42,7 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, - private bool $usePublishConfirm = false, + private bool $publishConfirm = false, ) { $this->connectionParams = [ 'host' => $host, @@ -121,10 +118,8 @@ public function getChannel(): Channel $this->channel = $channel; } - if ($this->usePublishConfirm) { - $this->channel->confirmSelect(function (MethodBasicAckFrame|MethodBasicNackFrame $frame) { - $this->publishConfirm = new PublishConfirm($frame instanceof MethodBasicAckFrame, $frame->deliveryTag); - }); + if ($this->publishConfirm) { + $this->channel->confirmSelect(); } return $this->channel; @@ -142,7 +137,7 @@ public function connectIfNeeded(): void $this->bunnyClient->connect(); } - public function getPublishConfirm(): ?PublishConfirm + public function isPublishConfirm(): bool { return $this->publishConfirm; } diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 362de0a..70e3f37 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -119,7 +119,7 @@ private function create(string $name): IConnection ssl: $connectionData['ssl'], cycleCallback: fn () => $this->sendHeartbeat(), heartbeatCallback: $connectionData['heartbeatCallback'] ?? null, - usePublishConfirm: $connectionData['publishConfirm'], + publishConfirm: $connectionData['publishConfirm'], ); } } diff --git a/src/Connection/IConnection.php b/src/Connection/IConnection.php index c5a82e5..2825642 100644 --- a/src/Connection/IConnection.php +++ b/src/Connection/IConnection.php @@ -23,7 +23,7 @@ public function getChannel(): Channel; public function sendHeartbeat(): void; public function isConnected(): bool; public function getVhost(): string; - public function getPublishConfirm(): ?PublishConfirm; + public function isPublishConfirm(): bool; /** @internal */ public function resetChannel(): void; diff --git a/src/Connection/PublishConfirm.php b/src/Connection/PublishConfirm.php deleted file mode 100644 index b24d385..0000000 --- a/src/Connection/PublishConfirm.php +++ /dev/null @@ -1,24 +0,0 @@ -isAck; - } - - public function deliveryTag(): int - { - return $this->deliveryTag; - } -} diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index b62da07..8258fda 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -5,6 +5,8 @@ namespace Contributte\RabbitMQ\Producer; use Bunny\Exception\ClientException; +use Bunny\Protocol\MethodBasicNackFrame; +use Contributte\RabbitMQ\Connection\Client; use Contributte\RabbitMQ\Connection\Exception\PublishException; use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; @@ -81,9 +83,17 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he $exchange, $routingKey ); - $confirm = $target->getConnection()->getPublishConfirm(); - if ($confirm !== null && $confirm->deliveryTag() === $deliveryTag && !$confirm->isAck()) { - throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); + + if ($target->getConnection()->isPublishConfirm()) { + $client = $target->getConnection()->getChannel()->getClient(); + if (!$client instanceof Client) { + return; + } + + $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId()); + if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) { + throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); + } } } catch (ClientException $e) { if ($try >= 2) { diff --git a/tests/Cases/ProducerTest.phpt b/tests/Cases/ProducerTest.phpt index c983677..c8496c5 100644 --- a/tests/Cases/ProducerTest.phpt +++ b/tests/Cases/ProducerTest.phpt @@ -302,7 +302,7 @@ final class ProducerTest extends TestCase $connectionMock = \Mockery::mock(IConnection::class) ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() - ->shouldReceive('getPublishConfirm')->andReturnNull()->getMock(); + ->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock(); $queueMock = \Mockery::mock(IQueue::class) ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() @@ -326,7 +326,7 @@ final class ProducerTest extends TestCase $connectionMock = \Mockery::mock(IConnection::class) ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() - ->shouldReceive('getPublishConfirm')->andReturnNull()->getMock(); + ->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock(); $exchangeMock = \Mockery::mock(IExchange::class) ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()