From 8e8bf67938b8a317257c55361ab52f8ed7232dba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20Kep=C3=A1k?= Date: Tue, 27 Sep 2022 12:20:44 +0200 Subject: [PATCH] Added timeout for publishConfirm --- src/Connection/Client.php | 15 ++++++++++++++- src/Connection/Connection.php | 13 ++++++++++++- src/Connection/Exception/WaitTimeoutException.php | 11 +++++++++++ src/Connection/IConnection.php | 1 + src/DI/Helpers/ConnectionsHelper.php | 5 ++++- src/Producer/Producer.php | 8 +++++++- 6 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 src/Connection/Exception/WaitTimeoutException.php diff --git a/src/Connection/Client.php b/src/Connection/Client.php index 656db37..989624a 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -10,7 +10,9 @@ use Bunny\Exception\BunnyException; use Bunny\Exception\ClientException; use Bunny\Protocol; +use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException; use Nette\Utils\Strings; +use function time; /** * @codeCoverageIgnore @@ -172,15 +174,26 @@ public function run($maxSeconds = null): void } while ($this->running); } - public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame + public function waitForConfirm(int $channel, ?int $timeout = null): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame { $frame = null; // psalm bug + $time = time(); + + $checkTimeout = static function () use ($time, $timeout): void { + if ($time + $timeout < time()) { + throw new WaitTimeoutException('Timeout reached'); + } + }; + while (true) { + $timeout && $checkTimeout(); + /** * @phpstan-ignore-next-line */ while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) { $this->feedReadBuffer(); + $timeout && $checkTimeout(); } if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) { diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index 3f2f4ab..e9fbe6a 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -15,6 +15,7 @@ final class Connection implements IConnection { private const HEARTBEAT_INTERVAL = 1; + private const CONFIRM_INTERVAL = 3; private Client $bunnyClient; @@ -23,6 +24,7 @@ final class Connection implements IConnection */ private array $connectionParams; private float $heartbeat; + private int $publishConfirm; private int $lastBeat = 0; private ?Channel $channel = null; @@ -45,7 +47,7 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, - private bool $publishConfirm = false, + bool|int $publishConfirm = false, ) { $this->connectionParams = [ 'host' => $host, @@ -71,6 +73,10 @@ public function __construct( $this->lastBeat = time(); $this->bunnyClient->connect(); } + + $this->publishConfirm = $publishConfirm === true + ? self::CONFIRM_INTERVAL + : (int) $publishConfirm; } @@ -143,6 +149,11 @@ public function connectIfNeeded(): void } public function isPublishConfirm(): bool + { + return $this->publishConfirm > 0; + } + + public function getPublishConfirm(): int { return $this->publishConfirm; } diff --git a/src/Connection/Exception/WaitTimeoutException.php b/src/Connection/Exception/WaitTimeoutException.php new file mode 100644 index 0000000..a96e808 --- /dev/null +++ b/src/Connection/Exception/WaitTimeoutException.php @@ -0,0 +1,11 @@ + Expect::bool(true), 'ssl' => Expect::array(null)->required(false), 'heartbeatCallback' => Expect::array(null)->required(false), - 'publishConfirm' => Expect::bool(false), + 'publishConfirm' => Expect::anyOf( + Expect::bool(), + Expect::int(), + )->default(false), 'admin' => Expect::structure([ 'port' => Expect::int(15672), 'secure' => Expect::bool(false), diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index ca8524f..b78d70a 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -8,6 +8,7 @@ use Bunny\Protocol\MethodBasicNackFrame; use Contributte\RabbitMQ\Connection\Client; use Contributte\RabbitMQ\Connection\Exception\PublishException; +use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException; use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; use Contributte\RabbitMQ\Queue\IQueue; @@ -86,11 +87,16 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he return; } - $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId()); + $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId(), $target->getConnection()->getPublishConfirm()); if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) { throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); } } + } catch (WaitTimeoutException $e) { + throw new PublishException( + "Confirm message timeout.\nExchange:{$exchange}\nRoutingKey:{$routingKey}\n", + previous: $e + ); } catch (ClientException $e) { if ($try >= 2) { throw $e;