diff --git a/src/Connection/Api.php b/src/Connection/Api.php index 6423c2e..6e6a68e 100644 --- a/src/Connection/Api.php +++ b/src/Connection/Api.php @@ -4,6 +4,8 @@ namespace Contributte\RabbitMQ\Connection; +use Nette\InvalidStateException; + class Api implements IApi { private string $url; @@ -32,12 +34,25 @@ public function getFederations(): array return (array) $response['data']; } + /** + * @throws \JsonException + * @return array + */ + public function getPolicies(): array + { + $url = $this->url . '/api/policies'; + $response = $this->get($url); + + return (array) $response['data']; + } + /** * @throws \JsonException * @param array $policy */ public function createFederation( - string $exchange, + string $type, + string $target, string $vhost, string $uri, int $prefetch, @@ -47,9 +62,9 @@ public function createFederation( string $ackMode, array $policy ): bool { - $uniqueName = $exchange . '-' . substr(md5($uri), -8); - $policyName = $uniqueName . '-policy'; - $federationName = $uniqueName . '-federation'; + $uniqueName = "{$type}-{$target}-" . substr(md5($uri), -8); + $policyName = "{$uniqueName}-policy"; + $federationName = "{$uniqueName}-federation"; $federationParams = [ 'value' => (object) [ @@ -59,57 +74,29 @@ public function createFederation( 'message-ttl' => $messageTTL, 'expires' => $expires, 'ack-mode' => $ackMode, - 'exchange' => $exchange, + $type => $target, ], ]; $policyParams = [ - 'pattern' => $exchange, + 'pattern' => $target, 'apply-to' => 'exchanges', 'priority' => $policy['priority'], 'definition' => (object) ($policy['arguments'] + ['federation-upstream' => $federationName]), ]; - $this->createFederationUpstream($federationName, $vhost, $federationParams); - $this->createFederationPolicy($policyName, $vhost, $policyParams); + $this->createFederationUpstream($vhost, $federationName, $federationParams); + $this->createPolicy($vhost, $policyName, $policyParams); return true; } - /** - * @throws \JsonException - * @param array $params - */ - private function createFederationUpstream(string $name, string $vhost, array $params): void - { - $response = $this->put( - $this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name, - $params - ); - - $this->verifyResponse($response); - } - - /** - * @throws \JsonException - * @param array $params - */ - private function createFederationPolicy(string $name, string $vhost, array $params): void - { - $response = $this->put( - $this->url . '/api/policies/' . urlencode($vhost) . '/' . $name, - $params - ); - - $this->verifyResponse($response); - } - /** * @param array $response * @return void */ private function verifyResponse(array $response): void { - if ($response['status'] <= 200 || $response['status'] >= 300) { + if ($response['status'] < 200 || $response['status'] >= 300) { throw new \RuntimeException( sprintf( '%s: %s', @@ -178,4 +165,100 @@ private function request(string $method, string $url, array $params = []): array 'data' => $response ? json_decode((string) $response, flags: JSON_THROW_ON_ERROR) : '', ]; } + + /** Policies */ + + /** + * @param array $params + */ + private function existsPolicy(string $vhost, string $name, array $params): bool + { + $policy = $this->getPolicy($vhost, $name); + $params += ['vhost' => $vhost, 'name' => $name]; + return $policy == $params; // intentionally == as we do not care of order + } + + /** + * @return array + */ + private function getPolicy(string $vhost, string $name): array + { + try { + $url = $this->url . '/api/policies/' . urlencode($vhost) . '/' . $name; + $response = $this->get($url); + + if ($response['status'] !== self::HTTP_OK) { + throw new InvalidStateException('Failed to get policy.'); + } + } catch (\JsonException) { + } + + return (array) ($response['data'] ?? []); + } + + + /** + * @throws \JsonException + * @param array $params + */ + private function createPolicy(string $vhost, string $name, array $params): void + { + if ($this->existsPolicy($vhost, $name, $params)) { + return; + } + + $response = $this->put( + $this->url . '/api/policies/' . urlencode($vhost) . '/' . $name, + $params + ); + + $this->verifyResponse($response); + } + + /** Federations */ + + /** + * @param array $params + */ + private function existsFederationUpstream(string $vhost, string $name, array $params): bool + { + $federation = $this->getFederationUpstream($vhost, $name); + return isset($federation['value']) && $params['value'] == $federation['value']; // intentionally == as keys may be in different order + } + + /** + * @return array + */ + private function getFederationUpstream(string $vhost, string $name): array + { + try { + $url = $this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name; + $response = $this->get($url); + + if ($response['status'] !== self::HTTP_OK) { + throw new InvalidStateException('Failed to get federation upstream.'); + } + } catch (\JsonException) { + } + + return (array) ($response['data'] ?? []); + } + + /** + * @throws \JsonException + * @param array $params + */ + private function createFederationUpstream(string $vhost, string $name, array $params): void + { + if ($this->existsFederationUpstream($vhost, $name, $params)) { + return; + } + + $response = $this->put( + $this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name, + $params + ); + + $this->verifyResponse($response); + } } diff --git a/src/Connection/IApi.php b/src/Connection/IApi.php index ac01181..9005b04 100644 --- a/src/Connection/IApi.php +++ b/src/Connection/IApi.php @@ -6,13 +6,21 @@ interface IApi { + public const HTTP_OK = 200; + /** * @return array */ public function getFederations(): array; /** - * @param string $exchange + * @return array + */ + public function getPolicies(): array; + + /** + * @param string $type + * @param string $target * @param string $vhost * @param string $uri * @param int $prefetch @@ -24,7 +32,8 @@ public function getFederations(): array; * @return bool */ public function createFederation( - string $exchange, + string $type, + string $target, string $vhost, string $uri, int $prefetch, diff --git a/src/Exchange/ExchangeDeclarator.php b/src/Exchange/ExchangeDeclarator.php index 960b148..d23c754 100644 --- a/src/Exchange/ExchangeDeclarator.php +++ b/src/Exchange/ExchangeDeclarator.php @@ -10,6 +10,8 @@ final class ExchangeDeclarator { + private const FEDERATION_TYPE = 'exchange'; + public function __construct(private ConnectionFactory $connectionFactory, private ExchangesDataBag $exchangesDataBag, private QueueFactory $queueFactory) { } @@ -56,6 +58,7 @@ public function declareExchange(string $name): void $federation = $exchangeData['federation']; $api->createFederation( + self::FEDERATION_TYPE, $name, $connection->getVhost(), $federation['uri'],