Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Fix: create federation upstream only when really needed
Browse files Browse the repository at this point in the history
  • Loading branch information
bckp committed May 25, 2022
1 parent 549de72 commit 9e0f8af
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 39 deletions.
157 changes: 120 additions & 37 deletions src/Connection/Api.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Contributte\RabbitMQ\Connection;

use Nette\InvalidStateException;

class Api implements IApi
{
private string $url;
Expand Down Expand Up @@ -32,12 +34,25 @@ public function getFederations(): array
return (array) $response['data'];
}

/**
* @throws \JsonException
* @return array<int, mixed>
*/
public function getPolicies(): array
{
$url = $this->url . '/api/policies';
$response = $this->get($url);

return (array) $response['data'];
}

/**
* @throws \JsonException
* @param array<string, mixed> $policy
*/
public function createFederation(
string $exchange,
string $type,
string $target,
string $vhost,
string $uri,
int $prefetch,
Expand All @@ -47,9 +62,9 @@ public function createFederation(
string $ackMode,
array $policy
): bool {
$uniqueName = $exchange . '-' . substr(md5($uri), -8);
$policyName = $uniqueName . '-policy';
$federationName = $uniqueName . '-federation';
$uniqueName = "{$type}-{$target}-" . substr(md5($uri), -8);
$policyName = "{$uniqueName}-policy";
$federationName = "{$uniqueName}-federation";

$federationParams = [
'value' => (object) [
Expand All @@ -59,57 +74,29 @@ public function createFederation(
'message-ttl' => $messageTTL,
'expires' => $expires,
'ack-mode' => $ackMode,
'exchange' => $exchange,
$type => $target,
],
];
$policyParams = [
'pattern' => $exchange,
'pattern' => $target,
'apply-to' => 'exchanges',
'priority' => $policy['priority'],
'definition' => (object) ($policy['arguments'] + ['federation-upstream' => $federationName]),
];

$this->createFederationUpstream($federationName, $vhost, $federationParams);
$this->createFederationPolicy($policyName, $vhost, $policyParams);
$this->createFederationUpstream($vhost, $federationName, $federationParams);
$this->createPolicy($vhost, $policyName, $policyParams);

return true;
}

/**
* @throws \JsonException
* @param array<string, mixed> $params
*/
private function createFederationUpstream(string $name, string $vhost, array $params): void
{
$response = $this->put(
$this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name,
$params
);

$this->verifyResponse($response);
}

/**
* @throws \JsonException
* @param array<string, mixed> $params
*/
private function createFederationPolicy(string $name, string $vhost, array $params): void
{
$response = $this->put(
$this->url . '/api/policies/' . urlencode($vhost) . '/' . $name,
$params
);

$this->verifyResponse($response);
}

/**
* @param array<string, mixed> $response
* @return void
*/
private function verifyResponse(array $response): void
{
if ($response['status'] <= 200 || $response['status'] >= 300) {
if ($response['status'] < 200 || $response['status'] >= 300) {
throw new \RuntimeException(
sprintf(
'%s: %s',
Expand Down Expand Up @@ -178,4 +165,100 @@ private function request(string $method, string $url, array $params = []): array
'data' => $response ? json_decode((string) $response, flags: JSON_THROW_ON_ERROR) : '',
];
}

/** Policies */

/**
* @param array<string, mixed> $params
*/
private function existsPolicy(string $vhost, string $name, array $params): bool
{
$policy = $this->getPolicy($vhost, $name);
$params += ['vhost' => $vhost, 'name' => $name];
return $policy == $params; // intentionally == as we do not care of order
}

/**
* @return array<string, mixed>
*/
private function getPolicy(string $vhost, string $name): array
{
try {
$url = $this->url . '/api/policies/' . urlencode($vhost) . '/' . $name;
$response = $this->get($url);

if ($response['status'] !== self::HTTP_OK) {
throw new InvalidStateException('Failed to get policy.');
}
} catch (\JsonException) {
}

return (array) ($response['data'] ?? []);
}


/**
* @throws \JsonException
* @param array<string, mixed> $params
*/
private function createPolicy(string $vhost, string $name, array $params): void
{
if ($this->existsPolicy($vhost, $name, $params)) {
return;
}

$response = $this->put(
$this->url . '/api/policies/' . urlencode($vhost) . '/' . $name,
$params
);

$this->verifyResponse($response);
}

/** Federations */

/**
* @param array<string, mixed> $params
*/
private function existsFederationUpstream(string $vhost, string $name, array $params): bool
{
$federation = $this->getFederationUpstream($vhost, $name);
return isset($federation['value']) && $params['value'] == $federation['value']; // intentionally == as keys may be in different order
}

/**
* @return array<string, mixed>
*/
private function getFederationUpstream(string $vhost, string $name): array
{
try {
$url = $this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name;
$response = $this->get($url);

if ($response['status'] !== self::HTTP_OK) {
throw new InvalidStateException('Failed to get federation upstream.');
}
} catch (\JsonException) {
}

return (array) ($response['data'] ?? []);
}

/**
* @throws \JsonException
* @param array<string, mixed> $params
*/
private function createFederationUpstream(string $vhost, string $name, array $params): void
{
if ($this->existsFederationUpstream($vhost, $name, $params)) {
return;
}

$response = $this->put(
$this->url . '/api/parameters/federation-upstream/' . urlencode($vhost) . '/' . $name,
$params
);

$this->verifyResponse($response);
}
}
13 changes: 11 additions & 2 deletions src/Connection/IApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@

interface IApi
{
public const HTTP_OK = 200;

/**
* @return array<int, mixed>
*/
public function getFederations(): array;

/**
* @param string $exchange
* @return array<int, mixed>
*/
public function getPolicies(): array;

/**
* @param string $type
* @param string $target
* @param string $vhost
* @param string $uri
* @param int $prefetch
Expand All @@ -24,7 +32,8 @@ public function getFederations(): array;
* @return bool
*/
public function createFederation(
string $exchange,
string $type,
string $target,
string $vhost,
string $uri,
int $prefetch,
Expand Down
3 changes: 3 additions & 0 deletions src/Exchange/ExchangeDeclarator.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

final class ExchangeDeclarator
{
private const FEDERATION_TYPE = 'exchange';

public function __construct(private ConnectionFactory $connectionFactory, private ExchangesDataBag $exchangesDataBag, private QueueFactory $queueFactory)
{
}
Expand Down Expand Up @@ -56,6 +58,7 @@ public function declareExchange(string $name): void
$federation = $exchangeData['federation'];

$api->createFederation(
self::FEDERATION_TYPE,
$name,
$connection->getVhost(),
$federation['uri'],
Expand Down

0 comments on commit 9e0f8af

Please sign in to comment.