Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Added timeout for publishConfirm
Browse files Browse the repository at this point in the history
  • Loading branch information
bckp committed Sep 27, 2022
1 parent 48c2e2e commit 8e8bf67
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 4 deletions.
15 changes: 14 additions & 1 deletion src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use Bunny\Exception\BunnyException;
use Bunny\Exception\ClientException;
use Bunny\Protocol;
use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException;
use Nette\Utils\Strings;
use function time;

/**
* @codeCoverageIgnore
Expand Down Expand Up @@ -172,15 +174,26 @@ public function run($maxSeconds = null): void
} while ($this->running);
}

public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame
public function waitForConfirm(int $channel, ?int $timeout = null): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame
{
$frame = null; // psalm bug
$time = time();

$checkTimeout = static function () use ($time, $timeout): void {
if ($time + $timeout < time()) {
throw new WaitTimeoutException('Timeout reached');
}
};

while (true) {
$timeout && $checkTimeout();

/**
* @phpstan-ignore-next-line
*/
while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) {
$this->feedReadBuffer();
$timeout && $checkTimeout();
}

if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) {
Expand Down
13 changes: 12 additions & 1 deletion src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ final class Connection implements IConnection
{

private const HEARTBEAT_INTERVAL = 1;
private const CONFIRM_INTERVAL = 3;

private Client $bunnyClient;

Expand All @@ -23,6 +24,7 @@ final class Connection implements IConnection
*/
private array $connectionParams;
private float $heartbeat;
private int $publishConfirm;
private int $lastBeat = 0;
private ?Channel $channel = null;

Expand All @@ -45,7 +47,7 @@ public function __construct(
?array $ssl = null,
?callable $cycleCallback = null,
?callable $heartbeatCallback = null,
private bool $publishConfirm = false,
bool|int $publishConfirm = false,
) {
$this->connectionParams = [
'host' => $host,
Expand All @@ -71,6 +73,10 @@ public function __construct(
$this->lastBeat = time();
$this->bunnyClient->connect();
}

$this->publishConfirm = $publishConfirm === true
? self::CONFIRM_INTERVAL
: (int) $publishConfirm;
}


Expand Down Expand Up @@ -143,6 +149,11 @@ public function connectIfNeeded(): void
}

public function isPublishConfirm(): bool
{
return $this->publishConfirm > 0;
}

public function getPublishConfirm(): int
{
return $this->publishConfirm;
}
Expand Down
11 changes: 11 additions & 0 deletions src/Connection/Exception/WaitTimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Contributte\RabbitMQ\Connection\Exception;

use Bunny\Exception\ClientException;

class WaitTimeoutException extends ClientException
{
}
1 change: 1 addition & 0 deletions src/Connection/IConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public function sendHeartbeat(): void;
public function isConnected(): bool;
public function getVhost(): string;
public function isPublishConfirm(): bool;
public function getPublishConfirm(): int;

/** @internal */
public function resetChannel(): void;
Expand Down
5 changes: 4 additions & 1 deletion src/DI/Helpers/ConnectionsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ public function getConfigSchema(): Schema
'lazy' => Expect::bool(true),
'ssl' => Expect::array(null)->required(false),
'heartbeatCallback' => Expect::array(null)->required(false),
'publishConfirm' => Expect::bool(false),
'publishConfirm' => Expect::anyOf(
Expect::bool(),
Expect::int(),
)->default(false),
'admin' => Expect::structure([
'port' => Expect::int(15672),
'secure' => Expect::bool(false),
Expand Down
8 changes: 7 additions & 1 deletion src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Bunny\Protocol\MethodBasicNackFrame;
use Contributte\RabbitMQ\Connection\Client;
use Contributte\RabbitMQ\Connection\Exception\PublishException;
use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException;
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;
Expand Down Expand Up @@ -86,11 +87,16 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he
return;
}

$frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId());
$frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId(), $target->getConnection()->getPublishConfirm());
if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) {
throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}");
}
}
} catch (WaitTimeoutException $e) {
throw new PublishException(
"Confirm message timeout.\nExchange:{$exchange}\nRoutingKey:{$routingKey}\n",
previous: $e
);
} catch (ClientException $e) {
if ($try >= 2) {
throw $e;
Expand Down

0 comments on commit 8e8bf67

Please sign in to comment.