diff --git a/src/Connection/Api.php b/src/Connection/Api.php index 6e6a68e..41fb311 100644 --- a/src/Connection/Api.php +++ b/src/Connection/Api.php @@ -4,7 +4,7 @@ namespace Contributte\RabbitMQ\Connection; -use Nette\InvalidStateException; +use Contributte\RabbitMQ\Exchange\IExchange; class Api implements IApi { @@ -51,34 +51,41 @@ public function getPolicies(): array * @param array $policy */ public function createFederation( - string $type, - string $target, + string $name, string $vhost, string $uri, int $prefetch, int $reconnectDelay, - int $messageTTL, - int $expires, + ?int $messageTTL, + ?int $expires, string $ackMode, array $policy ): bool { - $uniqueName = "{$type}-{$target}-" . substr(md5($uri), -8); + $uniqueName = "{$name}-" . substr(md5($uri), -8); $policyName = "{$uniqueName}-policy"; $federationName = "{$uniqueName}-federation"; + $federationParamsPrototype = [ + 'uri' => $uri, + 'prefetch-count' => $prefetch, + 'reconnect-delay' => $reconnectDelay, + 'ack-mode' => $ackMode, + 'exchange' => $name, + ]; + + if ($messageTTL) { + $federationParamsPrototype['message-ttl'] = $messageTTL; + } + if ($expires) { + $federationParamsPrototype['expires'] = $expires; + } + $federationParams = [ - 'value' => (object) [ - 'uri' => $uri, - 'prefetch-count' => $prefetch, - 'reconnect-delay' => $reconnectDelay, - 'message-ttl' => $messageTTL, - 'expires' => $expires, - 'ack-mode' => $ackMode, - $type => $target, - ], + 'value' => (object) $federationParamsPrototype ]; + $policyParams = [ - 'pattern' => $target, + 'pattern' => $name, 'apply-to' => 'exchanges', 'priority' => $policy['priority'], 'definition' => (object) ($policy['arguments'] + ['federation-upstream' => $federationName]), @@ -187,8 +194,8 @@ private function getPolicy(string $vhost, string $name): array $url = $this->url . '/api/policies/' . urlencode($vhost) . '/' . $name; $response = $this->get($url); - if ($response['status'] !== self::HTTP_OK) { - throw new InvalidStateException('Failed to get policy.'); + if ($response['status'] === self::HTTP_NOT_FOUND && isset($response['data']->error)) { + return []; } } catch (\JsonException) { } @@ -235,8 +242,8 @@ private function getFederationUpstream(string $vhost, string $name): array $url = $this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name; $response = $this->get($url); - if ($response['status'] !== self::HTTP_OK) { - throw new InvalidStateException('Failed to get federation upstream.'); + if ($response['status'] === self::HTTP_NOT_FOUND && isset($response['data']->error)) { + return []; } } catch (\JsonException) { } diff --git a/src/Connection/IApi.php b/src/Connection/IApi.php index 9005b04..2232220 100644 --- a/src/Connection/IApi.php +++ b/src/Connection/IApi.php @@ -6,7 +6,7 @@ interface IApi { - public const HTTP_OK = 200; + public const HTTP_NOT_FOUND = 404; /** * @return array @@ -19,27 +19,24 @@ public function getFederations(): array; public function getPolicies(): array; /** - * @param string $type - * @param string $target * @param string $vhost * @param string $uri * @param int $prefetch * @param int $reconnectDelay - * @param int $messageTTL - * @param int $expires + * @param ?int $messageTTL + * @param ?int $expires * @param string $ackMode * @param array $policy * @return bool */ public function createFederation( - string $type, - string $target, + string $name, string $vhost, string $uri, int $prefetch, int $reconnectDelay, - int $messageTTL, - int $expires, + ?int $messageTTL, + ?int $expires, string $ackMode, array $policy ): bool; diff --git a/src/DI/Helpers/ExchangesHelper.php b/src/DI/Helpers/ExchangesHelper.php index 7ebccdb..1e6356b 100644 --- a/src/DI/Helpers/ExchangesHelper.php +++ b/src/DI/Helpers/ExchangesHelper.php @@ -7,6 +7,8 @@ use Contributte\RabbitMQ\Exchange\ExchangeDeclarator; use Contributte\RabbitMQ\Exchange\ExchangeFactory; use Contributte\RabbitMQ\Exchange\ExchangesDataBag; +use Contributte\RabbitMQ\Exchange\IExchange; +use Contributte\RabbitMQ\Queue\IQueue; use Nette\DI\ContainerBuilder; use Nette\DI\Definitions\ServiceDefinition; use Nette\Schema\Expect; @@ -41,8 +43,8 @@ public function getConfigSchema(): Schema 'uri' => Expect::string()->required()->dynamic(), 'prefetchCount' => Expect::int(20)->min(1), 'reconnectDelay' => Expect::int(1)->min(1), - 'messageTTL' => Expect::int(3_600_000)->min(1), - 'expires' => Expect::int(3_600_000)->min(1), + 'messageTTL' => Expect::int(), + 'expires' => Expect::int(), 'ackMode' => Expect::anyOf(...self::ACK_TYPES)->default(self::ACK_TYPES[0]), 'policy' => Expect::structure([ 'priority' => Expect::int(0), diff --git a/src/Exchange/ExchangeDeclarator.php b/src/Exchange/ExchangeDeclarator.php index d23c754..88e9696 100644 --- a/src/Exchange/ExchangeDeclarator.php +++ b/src/Exchange/ExchangeDeclarator.php @@ -10,10 +10,11 @@ final class ExchangeDeclarator { - private const FEDERATION_TYPE = 'exchange'; - - public function __construct(private ConnectionFactory $connectionFactory, private ExchangesDataBag $exchangesDataBag, private QueueFactory $queueFactory) - { + public function __construct( + private ConnectionFactory $connectionFactory, + private ExchangesDataBag $exchangesDataBag, + private QueueFactory $queueFactory + ) { } @@ -58,7 +59,6 @@ public function declareExchange(string $name): void $federation = $exchangeData['federation']; $api->createFederation( - self::FEDERATION_TYPE, $name, $connection->getVhost(), $federation['uri'],