diff --git a/.docs/README.md b/.docs/README.md index 51d196b..8c31e26 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -35,6 +35,7 @@ rabbitmq: host: localhost port: 5672 lazy: false + publishConfirm: true # enables confirm mode on rabbitmq publish (this inform you about ack/nack due policy, this function is experimental) heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send queues: diff --git a/composer.json b/composer.json index 6807eed..0c4db13 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,8 @@ }, "require": { "php": "^8.0", - "bunny/bunny": "^0.2.4 || ^0.3 || ^0.4 || ^0.5", + "ext-pcntl": "*", + "bunny/bunny": "^0.5", "symfony/console": "~3.3 || ^4.0 || ^5.0 || ^6.0", "nette/di": "^3.0.7", "nette/utils": "^3.2.0", diff --git a/src/Connection/Client.php b/src/Connection/Client.php index d2b6765..8b1c7bb 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -8,8 +8,7 @@ use Bunny\ClientStateEnum; use Bunny\Exception\BunnyException; use Bunny\Exception\ClientException; -use Bunny\Protocol\AbstractFrame; -use Bunny\Protocol\HeartbeatFrame; +use Bunny\Protocol; use Nette\Utils\Strings; /** @@ -39,7 +38,7 @@ public function __construct(array $options = []) */ public function sendHeartbeat(): void { - $this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer); + $this->getWriter()->appendFrame(new Protocol\HeartbeatFrame(), $this->writeBuffer); $this->flushWriteBuffer(); $this->options['heartbeat_callback']?->call($this); @@ -108,7 +107,7 @@ public function run($maxSeconds = null): void if (!empty($this->queue)) { $frame = array_shift($this->queue); } else { - /** @var AbstractFrame|null $frame */ + /** @var Protocol\AbstractFrame|null $frame */ $frame = $this->reader->consumeFrame($this->readBuffer); if ($frame === null) { $now = microtime(true); @@ -152,7 +151,6 @@ public function run($maxSeconds = null): void } } - /** @var AbstractFrame $frame */ if ($frame->channel === 0) { $this->onFrameReceived($frame); } else { @@ -166,4 +164,33 @@ public function run($maxSeconds = null): void } } while ($this->running); } + + public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame + { + $frame = null; // psalm bug + while (true) { + /** + * @phpstan-ignore-next-line + */ + while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) { + $this->feedReadBuffer(); + } + + if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) { + return $frame; + } + + if ($frame instanceof Protocol\MethodChannelCloseFrame && $frame->channel === $channel) { + $this->channelCloseOk($channel); + throw new ClientException($frame->replyText, $frame->replyCode); + } + + if ($frame instanceof Protocol\MethodConnectionCloseFrame) { + $this->connectionCloseOk(); + throw new ClientException($frame->replyText, $frame->replyCode); + } + + $this->enqueue($frame); + } + } } diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index f66daff..0f56c22 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -42,6 +42,7 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, + private bool $publishConfirm = false, ) { $this->connectionParams = [ 'host' => $host, @@ -117,6 +118,10 @@ public function getChannel(): Channel $this->channel = $channel; } + if ($this->publishConfirm) { + $this->channel->confirmSelect(); + } + return $this->channel; } @@ -132,6 +137,11 @@ public function connectIfNeeded(): void $this->bunnyClient->connect(); } + public function isPublishConfirm(): bool + { + return $this->publishConfirm; + } + public function sendHeartbeat(): void { $now = time(); diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index b38e451..70e3f37 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -105,20 +105,21 @@ private function create(string $name): IConnection } return new Connection( - $connectionData['host'], - $connectionData['port'], - $connectionData['user'], - $connectionData['password'], - $connectionData['vhost'], - $connectionData['heartbeat'], - $connectionData['timeout'], - $connectionData['persistent'], - $connectionData['path'], - $connectionData['tcpNoDelay'], - $connectionData['lazy'], - $connectionData['ssl'], - fn () => $this->sendHeartbeat(), - $connectionData['heartbeatCallback'] ?? null, + host: $connectionData['host'], + port: $connectionData['port'], + user: $connectionData['user'], + password: $connectionData['password'], + vhost: $connectionData['vhost'], + heartbeat: $connectionData['heartbeat'], + timeout: $connectionData['timeout'], + persistent: $connectionData['persistent'], + path: $connectionData['path'], + tcpNoDelay: $connectionData['tcpNoDelay'], + lazy: $connectionData['lazy'], + ssl: $connectionData['ssl'], + cycleCallback: fn () => $this->sendHeartbeat(), + heartbeatCallback: $connectionData['heartbeatCallback'] ?? null, + publishConfirm: $connectionData['publishConfirm'], ); } } diff --git a/src/Connection/Exception/PublishException.php b/src/Connection/Exception/PublishException.php new file mode 100644 index 0000000..7c75c5a --- /dev/null +++ b/src/Connection/Exception/PublishException.php @@ -0,0 +1,9 @@ + Expect::bool(false), 'lazy' => Expect::bool(true), 'ssl' => Expect::array(null)->required(false), - 'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'), + 'heartbeatCallback' => Expect::array(null)->required(false), + 'publishConfirm' => Expect::bool(false), 'admin' => Expect::structure([ 'port' => Expect::int(15672), 'secure' => Expect::bool(false), diff --git a/src/DI/Helpers/ConsumersHelper.php b/src/DI/Helpers/ConsumersHelper.php index eb9a2d9..e48d907 100644 --- a/src/DI/Helpers/ConsumersHelper.php +++ b/src/DI/Helpers/ConsumersHelper.php @@ -17,8 +17,8 @@ public function getConfigSchema(): Schema { return Expect::arrayOf( Expect::structure([ - 'queue' => Expect::string()->required(true), - 'callback' => Expect::array()->required(true)->assert('is_callable'), + 'queue' => Expect::string()->required(), + 'callback' => Expect::array()->required(), 'idleTimeout' => Expect::int(30), 'bulk' => Expect::structure([ 'size' => Expect::int()->min(1), diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index e4341b9..8258fda 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -5,6 +5,9 @@ namespace Contributte\RabbitMQ\Producer; use Bunny\Exception\ClientException; +use Bunny\Protocol\MethodBasicNackFrame; +use Contributte\RabbitMQ\Connection\Client; +use Contributte\RabbitMQ\Connection\Exception\PublishException; use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; use Contributte\RabbitMQ\Queue\IQueue; @@ -21,10 +24,10 @@ final class Producer private array $publishCallbacks = []; public function __construct( - private ?IExchange $exchange, - private ?IQueue $queue, - private string $contentType, - private int $deliveryMode, + private ?IExchange $exchange, + private ?IQueue $queue, + private string $contentType, + private int $deliveryMode, private LazyDeclarator $lazyDeclarator ) { } @@ -74,12 +77,24 @@ private function getBasicHeaders(): array private function tryPublish(IQueue|IExchange $target, string $message, array $headers, string $exchange, string $routingKey, int $try = 0): void { try { - $target->getConnection()->getChannel()->publish( + $deliveryTag = $target->getConnection()->getChannel()->publish( $message, $headers, $exchange, $routingKey ); + + if ($target->getConnection()->isPublishConfirm()) { + $client = $target->getConnection()->getChannel()->getClient(); + if (!$client instanceof Client) { + return; + } + + $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId()); + if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) { + throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); + } + } } catch (ClientException $e) { if ($try >= 2) { throw $e; diff --git a/tests/Cases/ProducerTest.phpt b/tests/Cases/ProducerTest.phpt index bd6e3a9..c8496c5 100644 --- a/tests/Cases/ProducerTest.phpt +++ b/tests/Cases/ProducerTest.phpt @@ -301,11 +301,12 @@ final class ProducerTest extends TestCase $channelMock = new ChannelMock(); $connectionMock = \Mockery::mock(IConnection::class) - ->shouldReceive('getChannel')->andReturn($channelMock)->getMock(); + ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() + ->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock(); $queueMock = \Mockery::mock(IQueue::class) - ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() - ->shouldReceive('getName')->andReturn($testQueueName)->getMock(); + ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() + ->shouldReceive('getName')->andReturn($testQueueName)->getMock(); $producer = new Producer( null, @@ -324,11 +325,12 @@ final class ProducerTest extends TestCase $channelMock = new ChannelMock(); $connectionMock = \Mockery::mock(IConnection::class) - ->shouldReceive('getChannel')->andReturn($channelMock)->getMock(); + ->shouldReceive('getChannel')->andReturn($channelMock)->getMock() + ->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock(); $exchangeMock = \Mockery::mock(IExchange::class) - ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() - ->shouldReceive('getName')->andReturn($testExchange)->getMock(); + ->shouldReceive('getConnection')->andReturn($connectionMock)->getMock() + ->shouldReceive('getName')->andReturn($testExchange)->getMock(); $producer = new Producer( $exchangeMock, @@ -344,7 +346,7 @@ final class ProducerTest extends TestCase protected function createLazyDeclarator(): LazyDeclarator { - return new class extends LazyDeclarator{ + return new class extends LazyDeclarator { public function __construct() { $this->queuesDataBag = \Mockery::spy(QueuesDataBag::class);