Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #17 from mallgroup/feature-publish-confirm
Browse files Browse the repository at this point in the history
Feature: added publish confirm option
  • Loading branch information
bckp authored Jul 21, 2022
2 parents f1cce84 + dfb6cff commit 2e31507
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 35 deletions.
1 change: 1 addition & 0 deletions .docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rabbitmq:
host: localhost
port: 5672
lazy: false
publishConfirm: true # enables confirm mode on rabbitmq publish (this inform you about ack/nack due policy, this function is experimental)
heartbeatCallback: [@class, function] # Callback that is called every time real heartbeat is send
queues:
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
},
"require": {
"php": "^8.0",
"bunny/bunny": "^0.2.4 || ^0.3 || ^0.4 || ^0.5",
"ext-pcntl": "*",
"bunny/bunny": "^0.5",
"symfony/console": "~3.3 || ^4.0 || ^5.0 || ^6.0",
"nette/di": "^3.0.7",
"nette/utils": "^3.2.0",
Expand Down
37 changes: 32 additions & 5 deletions src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
use Bunny\ClientStateEnum;
use Bunny\Exception\BunnyException;
use Bunny\Exception\ClientException;
use Bunny\Protocol\AbstractFrame;
use Bunny\Protocol\HeartbeatFrame;
use Bunny\Protocol;
use Nette\Utils\Strings;

/**
Expand Down Expand Up @@ -39,7 +38,7 @@ public function __construct(array $options = [])
*/
public function sendHeartbeat(): void
{
$this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->getWriter()->appendFrame(new Protocol\HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();

$this->options['heartbeat_callback']?->call($this);
Expand Down Expand Up @@ -108,7 +107,7 @@ public function run($maxSeconds = null): void
if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
/** @var AbstractFrame|null $frame */
/** @var Protocol\AbstractFrame|null $frame */
$frame = $this->reader->consumeFrame($this->readBuffer);
if ($frame === null) {
$now = microtime(true);
Expand Down Expand Up @@ -152,7 +151,6 @@ public function run($maxSeconds = null): void
}
}

/** @var AbstractFrame $frame */
if ($frame->channel === 0) {
$this->onFrameReceived($frame);
} else {
Expand All @@ -166,4 +164,33 @@ public function run($maxSeconds = null): void
}
} while ($this->running);
}

public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame
{
$frame = null; // psalm bug
while (true) {
/**
* @phpstan-ignore-next-line
*/
while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) {
$this->feedReadBuffer();
}

if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) {
return $frame;
}

if ($frame instanceof Protocol\MethodChannelCloseFrame && $frame->channel === $channel) {
$this->channelCloseOk($channel);
throw new ClientException($frame->replyText, $frame->replyCode);
}

if ($frame instanceof Protocol\MethodConnectionCloseFrame) {
$this->connectionCloseOk();
throw new ClientException($frame->replyText, $frame->replyCode);
}

$this->enqueue($frame);
}
}
}
10 changes: 10 additions & 0 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public function __construct(
?array $ssl = null,
?callable $cycleCallback = null,
?callable $heartbeatCallback = null,
private bool $publishConfirm = false,
) {
$this->connectionParams = [
'host' => $host,
Expand Down Expand Up @@ -117,6 +118,10 @@ public function getChannel(): Channel
$this->channel = $channel;
}

if ($this->publishConfirm) {
$this->channel->confirmSelect();
}

return $this->channel;
}

Expand All @@ -132,6 +137,11 @@ public function connectIfNeeded(): void
$this->bunnyClient->connect();
}

public function isPublishConfirm(): bool
{
return $this->publishConfirm;
}

public function sendHeartbeat(): void
{
$now = time();
Expand Down
29 changes: 15 additions & 14 deletions src/Connection/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,21 @@ private function create(string $name): IConnection
}

return new Connection(
$connectionData['host'],
$connectionData['port'],
$connectionData['user'],
$connectionData['password'],
$connectionData['vhost'],
$connectionData['heartbeat'],
$connectionData['timeout'],
$connectionData['persistent'],
$connectionData['path'],
$connectionData['tcpNoDelay'],
$connectionData['lazy'],
$connectionData['ssl'],
fn () => $this->sendHeartbeat(),
$connectionData['heartbeatCallback'] ?? null,
host: $connectionData['host'],
port: $connectionData['port'],
user: $connectionData['user'],
password: $connectionData['password'],
vhost: $connectionData['vhost'],
heartbeat: $connectionData['heartbeat'],
timeout: $connectionData['timeout'],
persistent: $connectionData['persistent'],
path: $connectionData['path'],
tcpNoDelay: $connectionData['tcpNoDelay'],
lazy: $connectionData['lazy'],
ssl: $connectionData['ssl'],
cycleCallback: fn () => $this->sendHeartbeat(),
heartbeatCallback: $connectionData['heartbeatCallback'] ?? null,
publishConfirm: $connectionData['publishConfirm'],
);
}
}
9 changes: 9 additions & 0 deletions src/Connection/Exception/PublishException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Contributte\RabbitMQ\Connection\Exception;

class PublishException extends ConnectionException
{
}
2 changes: 2 additions & 0 deletions src/Connection/IConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Bunny\Channel;
use Bunny\Exception\BunnyException;
use Bunny\Protocol\MethodFrame;
use Contributte\RabbitMQ\Connection\Exception\ConnectionException;

interface IConnection
Expand All @@ -22,6 +23,7 @@ public function getChannel(): Channel;
public function sendHeartbeat(): void;
public function isConnected(): bool;
public function getVhost(): string;
public function isPublishConfirm(): bool;

/** @internal */
public function resetChannel(): void;
Expand Down
3 changes: 2 additions & 1 deletion src/DI/Helpers/ConnectionsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public function getConfigSchema(): Schema
'tcpNoDelay' => Expect::bool(false),
'lazy' => Expect::bool(true),
'ssl' => Expect::array(null)->required(false),
'heartbeatCallback' => Expect::array(null)->required(false)->assert('is_callable'),
'heartbeatCallback' => Expect::array(null)->required(false),
'publishConfirm' => Expect::bool(false),
'admin' => Expect::structure([
'port' => Expect::int(15672),
'secure' => Expect::bool(false),
Expand Down
4 changes: 2 additions & 2 deletions src/DI/Helpers/ConsumersHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public function getConfigSchema(): Schema
{
return Expect::arrayOf(
Expect::structure([
'queue' => Expect::string()->required(true),
'callback' => Expect::array()->required(true)->assert('is_callable'),
'queue' => Expect::string()->required(),
'callback' => Expect::array()->required(),
'idleTimeout' => Expect::int(30),
'bulk' => Expect::structure([
'size' => Expect::int()->min(1),
Expand Down
25 changes: 20 additions & 5 deletions src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
namespace Contributte\RabbitMQ\Producer;

use Bunny\Exception\ClientException;
use Bunny\Protocol\MethodBasicNackFrame;
use Contributte\RabbitMQ\Connection\Client;
use Contributte\RabbitMQ\Connection\Exception\PublishException;
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;
Expand All @@ -21,10 +24,10 @@ final class Producer
private array $publishCallbacks = [];

public function __construct(
private ?IExchange $exchange,
private ?IQueue $queue,
private string $contentType,
private int $deliveryMode,
private ?IExchange $exchange,
private ?IQueue $queue,
private string $contentType,
private int $deliveryMode,
private LazyDeclarator $lazyDeclarator
) {
}
Expand Down Expand Up @@ -74,12 +77,24 @@ private function getBasicHeaders(): array
private function tryPublish(IQueue|IExchange $target, string $message, array $headers, string $exchange, string $routingKey, int $try = 0): void
{
try {
$target->getConnection()->getChannel()->publish(
$deliveryTag = $target->getConnection()->getChannel()->publish(
$message,
$headers,
$exchange,
$routingKey
);

if ($target->getConnection()->isPublishConfirm()) {
$client = $target->getConnection()->getChannel()->getClient();
if (!$client instanceof Client) {
return;
}

$frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId());
if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) {
throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}");
}
}
} catch (ClientException $e) {
if ($try >= 2) {
throw $e;
Expand Down
16 changes: 9 additions & 7 deletions tests/Cases/ProducerTest.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,12 @@ final class ProducerTest extends TestCase
$channelMock = new ChannelMock();

$connectionMock = \Mockery::mock(IConnection::class)
->shouldReceive('getChannel')->andReturn($channelMock)->getMock();
->shouldReceive('getChannel')->andReturn($channelMock)->getMock()
->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock();

$queueMock = \Mockery::mock(IQueue::class)
->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()
->shouldReceive('getName')->andReturn($testQueueName)->getMock();
->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()
->shouldReceive('getName')->andReturn($testQueueName)->getMock();

$producer = new Producer(
null,
Expand All @@ -324,11 +325,12 @@ final class ProducerTest extends TestCase
$channelMock = new ChannelMock();

$connectionMock = \Mockery::mock(IConnection::class)
->shouldReceive('getChannel')->andReturn($channelMock)->getMock();
->shouldReceive('getChannel')->andReturn($channelMock)->getMock()
->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock();

$exchangeMock = \Mockery::mock(IExchange::class)
->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()
->shouldReceive('getName')->andReturn($testExchange)->getMock();
->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()
->shouldReceive('getName')->andReturn($testExchange)->getMock();

$producer = new Producer(
$exchangeMock,
Expand All @@ -344,7 +346,7 @@ final class ProducerTest extends TestCase

protected function createLazyDeclarator(): LazyDeclarator
{
return new class extends LazyDeclarator{
return new class extends LazyDeclarator {
public function __construct()
{
$this->queuesDataBag = \Mockery::spy(QueuesDataBag::class);
Expand Down

0 comments on commit 2e31507

Please sign in to comment.