From dfb6cffc2d73da0d8ba15733357f62a4eaf24a40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20Kep=C3=A1k?= Date: Thu, 21 Jul 2022 12:46:18 +0200 Subject: [PATCH] Feature: fixes --- composer.json | 1 + src/Connection/Client.php | 37 ++++++++++++++++++++++++---- src/Connection/Connection.php | 13 +++------- src/Connection/ConnectionFactory.php | 2 +- src/Connection/IConnection.php | 2 +- src/Connection/PublishConfirm.php | 24 ------------------ src/Producer/Producer.php | 16 +++++++++--- tests/Cases/ProducerTest.phpt | 4 +-- 8 files changed, 54 insertions(+), 45 deletions(-) delete mode 100644 src/Connection/PublishConfirm.php diff --git a/composer.json b/composer.json index cc90be2..0c4db13 100644 --- a/composer.json +++ b/composer.json @@ -36,6 +36,7 @@ }, "require": { "php": "^8.0", + "ext-pcntl": "*", "bunny/bunny": "^0.5", "symfony/console": "~3.3 || ^4.0 || ^5.0 || ^6.0", "nette/di": "^3.0.7", diff --git a/src/Connection/Client.php b/src/Connection/Client.php index d2b6765..8b1c7bb 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -8,8 +8,7 @@ use Bunny\ClientStateEnum; use Bunny\Exception\BunnyException; use Bunny\Exception\ClientException; -use Bunny\Protocol\AbstractFrame; -use Bunny\Protocol\HeartbeatFrame; +use Bunny\Protocol; use Nette\Utils\Strings; /** @@ -39,7 +38,7 @@ public function __construct(array $options = []) */ public function sendHeartbeat(): void { - $this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer); + $this->getWriter()->appendFrame(new Protocol\HeartbeatFrame(), $this->writeBuffer); $this->flushWriteBuffer(); $this->options['heartbeat_callback']?->call($this); @@ -108,7 +107,7 @@ public function run($maxSeconds = null): void if (!empty($this->queue)) { $frame = array_shift($this->queue); } else { - /** @var AbstractFrame|null $frame */ + /** @var Protocol\AbstractFrame|null $frame */ $frame = $this->reader->consumeFrame($this->readBuffer); if ($frame === null) { $now = microtime(true); @@ -152,7 +151,6 @@ public function run($maxSeconds = null): void } } - /** @var AbstractFrame $frame */ if ($frame->channel === 0) { $this->onFrameReceived($frame); } else { @@ -166,4 +164,33 @@ public function run($maxSeconds = null): void } } while ($this->running); } + + public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame + { + $frame = null; // psalm bug + while (true) { + /** + * @phpstan-ignore-next-line + */ + while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) { + $this->feedReadBuffer(); + } + + if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) { + return $frame; + } + + if ($frame instanceof Protocol\MethodChannelCloseFrame && $frame->channel === $channel) { + $this->channelCloseOk($channel); + throw new ClientException($frame->replyText, $frame->replyCode); + } + + if ($frame instanceof Protocol\MethodConnectionCloseFrame) { + $this->connectionCloseOk(); + throw new ClientException($frame->replyText, $frame->replyCode); + } + + $this->enqueue($frame); + } + } } diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index b0475cc..0f56c22 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -6,8 +6,6 @@ use Bunny\Channel; use Bunny\Exception\ClientException; -use Bunny\Protocol\MethodBasicAckFrame; -use Bunny\Protocol\MethodBasicNackFrame; use Contributte\RabbitMQ\Connection\Exception\ConnectionException; final class Connection implements IConnection @@ -24,7 +22,6 @@ final class Connection implements IConnection private float $heartbeat; private int $lastBeat = 0; private ?Channel $channel = null; - private ?PublishConfirm $publishConfirm = null; /** * @param array $ssl @@ -45,7 +42,7 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, - private bool $usePublishConfirm = false, + private bool $publishConfirm = false, ) { $this->connectionParams = [ 'host' => $host, @@ -121,10 +118,8 @@ public function getChannel(): Channel $this->channel = $channel; } - if ($this->usePublishConfirm) { - $this->channel->confirmSelect(function (MethodBasicAckFrame|MethodBasicNackFrame $frame) { - $this->publishConfirm = new PublishConfirm($frame instanceof MethodBasicAckFrame, $frame->deliveryTag); - }); + if ($this->publishConfirm) { + $this->channel->confirmSelect(); } return $this->channel; @@ -142,7 +137,7 @@ public function connectIfNeeded(): void $this->bunnyClient->connect(); } - public function getPublishConfirm(): ?PublishConfirm + public function isPublishConfirm(): bool { return $this->publishConfirm; } diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 362de0a..70e3f37 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -119,7 +119,7 @@ private function create(string $name): IConnection ssl: $connectionData['ssl'], cycleCallback: fn () => $this->sendHeartbeat(), heartbeatCallback: $connectionData['heartbeatCallback'] ?? null, - usePublishConfirm: $connectionData['publishConfirm'], + publishConfirm: $connectionData['publishConfirm'], ); } } diff --git a/src/Connection/IConnection.php b/src/Connection/IConnection.php index c5a82e5..2825642 100644 --- a/src/Connection/IConnection.php +++ b/src/Connection/IConnection.php @@ -23,7 +23,7 @@ public function getChannel(): Channel; public function sendHeartbeat(): void; public function isConnected(): bool; public function getVhost(): string; - public function getPublishConfirm(): ?PublishConfirm; + public function isPublishConfirm(): bool; /** @internal */ public function resetChannel(): void; diff --git a/src/Connection/PublishConfirm.php b/src/Connection/PublishConfirm.php deleted file mode 100644 index b24d385..0000000 --- a/src/Connection/PublishConfirm.php +++ /dev/null @@ -1,24 +0,0 @@ -isAck; - } - - public function deliveryTag(): int - { - return $this->deliveryTag; - } -} diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index b62da07..8258fda 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -5,6 +5,8 @@ namespace Contributte\RabbitMQ\Producer; use Bunny\Exception\ClientException; +use Bunny\Protocol\MethodBasicNackFrame; +use Contributte\RabbitMQ\Connection\Client; use Contributte\RabbitMQ\Connection\Exception\PublishException; use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; @@ -81,9 +83,17 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he $exchange, $routingKey ); - $confirm = $target->getConnection()->getPublishConfirm(); - if ($confirm !== null && $confirm->deliveryTag() === $deliveryTag && !$confirm->isAck()) { - throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); + + if ($target->getConnection()->isPublishConfirm()) { + $client = $target->getConnection()->getChannel()->getClient(); + if (!$client instanceof Client) { + return; + } + + $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId()); + if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) { + throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); + } } } catch (ClientException $e) { if ($try >= 2) { diff --git a/tests/Cases/ProducerTest.phpt b/tests/Cases/ProducerTest.phpt index c983677..c8496c5 100644 --- a/tests/Cases/ProducerTest.phpt +++ b/tests/Cases/ProducerTest.phpt @@ -302,7 +302,7 @@ final class ProducerTest extends TestCase $connectionMock = \Mockery::mock(IConnection::class) ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() - ->shouldReceive('getPublishConfirm')->andReturnNull()->getMock(); + ->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock(); $queueMock = \Mockery::mock(IQueue::class) ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() @@ -326,7 +326,7 @@ final class ProducerTest extends TestCase $connectionMock = \Mockery::mock(IConnection::class) ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() - ->shouldReceive('getPublishConfirm')->andReturnNull()->getMock(); + ->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock(); $exchangeMock = \Mockery::mock(IExchange::class) ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()