Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Feature: fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bckp committed Jul 21, 2022
1 parent 14de7f1 commit dfb6cff
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 45 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
},
"require": {
"php": "^8.0",
"ext-pcntl": "*",
"bunny/bunny": "^0.5",
"symfony/console": "~3.3 || ^4.0 || ^5.0 || ^6.0",
"nette/di": "^3.0.7",
Expand Down
37 changes: 32 additions & 5 deletions src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
use Bunny\ClientStateEnum;
use Bunny\Exception\BunnyException;
use Bunny\Exception\ClientException;
use Bunny\Protocol\AbstractFrame;
use Bunny\Protocol\HeartbeatFrame;
use Bunny\Protocol;
use Nette\Utils\Strings;

/**
Expand Down Expand Up @@ -39,7 +38,7 @@ public function __construct(array $options = [])
*/
public function sendHeartbeat(): void
{
$this->getWriter()->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
$this->getWriter()->appendFrame(new Protocol\HeartbeatFrame(), $this->writeBuffer);
$this->flushWriteBuffer();

$this->options['heartbeat_callback']?->call($this);
Expand Down Expand Up @@ -108,7 +107,7 @@ public function run($maxSeconds = null): void
if (!empty($this->queue)) {
$frame = array_shift($this->queue);
} else {
/** @var AbstractFrame|null $frame */
/** @var Protocol\AbstractFrame|null $frame */
$frame = $this->reader->consumeFrame($this->readBuffer);
if ($frame === null) {
$now = microtime(true);
Expand Down Expand Up @@ -152,7 +151,6 @@ public function run($maxSeconds = null): void
}
}

/** @var AbstractFrame $frame */
if ($frame->channel === 0) {
$this->onFrameReceived($frame);
} else {
Expand All @@ -166,4 +164,33 @@ public function run($maxSeconds = null): void
}
} while ($this->running);
}

public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame
{
$frame = null; // psalm bug
while (true) {
/**
* @phpstan-ignore-next-line
*/
while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) {
$this->feedReadBuffer();
}

if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) {
return $frame;
}

if ($frame instanceof Protocol\MethodChannelCloseFrame && $frame->channel === $channel) {
$this->channelCloseOk($channel);
throw new ClientException($frame->replyText, $frame->replyCode);
}

if ($frame instanceof Protocol\MethodConnectionCloseFrame) {
$this->connectionCloseOk();
throw new ClientException($frame->replyText, $frame->replyCode);
}

$this->enqueue($frame);
}
}
}
13 changes: 4 additions & 9 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

use Bunny\Channel;
use Bunny\Exception\ClientException;
use Bunny\Protocol\MethodBasicAckFrame;
use Bunny\Protocol\MethodBasicNackFrame;
use Contributte\RabbitMQ\Connection\Exception\ConnectionException;

final class Connection implements IConnection
Expand All @@ -24,7 +22,6 @@ final class Connection implements IConnection
private float $heartbeat;
private int $lastBeat = 0;
private ?Channel $channel = null;
private ?PublishConfirm $publishConfirm = null;

/**
* @param array<string, mixed> $ssl
Expand All @@ -45,7 +42,7 @@ public function __construct(
?array $ssl = null,
?callable $cycleCallback = null,
?callable $heartbeatCallback = null,
private bool $usePublishConfirm = false,
private bool $publishConfirm = false,
) {
$this->connectionParams = [
'host' => $host,
Expand Down Expand Up @@ -121,10 +118,8 @@ public function getChannel(): Channel
$this->channel = $channel;
}

if ($this->usePublishConfirm) {
$this->channel->confirmSelect(function (MethodBasicAckFrame|MethodBasicNackFrame $frame) {
$this->publishConfirm = new PublishConfirm($frame instanceof MethodBasicAckFrame, $frame->deliveryTag);
});
if ($this->publishConfirm) {
$this->channel->confirmSelect();
}

return $this->channel;
Expand All @@ -142,7 +137,7 @@ public function connectIfNeeded(): void
$this->bunnyClient->connect();
}

public function getPublishConfirm(): ?PublishConfirm
public function isPublishConfirm(): bool
{
return $this->publishConfirm;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Connection/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private function create(string $name): IConnection
ssl: $connectionData['ssl'],
cycleCallback: fn () => $this->sendHeartbeat(),
heartbeatCallback: $connectionData['heartbeatCallback'] ?? null,
usePublishConfirm: $connectionData['publishConfirm'],
publishConfirm: $connectionData['publishConfirm'],
);
}
}
2 changes: 1 addition & 1 deletion src/Connection/IConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function getChannel(): Channel;
public function sendHeartbeat(): void;
public function isConnected(): bool;
public function getVhost(): string;
public function getPublishConfirm(): ?PublishConfirm;
public function isPublishConfirm(): bool;

/** @internal */
public function resetChannel(): void;
Expand Down
24 changes: 0 additions & 24 deletions src/Connection/PublishConfirm.php

This file was deleted.

16 changes: 13 additions & 3 deletions src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Contributte\RabbitMQ\Producer;

use Bunny\Exception\ClientException;
use Bunny\Protocol\MethodBasicNackFrame;
use Contributte\RabbitMQ\Connection\Client;
use Contributte\RabbitMQ\Connection\Exception\PublishException;
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
Expand Down Expand Up @@ -81,9 +83,17 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he
$exchange,
$routingKey
);
$confirm = $target->getConnection()->getPublishConfirm();
if ($confirm !== null && $confirm->deliveryTag() === $deliveryTag && !$confirm->isAck()) {
throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}");

if ($target->getConnection()->isPublishConfirm()) {
$client = $target->getConnection()->getChannel()->getClient();
if (!$client instanceof Client) {
return;
}

$frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId());
if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) {
throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}");
}
}
} catch (ClientException $e) {
if ($try >= 2) {
Expand Down
4 changes: 2 additions & 2 deletions tests/Cases/ProducerTest.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ final class ProducerTest extends TestCase

$connectionMock = \Mockery::mock(IConnection::class)
->shouldReceive('getChannel')->andReturn($channelMock)->getMock()
->shouldReceive('getPublishConfirm')->andReturnNull()->getMock();
->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock();

$queueMock = \Mockery::mock(IQueue::class)
->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()
Expand All @@ -326,7 +326,7 @@ final class ProducerTest extends TestCase

$connectionMock = \Mockery::mock(IConnection::class)
->shouldReceive('getChannel')->andReturn($channelMock)->getMock()
->shouldReceive('getPublishConfirm')->andReturnNull()->getMock();
->shouldReceive('isPublishConfirm')->andReturnFalse()->getMock();

$exchangeMock = \Mockery::mock(IExchange::class)
->shouldReceive('getConnection')->andReturn($connectionMock)->getMock()
Expand Down

0 comments on commit dfb6cff

Please sign in to comment.