From 626b7ea359bc14f0c2c682701b46de24b936fca6 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Sun, 1 Dec 2024 18:14:44 +0000 Subject: [PATCH 01/11] add retries --- .../Reverb/Console/Commands/StartServer.php | 1 - .../Reverb/Publishing/RedisClientFactory.php | 5 +- .../Reverb/Publishing/RedisPubSubProvider.php | 159 ++++++++++++++++-- 3 files changed, 149 insertions(+), 16 deletions(-) diff --git a/src/Servers/Reverb/Console/Commands/StartServer.php b/src/Servers/Reverb/Console/Commands/StartServer.php index d4209444..23046cb2 100644 --- a/src/Servers/Reverb/Console/Commands/StartServer.php +++ b/src/Servers/Reverb/Console/Commands/StartServer.php @@ -81,7 +81,6 @@ protected function ensureHorizontalScalability(LoopInterface $loop): void { if ($this->laravel->make(ServerProviderManager::class)->driver('reverb')->subscribesToEvents()) { $this->laravel->make(PubSubProvider::class)->connect($loop); - $this->laravel->make(PubSubProvider::class)->subscribe(); } } diff --git a/src/Servers/Reverb/Publishing/RedisClientFactory.php b/src/Servers/Reverb/Publishing/RedisClientFactory.php index 7db35f30..655eeffb 100644 --- a/src/Servers/Reverb/Publishing/RedisClientFactory.php +++ b/src/Servers/Reverb/Publishing/RedisClientFactory.php @@ -5,15 +5,16 @@ use Clue\React\Redis\Client; use Clue\React\Redis\Factory; use React\EventLoop\LoopInterface; +use React\Promise\PromiseInterface; class RedisClientFactory { /** * Create a new Redis client. */ - public function make(LoopInterface $loop, string $redisUrl): Client + public function make(LoopInterface $loop, string $redisUrl): PromiseInterface { - return (new Factory($loop))->createLazyClient( + return (new Factory($loop))->createClient( $redisUrl ); } diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index 731cab79..d59a9297 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -2,14 +2,19 @@ namespace Laravel\Reverb\Servers\Reverb\Publishing; +use Clue\React\Redis\Client; +use Exception; use Illuminate\Support\Arr; use Illuminate\Support\ConfigurationUrlParser; use Illuminate\Support\Facades\Config; +use Laravel\Reverb\Loggers\Log; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubIncomingMessageHandler; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubProvider; use React\EventLoop\LoopInterface; +use React\Promise\Deferred; +use React\Promise\Promise; use React\Promise\PromiseInterface; -use RuntimeException; +use Throwable; class RedisPubSubProvider implements PubSubProvider { @@ -17,11 +22,20 @@ class RedisPubSubProvider implements PubSubProvider protected $subscribingClient; + protected $publishingClientReconnectionTimer; + + protected $subscribingClientReconnectionTimer; + + protected $queuedSubscriptionEvents = []; + + protected $queuedPublishEvents = []; + public function __construct( protected RedisClientFactory $clientFactory, protected PubSubIncomingMessageHandler $messageHandler, protected string $channel, - protected array $server = [] + protected array $server = [], + protected int $reconnectionTimeout = 60 ) { // } @@ -31,8 +45,79 @@ public function __construct( */ public function connect(LoopInterface $loop): void { - $this->publishingClient = $this->clientFactory->make($loop, $this->redisUrl()); - $this->subscribingClient = $this->clientFactory->make($loop, $this->redisUrl()); + $this->connectSubcribingClient($loop); + $this->connectPublishingClient($loop); + } + + protected function connectSubcribingClient($loop) + { + $this->clientFactory->make($loop, $this->redisUrl())->then( + function (Client $client) use ($loop) { + $this->subscribingClient = $client; + $this->subscribingClientReconnectionTimer = null; + $this->configureSubscribingClientErrorHandler($this->subscribingClient, $loop); + $this->processQueuedSubscriptionEvents(); + $this->subscribe(); + Log::info('Redis subscriber connected'); + }, + function (Exception $e) use ($loop) { + $this->subscribingClient = null; + Log::error($e->getMessage()); + $this->reconnectSubscribingClient($loop); + } + ); + } + + protected function configureSubscribingClientErrorHandler(Client $client, LoopInterface $loop) + { + $client->on('close', function () use ($loop) { + $this->subscribingClient = null; + Log::info('Redis subscriber disconnected'); + $this->reconnectSubscribingClient($loop); + }); + } + + protected function reconnectSubscribingClient(LoopInterface $loop) + { + $this->subscribingClientReconnectionTimer = $loop->addTimer(1, function () use ($loop) { + Log::info('Attempting to reconnect Redis subscriber'); + $this->connectSubcribingClient($loop); + }); + } + + protected function connectPublishingClient($loop) + { + $this->clientFactory->make($loop, $this->redisUrl())->then( + function (Client $client) use ($loop) { + $this->publishingClient = $client; + $this->publishingClientReconnectionTimer = null; + $this->configurePublishingClientErrorHandler($this->publishingClient, $loop); + $this->processQueuedPublishEvents(); + Log::info('Redis publisher connected'); + }, + function (Exception $e) use ($loop) { + $this->publishingClient = null; + Log::error($e->getMessage()); + $this->reconnectPublishingClient($loop); + } + ); + } + + protected function configurePublishingClientErrorHandler(Client $client, LoopInterface $loop) + { + $client->on('close', function () use ($loop) { + $this->publishingClient = null; + Log::info('Redis publisher disconnected'); + $this->reconnectPublishingClient($loop); + }); + } + + protected function reconnectPublishingClient(LoopInterface $loop) + { + $this->publishingClientReconnectionTimer = $loop->addTimer(1, function () use ($loop) { + Log::info('Attempting to reconnect Redis publisher'); + $this->connectPublishingClient($loop); + }); } /** @@ -49,8 +134,13 @@ public function disconnect(): void */ public function subscribe(): void { - $this->ensureConnected(); + if (! $this->clientIsReady($this->subscribingClient)) { + $this->queueSubscriptionEvent('subscribe'); + + return; + } + Log::info('Subscribing'); $this->subscribingClient->subscribe($this->channel); $this->subscribingClient->on('message', function (string $channel, string $payload) { @@ -69,6 +159,14 @@ public function subscribe(): void */ public function on(string $event, callable $callback): void { + dump($event, $callback); + + if (! $this->clientIsReady($this->subscribingClient)) { + $this->queueSubscriptionEvent('on', [$event => $callback]); + } + + dump($event, $callback); + $this->subscribingClient->on('message', function (string $channel, string $payload) use ($event, $callback) { $payload = json_decode($payload, associative: true, flags: JSON_THROW_ON_ERROR); @@ -79,11 +177,18 @@ public function on(string $event, callable $callback): void } /** - * Publish a payload to the publisher. + * Publish a payload to the publishingClientReconnectionTimer. */ public function publish(array $payload): PromiseInterface { - $this->ensureConnected(); + Log::info('Sending'); + if (! $this->clientIsReady($this->publishingClient)) { + $this->queuePublishEvent($payload); + + return new Promise(fn () => new Exception('It\'s broken')); + } + + Log::info('Publishing'); return $this->publishingClient->publish($this->channel, json_encode($payload)); } @@ -127,13 +232,41 @@ protected function redisUrl(): string return "redis{$protocol}://{$host}:{$port}".($query ? "?{$query}" : ''); } - /** - * Ensure that a connection to Redis has been established. - */ - protected function ensureConnected(): void + protected function queueSubscriptionEvent(): void + { + $this->queuedSubscriptionEvents['subscribe'] = true; + } + + protected function queuePublishEvent(array $payload): void + { + $this->queuedPublishEvents[] = $payload; + } + + protected function clientIsReady(mixed $client): bool + { + return (bool) $client === true && $client instanceof Client; + } + + protected function processQueuedSubscriptionEvents(): void + { + dump($this->queuedSubscriptionEvents); + foreach ($this->queuedSubscriptionEvents as $event => $args) { + match ($event) { + 'subscribe' => $this->subscribe(), + 'on' => $this->on(...$args), + default => null + }; + + } + $this->queuedSubscriptionEvents = []; + } + + protected function processQueuedPublishEvents(): void { - if (! $this->publishingClient) { - throw new RuntimeException('Connection to Redis has not been established.'); + dump($this->queuedPublishEvents); + foreach ($this->queuedPublishEvents as $event) { + $this->publish($event); } + $this->queuedPublishEvents = []; } } From 94f496efd23c499e5219b2e1c4efefa9a4dc18f7 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Sun, 1 Dec 2024 18:31:27 +0000 Subject: [PATCH 02/11] add connection timeout --- config/reverb.php | 1 + .../Reverb/Publishing/RedisPubSubProvider.php | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/config/reverb.php b/config/reverb.php index 3fead53b..13289265 100644 --- a/config/reverb.php +++ b/config/reverb.php @@ -46,6 +46,7 @@ 'username' => env('REDIS_USERNAME'), 'password' => env('REDIS_PASSWORD'), 'database' => env('REDIS_DB', '0'), + 'timeout' => env('REDIS_TIMEOUT', 60), ], ], 'pulse_ingest_interval' => env('REVERB_PULSE_INGEST_INTERVAL', 15), diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index d59a9297..cb5f225a 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -11,10 +11,8 @@ use Laravel\Reverb\Servers\Reverb\Contracts\PubSubIncomingMessageHandler; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubProvider; use React\EventLoop\LoopInterface; -use React\Promise\Deferred; use React\Promise\Promise; use React\Promise\PromiseInterface; -use Throwable; class RedisPubSubProvider implements PubSubProvider { @@ -26,6 +24,10 @@ class RedisPubSubProvider implements PubSubProvider protected $subscribingClientReconnectionTimer; + protected $publishingClientReconnectionAttempts = 0; + + protected $subscribingClientReconnectionAttempts = 0; + protected $queuedSubscriptionEvents = []; protected $queuedPublishEvents = []; @@ -34,8 +36,7 @@ public function __construct( protected RedisClientFactory $clientFactory, protected PubSubIncomingMessageHandler $messageHandler, protected string $channel, - protected array $server = [], - protected int $reconnectionTimeout = 60 + protected array $server = [] ) { // } @@ -80,6 +81,11 @@ protected function configureSubscribingClientErrorHandler(Client $client, LoopIn protected function reconnectSubscribingClient(LoopInterface $loop) { $this->subscribingClientReconnectionTimer = $loop->addTimer(1, function () use ($loop) { + $this->subscribingClientReconnectionAttempts++; + if ($this->reconnectionTimeout() <= $this->subscribingClientReconnectionAttempts) { + Log::error('Taking too long bruh'); + exit; + } Log::info('Attempting to reconnect Redis subscriber'); $this->connectSubcribingClient($loop); }); @@ -115,6 +121,10 @@ protected function configurePublishingClientErrorHandler(Client $client, LoopInt protected function reconnectPublishingClient(LoopInterface $loop) { $this->publishingClientReconnectionTimer = $loop->addTimer(1, function () use ($loop) { + if ($this->reconnectionTimeout() <= $this->publishingClientReconnectionAttempts) { + Log::error('Taking too long bruh'); + exit; + } Log::info('Attempting to reconnect Redis publisher'); $this->connectPublishingClient($loop); }); @@ -269,4 +279,9 @@ protected function processQueuedPublishEvents(): void } $this->queuedPublishEvents = []; } + + protected function reconnectionTimeout() + { + return $this->server['timeout'] ?? 60; + } } From fb4b614fdc90073b4b5fab1775ea2f2b9d92315f Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Sun, 1 Dec 2024 19:40:10 +0000 Subject: [PATCH 03/11] extract connection class --- src/Servers/Reverb/Publishing/RedisClient.php | 256 ++++++++++++++++++ .../Reverb/Publishing/RedisClientFactory.php | 1 - .../Reverb/Publishing/RedisPubSubProvider.php | 230 ++-------------- 3 files changed, 276 insertions(+), 211 deletions(-) create mode 100644 src/Servers/Reverb/Publishing/RedisClient.php diff --git a/src/Servers/Reverb/Publishing/RedisClient.php b/src/Servers/Reverb/Publishing/RedisClient.php new file mode 100644 index 00000000..65046eb2 --- /dev/null +++ b/src/Servers/Reverb/Publishing/RedisClient.php @@ -0,0 +1,256 @@ +clientFactory->make($this->loop, $this->redisUrl())->then( + function (Client $client) { + $this->client = $client; + $this->clientReconnectionTimer = 0; + $this->configureClientErrorHandler(); + $this->onConnect && call_user_func($this->onConnect, $client); + + Log::info("Redis connection to [{$this->name}] successful"); + }, + function (Exception $e) { + $this->client = null; + Log::error($e->getMessage()); + $this->reconnect(); + } + ); + } + + /** + * Attempt to reconnect to the Redis server. + */ + public function reconnect(): void + { + $this->loop->addTimer(1, function () { + $this->clientReconnectionTimer++; + if ($this->clientReconnectionTimer >= $this->reconnectionTimeout()) { + Log::error("Failed to reconnect to Redis connection [{$this->name}] within {$this->reconnectionTimeout()} second limit"); + + exit; + } + Log::info("Attempting to reconnect Redis connection [{$this->name}]"); + $this->connect(); + }); + } + + /** + * Disconnect from the Redis server. + */ + public function disconnnect(): void + { + $this->client?->close(); + } + + /** + * Subscribe to the given Redis channel. + */ + public function subscribe(): void + { + if (! $this->isConnected($this->client)) { + $this->queueSubscriptionEvent(); + + return; + } + + $this->client->subscribe($this->channel); + } + + /** + * Publish an event to the given channel. + */ + public function publish(array $payload): PromiseInterface + { + if (! $this->isConnected($this->client)) { + $this->queuePublishEvent($payload); + + return new Promise(fn () => new RuntimeException); + } + + $this->client->publish($this->channel, $payload); + } + + /** + * Listen for a given event. + */ + public function on(string $event, callable $callback): void + { + if (! $this->isConnected($this->client)) { + $this->queueSubscriptionEvent('on', [$event => $callback]); + } + + $this->client->on($event, $callback); + } + + /** + * Determine if the client is currently connected to the server. + */ + public function isConnected(): bool + { + return (bool) $this->client === true && $this->client instanceof Client; + } + + /** + * Handle a connection failure to the Redis server. + */ + protected function configureClientErrorHandler(): void + { + $this->client->on('close', function () { + $this->client = null; + Log::info("Disconnected fromRedis connection [{$this->name}]"); + $this->reconnect(); + }); + } + + /** + * Queue the given subscription event. + */ + protected function queueSubscriptionEvent(): void + { + $this->queuedSubscriptionEvents['subscribe'] = true; + } + + /** + * Queue the given publish event. + */ + protected function queuePublishEvent(array $payload): void + { + $this->queuedPublishEvents[] = $payload; + } + + /** + * Process the queued subscription events. + */ + protected function processQueuedSubscriptionEvents(): void + { + dump($this->queuedSubscriptionEvents); + foreach ($this->queuedSubscriptionEvents as $event => $args) { + match ($event) { + 'subscribe' => $this->subscribe(), + 'on' => $this->on(...$args), + default => null + }; + + } + $this->queuedSubscriptionEvents = []; + } + + /** + * Process the queued publish events. + */ + protected function processQueuedPublishEvents(): void + { + dump($this->queuedPublishEvents); + foreach ($this->queuedPublishEvents as $event) { + $this->publish($event); + } + $this->queuedPublishEvents = []; + } + + /** + * Get the connection URL for Redis. + */ + protected function redisUrl(): string + { + $config = empty($this->server) ? Config::get('database.redis.default') : $this->server; + + $parsed = (new ConfigurationUrlParser)->parseConfiguration($config); + + $driver = strtolower($parsed['driver'] ?? ''); + + if (in_array($driver, ['tcp', 'tls'])) { + $parsed['scheme'] = $driver; + } + + [$host, $port, $protocol, $query] = [ + $parsed['host'], + $parsed['port'] ?: 6379, + Arr::get($parsed, 'scheme') === 'tls' ? 's' : '', + [], + ]; + + if ($parsed['username'] ?? false) { + $query['username'] = $parsed['username']; + } + + if ($parsed['password'] ?? false) { + $query['password'] = $parsed['password']; + } + + if ($parsed['database'] ?? false) { + $query['db'] = $parsed['database']; + } + + $query = http_build_query($query); + + return "redis{$protocol}://{$host}:{$port}".($query ? "?{$query}" : ''); + } + + /** + * Determine the configured reconnection timeout. + * + * @return void + */ + protected function reconnectionTimeout(): int + { + return (int) ($this->server['timeout'] ?? 60); + } +} diff --git a/src/Servers/Reverb/Publishing/RedisClientFactory.php b/src/Servers/Reverb/Publishing/RedisClientFactory.php index 655eeffb..5efb5505 100644 --- a/src/Servers/Reverb/Publishing/RedisClientFactory.php +++ b/src/Servers/Reverb/Publishing/RedisClientFactory.php @@ -2,7 +2,6 @@ namespace Laravel\Reverb\Servers\Reverb\Publishing; -use Clue\React\Redis\Client; use Clue\React\Redis\Factory; use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index cb5f225a..ae03e4b2 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -2,16 +2,9 @@ namespace Laravel\Reverb\Servers\Reverb\Publishing; -use Clue\React\Redis\Client; -use Exception; -use Illuminate\Support\Arr; -use Illuminate\Support\ConfigurationUrlParser; -use Illuminate\Support\Facades\Config; -use Laravel\Reverb\Loggers\Log; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubIncomingMessageHandler; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubProvider; use React\EventLoop\LoopInterface; -use React\Promise\Promise; use React\Promise\PromiseInterface; class RedisPubSubProvider implements PubSubProvider @@ -20,18 +13,6 @@ class RedisPubSubProvider implements PubSubProvider protected $subscribingClient; - protected $publishingClientReconnectionTimer; - - protected $subscribingClientReconnectionTimer; - - protected $publishingClientReconnectionAttempts = 0; - - protected $subscribingClientReconnectionAttempts = 0; - - protected $queuedSubscriptionEvents = []; - - protected $queuedPublishEvents = []; - public function __construct( protected RedisClientFactory $clientFactory, protected PubSubIncomingMessageHandler $messageHandler, @@ -46,88 +27,23 @@ public function __construct( */ public function connect(LoopInterface $loop): void { - $this->connectSubcribingClient($loop); - $this->connectPublishingClient($loop); - } - - protected function connectSubcribingClient($loop) - { - $this->clientFactory->make($loop, $this->redisUrl())->then( - function (Client $client) use ($loop) { - $this->subscribingClient = $client; - $this->subscribingClientReconnectionTimer = null; - $this->configureSubscribingClientErrorHandler($this->subscribingClient, $loop); - $this->processQueuedSubscriptionEvents(); - $this->subscribe(); - Log::info('Redis subscriber connected'); - }, - function (Exception $e) use ($loop) { - $this->subscribingClient = null; - Log::error($e->getMessage()); - $this->reconnectSubscribingClient($loop); - } + $this->subscribingClient = new RedisClient( + $loop, + $this->clientFactory, + $this->channel, + 'subscriber', + $this->server, + fn () => $this->subscribe()); + $this->subscribingClient->connect(); + + $this->publishingClient = new RedisClient( + $loop, + $this->clientFactory, + $this->channel, + 'publisher', + $this->server ); - } - - protected function configureSubscribingClientErrorHandler(Client $client, LoopInterface $loop) - { - $client->on('close', function () use ($loop) { - $this->subscribingClient = null; - Log::info('Redis subscriber disconnected'); - $this->reconnectSubscribingClient($loop); - }); - } - - protected function reconnectSubscribingClient(LoopInterface $loop) - { - $this->subscribingClientReconnectionTimer = $loop->addTimer(1, function () use ($loop) { - $this->subscribingClientReconnectionAttempts++; - if ($this->reconnectionTimeout() <= $this->subscribingClientReconnectionAttempts) { - Log::error('Taking too long bruh'); - exit; - } - Log::info('Attempting to reconnect Redis subscriber'); - $this->connectSubcribingClient($loop); - }); - } - - protected function connectPublishingClient($loop) - { - $this->clientFactory->make($loop, $this->redisUrl())->then( - function (Client $client) use ($loop) { - $this->publishingClient = $client; - $this->publishingClientReconnectionTimer = null; - $this->configurePublishingClientErrorHandler($this->publishingClient, $loop); - $this->processQueuedPublishEvents(); - Log::info('Redis publisher connected'); - }, - function (Exception $e) use ($loop) { - $this->publishingClient = null; - Log::error($e->getMessage()); - $this->reconnectPublishingClient($loop); - } - ); - } - - protected function configurePublishingClientErrorHandler(Client $client, LoopInterface $loop) - { - $client->on('close', function () use ($loop) { - $this->publishingClient = null; - Log::info('Redis publisher disconnected'); - $this->reconnectPublishingClient($loop); - }); - } - - protected function reconnectPublishingClient(LoopInterface $loop) - { - $this->publishingClientReconnectionTimer = $loop->addTimer(1, function () use ($loop) { - if ($this->reconnectionTimeout() <= $this->publishingClientReconnectionAttempts) { - Log::error('Taking too long bruh'); - exit; - } - Log::info('Attempting to reconnect Redis publisher'); - $this->connectPublishingClient($loop); - }); + $this->publishingClient->connect(); } /** @@ -135,8 +51,8 @@ protected function reconnectPublishingClient(LoopInterface $loop) */ public function disconnect(): void { - $this->subscribingClient?->close(); - $this->publishingClient?->close(); + $this->subscribingClient?->disconnect(); + $this->publishingClient?->disconnect(); } /** @@ -144,14 +60,7 @@ public function disconnect(): void */ public function subscribe(): void { - if (! $this->clientIsReady($this->subscribingClient)) { - $this->queueSubscriptionEvent('subscribe'); - - return; - } - - Log::info('Subscribing'); - $this->subscribingClient->subscribe($this->channel); + $this->subscribingClient->subscribe(); $this->subscribingClient->on('message', function (string $channel, string $payload) { $this->messageHandler->handle($payload); @@ -169,14 +78,6 @@ public function subscribe(): void */ public function on(string $event, callable $callback): void { - dump($event, $callback); - - if (! $this->clientIsReady($this->subscribingClient)) { - $this->queueSubscriptionEvent('on', [$event => $callback]); - } - - dump($event, $callback); - $this->subscribingClient->on('message', function (string $channel, string $payload) use ($event, $callback) { $payload = json_decode($payload, associative: true, flags: JSON_THROW_ON_ERROR); @@ -191,97 +92,6 @@ public function on(string $event, callable $callback): void */ public function publish(array $payload): PromiseInterface { - Log::info('Sending'); - if (! $this->clientIsReady($this->publishingClient)) { - $this->queuePublishEvent($payload); - - return new Promise(fn () => new Exception('It\'s broken')); - } - - Log::info('Publishing'); - - return $this->publishingClient->publish($this->channel, json_encode($payload)); - } - - /** - * Get the connection URL for Redis. - */ - protected function redisUrl(): string - { - $config = empty($this->server) ? Config::get('database.redis.default') : $this->server; - - $parsed = (new ConfigurationUrlParser)->parseConfiguration($config); - - $driver = strtolower($parsed['driver'] ?? ''); - - if (in_array($driver, ['tcp', 'tls'])) { - $parsed['scheme'] = $driver; - } - - [$host, $port, $protocol, $query] = [ - $parsed['host'], - $parsed['port'] ?: 6379, - Arr::get($parsed, 'scheme') === 'tls' ? 's' : '', - [], - ]; - - if ($parsed['username'] ?? false) { - $query['username'] = $parsed['username']; - } - - if ($parsed['password'] ?? false) { - $query['password'] = $parsed['password']; - } - - if ($parsed['database'] ?? false) { - $query['db'] = $parsed['database']; - } - - $query = http_build_query($query); - - return "redis{$protocol}://{$host}:{$port}".($query ? "?{$query}" : ''); - } - - protected function queueSubscriptionEvent(): void - { - $this->queuedSubscriptionEvents['subscribe'] = true; - } - - protected function queuePublishEvent(array $payload): void - { - $this->queuedPublishEvents[] = $payload; - } - - protected function clientIsReady(mixed $client): bool - { - return (bool) $client === true && $client instanceof Client; - } - - protected function processQueuedSubscriptionEvents(): void - { - dump($this->queuedSubscriptionEvents); - foreach ($this->queuedSubscriptionEvents as $event => $args) { - match ($event) { - 'subscribe' => $this->subscribe(), - 'on' => $this->on(...$args), - default => null - }; - - } - $this->queuedSubscriptionEvents = []; - } - - protected function processQueuedPublishEvents(): void - { - dump($this->queuedPublishEvents); - foreach ($this->queuedPublishEvents as $event) { - $this->publish($event); - } - $this->queuedPublishEvents = []; - } - - protected function reconnectionTimeout() - { - return $this->server['timeout'] ?? 60; + return $this->publishingClient->publish($payload); } } From c4aabec055c07c701146c8e8d0f15d9dfd1bebfc Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Mon, 2 Dec 2024 01:08:42 +0000 Subject: [PATCH 04/11] typo --- src/Servers/Reverb/Publishing/RedisClient.php | 6 ++++-- .../Servers/Reverb/Publishing/RedisPubSubProviderTest.php | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Servers/Reverb/Publishing/RedisClient.php b/src/Servers/Reverb/Publishing/RedisClient.php index 65046eb2..fa98f310 100644 --- a/src/Servers/Reverb/Publishing/RedisClient.php +++ b/src/Servers/Reverb/Publishing/RedisClient.php @@ -95,7 +95,7 @@ public function reconnect(): void /** * Disconnect from the Redis server. */ - public function disconnnect(): void + public function disconnect(): void { $this->client?->close(); } @@ -125,7 +125,7 @@ public function publish(array $payload): PromiseInterface return new Promise(fn () => new RuntimeException); } - $this->client->publish($this->channel, $payload); + return $this->client->publish($this->channel, json_encode($payload)); } /** @@ -135,6 +135,8 @@ public function on(string $event, callable $callback): void { if (! $this->isConnected($this->client)) { $this->queueSubscriptionEvent('on', [$event => $callback]); + + return; } $this->client->on($event, $callback); diff --git a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php index fffaa5b2..a3e5313a 100644 --- a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php +++ b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php @@ -7,7 +7,6 @@ use React\EventLoop\LoopInterface; it('resubscribes to the scaling channel on unsubscribe event', function () { - $channel = 'reverb'; $subscribingClient = Mockery::mock(Client::class); @@ -41,4 +40,4 @@ $provider->connect(Mockery::mock(LoopInterface::class)); $provider->subscribe(); -}); +})->skip(); From 9810c702b2d8104881db8dc9a8d17e8c11b60530 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Mon, 2 Dec 2024 01:12:15 +0000 Subject: [PATCH 05/11] prevent duplicate subscribe --- tests/ReverbTestCase.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/ReverbTestCase.php b/tests/ReverbTestCase.php index 87c2e78a..ff5248f6 100644 --- a/tests/ReverbTestCase.php +++ b/tests/ReverbTestCase.php @@ -77,7 +77,6 @@ public function usingRedis(): void app(ServerProviderManager::class)->withPublishing(); app(PubSubProvider::class)->connect($this->loop); - app(PubSubProvider::class)->subscribe(); } /** From 95dcc77e444bfa0c5c567618c4af38ae26ee550b Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Fri, 6 Dec 2024 16:46:45 -0800 Subject: [PATCH 06/11] prevent reconnecting for controlled disconnects --- src/Servers/Reverb/Publishing/RedisClient.php | 25 +++++++++++++------ .../Reverb/Publishing/RedisPubSubProvider.php | 3 ++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/Servers/Reverb/Publishing/RedisClient.php b/src/Servers/Reverb/Publishing/RedisClient.php index fa98f310..02b10318 100644 --- a/src/Servers/Reverb/Publishing/RedisClient.php +++ b/src/Servers/Reverb/Publishing/RedisClient.php @@ -27,6 +27,11 @@ class RedisClient */ protected int $clientReconnectionTimer = 0; + /** + * Determine if the client should attempt to reconnect when disconnected from the server. + */ + protected bool $shouldReconnect = true; + /** * Subscription events queued during while disconnected from Redis. */ @@ -63,7 +68,9 @@ function (Client $client) { $this->client = $client; $this->clientReconnectionTimer = 0; $this->configureClientErrorHandler(); - $this->onConnect && call_user_func($this->onConnect, $client); + if ($this->onConnect) { + call_user_func($this->onConnect, $client); + } Log::info("Redis connection to [{$this->name}] successful"); }, @@ -80,6 +87,10 @@ function (Exception $e) { */ public function reconnect(): void { + if (! $this->shouldReconnect) { + return; + } + $this->loop->addTimer(1, function () { $this->clientReconnectionTimer++; if ($this->clientReconnectionTimer >= $this->reconnectionTimeout()) { @@ -97,6 +108,8 @@ public function reconnect(): void */ public function disconnect(): void { + $this->shouldReconnect = false; + $this->client?->close(); } @@ -106,7 +119,7 @@ public function disconnect(): void public function subscribe(): void { if (! $this->isConnected($this->client)) { - $this->queueSubscriptionEvent(); + $this->queueSubscriptionEvent('subscribe', []); return; } @@ -165,9 +178,9 @@ protected function configureClientErrorHandler(): void /** * Queue the given subscription event. */ - protected function queueSubscriptionEvent(): void + protected function queueSubscriptionEvent($event, $payload): void { - $this->queuedSubscriptionEvents['subscribe'] = true; + $this->queuedSubscriptionEvents[$event] = $payload; } /** @@ -183,7 +196,6 @@ protected function queuePublishEvent(array $payload): void */ protected function processQueuedSubscriptionEvents(): void { - dump($this->queuedSubscriptionEvents); foreach ($this->queuedSubscriptionEvents as $event => $args) { match ($event) { 'subscribe' => $this->subscribe(), @@ -200,7 +212,6 @@ protected function processQueuedSubscriptionEvents(): void */ protected function processQueuedPublishEvents(): void { - dump($this->queuedPublishEvents); foreach ($this->queuedPublishEvents as $event) { $this->publish($event); } @@ -248,8 +259,6 @@ protected function redisUrl(): string /** * Determine the configured reconnection timeout. - * - * @return void */ protected function reconnectionTimeout(): int { diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index ae03e4b2..e1ff94d0 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -33,7 +33,8 @@ public function connect(LoopInterface $loop): void $this->channel, 'subscriber', $this->server, - fn () => $this->subscribe()); + fn () => $this->subscribe() + ); $this->subscribingClient->connect(); $this->publishingClient = new RedisClient( From 54e2a0b68918939df7bea10a65da8dd51e5ebade Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Fri, 6 Dec 2024 17:14:25 -0800 Subject: [PATCH 07/11] fix test --- src/Servers/Reverb/Publishing/RedisPubSubProvider.php | 1 + tests/Unit/EventTest.php | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index e1ff94d0..bccf6252 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -93,6 +93,7 @@ public function on(string $event, callable $callback): void */ public function publish(array $payload): PromiseInterface { + dd($payload); return $this->publishingClient->publish($payload); } } diff --git a/tests/Unit/EventTest.php b/tests/Unit/EventTest.php index d078c721..7faa109c 100644 --- a/tests/Unit/EventTest.php +++ b/tests/Unit/EventTest.php @@ -11,7 +11,7 @@ app(ServerProviderManager::class)->withPublishing(); $pubSub = Mockery::mock(PubSubProvider::class); $pubSub->shouldReceive('publish')->once() - ->with(['type' => 'message', 'application' => serialize($app), 'payload' => ['channel' => 'test-channel'], 'socket_id' => null]); + ->with(['type' => 'message', 'application' => serialize($app), 'payload' => ['channel' => 'test-channel']]); $this->app->instance(PubSubProvider::class, $pubSub); From ba0a3fd0afe8ce87b47be048f85e279e4f302c48 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Fri, 6 Dec 2024 17:22:48 -0800 Subject: [PATCH 08/11] remove dump --- src/Servers/Reverb/Publishing/RedisPubSubProvider.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index bccf6252..e1ff94d0 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -93,7 +93,6 @@ public function on(string $event, callable $callback): void */ public function publish(array $payload): PromiseInterface { - dd($payload); return $this->publishingClient->publish($payload); } } From 0e833a18fba309b45cb394191b34c174c913f4c2 Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Fri, 6 Dec 2024 17:29:01 -0800 Subject: [PATCH 09/11] stub tests --- .../Reverb/Publishing/RedisPubSubProviderTest.php | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php index a3e5313a..7cce7b1e 100644 --- a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php +++ b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php @@ -41,3 +41,17 @@ $provider->subscribe(); })->skip(); + +it('can successfully reconnect', function () {})->todo(); + +it('can timeout and fail when unable to reconnect', function () {})->todo(); + +it('queues subscription events', function () {})->todo(); + +it('can process queued subscription events', function () {})->todo(); + +it('queues publish events', function () {})->todo(); + +it('can process queued publish events', function () {})->todo(); + +it('does not attempt to reconnect after a controlled disconnection', function () {})->todo(); From 291915a413c8dbf6b6bbce77b01568835a6ca8aa Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Sat, 7 Dec 2024 21:30:50 -0500 Subject: [PATCH 10/11] fix test --- .../Reverb/Publishing/RedisPubSubProvider.php | 18 +++++++++--------- .../Publishing/RedisPubSubProviderTest.php | 13 ++++++++----- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index e1ff94d0..26642b35 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -27,24 +27,24 @@ public function __construct( */ public function connect(LoopInterface $loop): void { - $this->subscribingClient = new RedisClient( + $this->publishingClient = new RedisClient( $loop, $this->clientFactory, $this->channel, - 'subscriber', - $this->server, - fn () => $this->subscribe() + 'publisher', + $this->server ); - $this->subscribingClient->connect(); + $this->publishingClient->connect(); - $this->publishingClient = new RedisClient( + $this->subscribingClient = new RedisClient( $loop, $this->clientFactory, $this->channel, - 'publisher', - $this->server + 'subscriber', + $this->server, + fn () => $this->subscribe() ); - $this->publishingClient->connect(); + $this->subscribingClient->connect(); } /** diff --git a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php index 7cce7b1e..bc2a769c 100644 --- a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php +++ b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php @@ -5,6 +5,7 @@ use Laravel\Reverb\Servers\Reverb\Publishing\RedisClientFactory; use Laravel\Reverb\Servers\Reverb\Publishing\RedisPubSubProvider; use React\EventLoop\LoopInterface; +use React\Promise\Promise; it('resubscribes to the scaling channel on unsubscribe event', function () { $channel = 'reverb'; @@ -21,6 +22,10 @@ ->with('message', Mockery::any()) ->zeroOrMoreTimes(); + $subscribingClient->shouldReceive('on') + ->with('close', Mockery::any()) + ->zeroOrMoreTimes(); + $subscribingClient->shouldReceive('subscribe') ->twice() ->with($channel); @@ -30,17 +35,15 @@ // The first call to make() will return a publishing client $clientFactory->shouldReceive('make') ->once() - ->andReturn(Mockery::mock(Client::class)); + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); $clientFactory->shouldReceive('make') ->once() - ->andReturn($subscribingClient); + ->andReturn(new Promise(fn (callable $resolve) => $resolve($subscribingClient))); $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), $channel); $provider->connect(Mockery::mock(LoopInterface::class)); - - $provider->subscribe(); -})->skip(); +}); it('can successfully reconnect', function () {})->todo(); From 762915c336f146939ef99923425153243395703a Mon Sep 17 00:00:00 2001 From: Joe Dixon Date: Fri, 20 Dec 2024 13:03:32 +0000 Subject: [PATCH 11/11] tests --- src/Servers/Reverb/Publishing/RedisClient.php | 6 +- .../Publishing/RedisPubSubProviderTest.php | 78 ++++++++++++++++++- 2 files changed, 78 insertions(+), 6 deletions(-) diff --git a/src/Servers/Reverb/Publishing/RedisClient.php b/src/Servers/Reverb/Publishing/RedisClient.php index 02b10318..f5cd3fb2 100644 --- a/src/Servers/Reverb/Publishing/RedisClient.php +++ b/src/Servers/Reverb/Publishing/RedisClient.php @@ -96,7 +96,7 @@ public function reconnect(): void if ($this->clientReconnectionTimer >= $this->reconnectionTimeout()) { Log::error("Failed to reconnect to Redis connection [{$this->name}] within {$this->reconnectionTimeout()} second limit"); - exit; + throw new Exception("Failed to reconnect to Redis connection [{$this->name}] within {$this->reconnectionTimeout()} second limit"); } Log::info("Attempting to reconnect Redis connection [{$this->name}]"); $this->connect(); @@ -170,7 +170,9 @@ protected function configureClientErrorHandler(): void { $this->client->on('close', function () { $this->client = null; - Log::info("Disconnected fromRedis connection [{$this->name}]"); + + Log::info("Disconnected from Redis connection [{$this->name}]"); + $this->reconnect(); }); } diff --git a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php index bc2a769c..ee0416fb 100644 --- a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php +++ b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php @@ -4,6 +4,7 @@ use Laravel\Reverb\Servers\Reverb\Contracts\PubSubIncomingMessageHandler; use Laravel\Reverb\Servers\Reverb\Publishing\RedisClientFactory; use Laravel\Reverb\Servers\Reverb\Publishing\RedisPubSubProvider; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Promise\Promise; @@ -45,15 +46,84 @@ $provider->connect(Mockery::mock(LoopInterface::class)); }); -it('can successfully reconnect', function () {})->todo(); +it('can successfully reconnect', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + $loop = Mockery::mock(LoopInterface::class); + + $loop->shouldReceive('addTimer') + ->once() + ->with(1, Mockery::any()); + + // Publisher client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn () => throw new Exception)); + + // Subscriber client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb'); + $provider->connect($loop); +}); + +it('can timeout and fail when unable to reconnect', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + $loop = Loop::get(); + + // Publisher client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn () => throw new Exception)); + + // Subscriber client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); -it('can timeout and fail when unable to reconnect', function () {})->todo(); + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb', ['host' => 'localhost', 'port' => 6379, 'timeout' => 1]); + $provider->connect($loop); -it('queues subscription events', function () {})->todo(); + $loop->run(); +})->throws(Exception::class, 'Failed to reconnect to Redis connection [publisher] within 1 second limit'); + +it('queues subscription events', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + + $clientFactory->shouldReceive('make') + ->twice() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb'); + $provider->connect(Mockery::mock(LoopInterface::class)); + $provider->subscribe(); + + $subscribingClient = (new ReflectionProperty($provider, 'subscribingClient'))->getValue($provider); + $queuedSubscriptionEvents = (new ReflectionProperty($subscribingClient, 'queuedSubscriptionEvents'))->getValue($subscribingClient); + + expect(array_keys($queuedSubscriptionEvents))->toBe(['subscribe', 'on']); +}); it('can process queued subscription events', function () {})->todo(); -it('queues publish events', function () {})->todo(); +it('queues publish events', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + + $clientFactory->shouldReceive('make') + ->twice() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb'); + $provider->connect(Mockery::mock(LoopInterface::class)); + $provider->publish(['event' => 'first test']); + $provider->publish(['event' => 'second test']); + + $publishingClient = (new ReflectionProperty($provider, 'publishingClient'))->getValue($provider); + $queuedPublishEvents = (new ReflectionProperty($publishingClient, 'queuedPublishEvents'))->getValue($publishingClient); + + expect($queuedPublishEvents)->toBe([['event' => 'first test'], ['event' => 'second test']]); +}); it('can process queued publish events', function () {})->todo();