diff --git a/config/reverb.php b/config/reverb.php index 3fead53b..13289265 100644 --- a/config/reverb.php +++ b/config/reverb.php @@ -46,6 +46,7 @@ 'username' => env('REDIS_USERNAME'), 'password' => env('REDIS_PASSWORD'), 'database' => env('REDIS_DB', '0'), + 'timeout' => env('REDIS_TIMEOUT', 60), ], ], 'pulse_ingest_interval' => env('REVERB_PULSE_INGEST_INTERVAL', 15), diff --git a/src/Servers/Reverb/Console/Commands/StartServer.php b/src/Servers/Reverb/Console/Commands/StartServer.php index d4209444..23046cb2 100644 --- a/src/Servers/Reverb/Console/Commands/StartServer.php +++ b/src/Servers/Reverb/Console/Commands/StartServer.php @@ -81,7 +81,6 @@ protected function ensureHorizontalScalability(LoopInterface $loop): void { if ($this->laravel->make(ServerProviderManager::class)->driver('reverb')->subscribesToEvents()) { $this->laravel->make(PubSubProvider::class)->connect($loop); - $this->laravel->make(PubSubProvider::class)->subscribe(); } } diff --git a/src/Servers/Reverb/Publishing/RedisClient.php b/src/Servers/Reverb/Publishing/RedisClient.php new file mode 100644 index 00000000..f5cd3fb2 --- /dev/null +++ b/src/Servers/Reverb/Publishing/RedisClient.php @@ -0,0 +1,269 @@ +clientFactory->make($this->loop, $this->redisUrl())->then( + function (Client $client) { + $this->client = $client; + $this->clientReconnectionTimer = 0; + $this->configureClientErrorHandler(); + if ($this->onConnect) { + call_user_func($this->onConnect, $client); + } + + Log::info("Redis connection to [{$this->name}] successful"); + }, + function (Exception $e) { + $this->client = null; + Log::error($e->getMessage()); + $this->reconnect(); + } + ); + } + + /** + * Attempt to reconnect to the Redis server. + */ + public function reconnect(): void + { + if (! $this->shouldReconnect) { + return; + } + + $this->loop->addTimer(1, function () { + $this->clientReconnectionTimer++; + if ($this->clientReconnectionTimer >= $this->reconnectionTimeout()) { + Log::error("Failed to reconnect to Redis connection [{$this->name}] within {$this->reconnectionTimeout()} second limit"); + + throw new Exception("Failed to reconnect to Redis connection [{$this->name}] within {$this->reconnectionTimeout()} second limit"); + } + Log::info("Attempting to reconnect Redis connection [{$this->name}]"); + $this->connect(); + }); + } + + /** + * Disconnect from the Redis server. + */ + public function disconnect(): void + { + $this->shouldReconnect = false; + + $this->client?->close(); + } + + /** + * Subscribe to the given Redis channel. + */ + public function subscribe(): void + { + if (! $this->isConnected($this->client)) { + $this->queueSubscriptionEvent('subscribe', []); + + return; + } + + $this->client->subscribe($this->channel); + } + + /** + * Publish an event to the given channel. + */ + public function publish(array $payload): PromiseInterface + { + if (! $this->isConnected($this->client)) { + $this->queuePublishEvent($payload); + + return new Promise(fn () => new RuntimeException); + } + + return $this->client->publish($this->channel, json_encode($payload)); + } + + /** + * Listen for a given event. + */ + public function on(string $event, callable $callback): void + { + if (! $this->isConnected($this->client)) { + $this->queueSubscriptionEvent('on', [$event => $callback]); + + return; + } + + $this->client->on($event, $callback); + } + + /** + * Determine if the client is currently connected to the server. + */ + public function isConnected(): bool + { + return (bool) $this->client === true && $this->client instanceof Client; + } + + /** + * Handle a connection failure to the Redis server. + */ + protected function configureClientErrorHandler(): void + { + $this->client->on('close', function () { + $this->client = null; + + Log::info("Disconnected from Redis connection [{$this->name}]"); + + $this->reconnect(); + }); + } + + /** + * Queue the given subscription event. + */ + protected function queueSubscriptionEvent($event, $payload): void + { + $this->queuedSubscriptionEvents[$event] = $payload; + } + + /** + * Queue the given publish event. + */ + protected function queuePublishEvent(array $payload): void + { + $this->queuedPublishEvents[] = $payload; + } + + /** + * Process the queued subscription events. + */ + protected function processQueuedSubscriptionEvents(): void + { + foreach ($this->queuedSubscriptionEvents as $event => $args) { + match ($event) { + 'subscribe' => $this->subscribe(), + 'on' => $this->on(...$args), + default => null + }; + + } + $this->queuedSubscriptionEvents = []; + } + + /** + * Process the queued publish events. + */ + protected function processQueuedPublishEvents(): void + { + foreach ($this->queuedPublishEvents as $event) { + $this->publish($event); + } + $this->queuedPublishEvents = []; + } + + /** + * Get the connection URL for Redis. + */ + protected function redisUrl(): string + { + $config = empty($this->server) ? Config::get('database.redis.default') : $this->server; + + $parsed = (new ConfigurationUrlParser)->parseConfiguration($config); + + $driver = strtolower($parsed['driver'] ?? ''); + + if (in_array($driver, ['tcp', 'tls'])) { + $parsed['scheme'] = $driver; + } + + [$host, $port, $protocol, $query] = [ + $parsed['host'], + $parsed['port'] ?: 6379, + Arr::get($parsed, 'scheme') === 'tls' ? 's' : '', + [], + ]; + + if ($parsed['username'] ?? false) { + $query['username'] = $parsed['username']; + } + + if ($parsed['password'] ?? false) { + $query['password'] = $parsed['password']; + } + + if ($parsed['database'] ?? false) { + $query['db'] = $parsed['database']; + } + + $query = http_build_query($query); + + return "redis{$protocol}://{$host}:{$port}".($query ? "?{$query}" : ''); + } + + /** + * Determine the configured reconnection timeout. + */ + protected function reconnectionTimeout(): int + { + return (int) ($this->server['timeout'] ?? 60); + } +} diff --git a/src/Servers/Reverb/Publishing/RedisClientFactory.php b/src/Servers/Reverb/Publishing/RedisClientFactory.php index 7db35f30..5efb5505 100644 --- a/src/Servers/Reverb/Publishing/RedisClientFactory.php +++ b/src/Servers/Reverb/Publishing/RedisClientFactory.php @@ -2,18 +2,18 @@ namespace Laravel\Reverb\Servers\Reverb\Publishing; -use Clue\React\Redis\Client; use Clue\React\Redis\Factory; use React\EventLoop\LoopInterface; +use React\Promise\PromiseInterface; class RedisClientFactory { /** * Create a new Redis client. */ - public function make(LoopInterface $loop, string $redisUrl): Client + public function make(LoopInterface $loop, string $redisUrl): PromiseInterface { - return (new Factory($loop))->createLazyClient( + return (new Factory($loop))->createClient( $redisUrl ); } diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index 731cab79..26642b35 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -2,14 +2,10 @@ namespace Laravel\Reverb\Servers\Reverb\Publishing; -use Illuminate\Support\Arr; -use Illuminate\Support\ConfigurationUrlParser; -use Illuminate\Support\Facades\Config; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubIncomingMessageHandler; use Laravel\Reverb\Servers\Reverb\Contracts\PubSubProvider; use React\EventLoop\LoopInterface; use React\Promise\PromiseInterface; -use RuntimeException; class RedisPubSubProvider implements PubSubProvider { @@ -31,8 +27,24 @@ public function __construct( */ public function connect(LoopInterface $loop): void { - $this->publishingClient = $this->clientFactory->make($loop, $this->redisUrl()); - $this->subscribingClient = $this->clientFactory->make($loop, $this->redisUrl()); + $this->publishingClient = new RedisClient( + $loop, + $this->clientFactory, + $this->channel, + 'publisher', + $this->server + ); + $this->publishingClient->connect(); + + $this->subscribingClient = new RedisClient( + $loop, + $this->clientFactory, + $this->channel, + 'subscriber', + $this->server, + fn () => $this->subscribe() + ); + $this->subscribingClient->connect(); } /** @@ -40,8 +52,8 @@ public function connect(LoopInterface $loop): void */ public function disconnect(): void { - $this->subscribingClient?->close(); - $this->publishingClient?->close(); + $this->subscribingClient?->disconnect(); + $this->publishingClient?->disconnect(); } /** @@ -49,9 +61,7 @@ public function disconnect(): void */ public function subscribe(): void { - $this->ensureConnected(); - - $this->subscribingClient->subscribe($this->channel); + $this->subscribingClient->subscribe(); $this->subscribingClient->on('message', function (string $channel, string $payload) { $this->messageHandler->handle($payload); @@ -79,61 +89,10 @@ public function on(string $event, callable $callback): void } /** - * Publish a payload to the publisher. + * Publish a payload to the publishingClientReconnectionTimer. */ public function publish(array $payload): PromiseInterface { - $this->ensureConnected(); - - return $this->publishingClient->publish($this->channel, json_encode($payload)); - } - - /** - * Get the connection URL for Redis. - */ - protected function redisUrl(): string - { - $config = empty($this->server) ? Config::get('database.redis.default') : $this->server; - - $parsed = (new ConfigurationUrlParser)->parseConfiguration($config); - - $driver = strtolower($parsed['driver'] ?? ''); - - if (in_array($driver, ['tcp', 'tls'])) { - $parsed['scheme'] = $driver; - } - - [$host, $port, $protocol, $query] = [ - $parsed['host'], - $parsed['port'] ?: 6379, - Arr::get($parsed, 'scheme') === 'tls' ? 's' : '', - [], - ]; - - if ($parsed['username'] ?? false) { - $query['username'] = $parsed['username']; - } - - if ($parsed['password'] ?? false) { - $query['password'] = $parsed['password']; - } - - if ($parsed['database'] ?? false) { - $query['db'] = $parsed['database']; - } - - $query = http_build_query($query); - - return "redis{$protocol}://{$host}:{$port}".($query ? "?{$query}" : ''); - } - - /** - * Ensure that a connection to Redis has been established. - */ - protected function ensureConnected(): void - { - if (! $this->publishingClient) { - throw new RuntimeException('Connection to Redis has not been established.'); - } + return $this->publishingClient->publish($payload); } } diff --git a/tests/ReverbTestCase.php b/tests/ReverbTestCase.php index 87c2e78a..ff5248f6 100644 --- a/tests/ReverbTestCase.php +++ b/tests/ReverbTestCase.php @@ -77,7 +77,6 @@ public function usingRedis(): void app(ServerProviderManager::class)->withPublishing(); app(PubSubProvider::class)->connect($this->loop); - app(PubSubProvider::class)->subscribe(); } /** diff --git a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php index fffaa5b2..ee0416fb 100644 --- a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php +++ b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php @@ -4,10 +4,11 @@ use Laravel\Reverb\Servers\Reverb\Contracts\PubSubIncomingMessageHandler; use Laravel\Reverb\Servers\Reverb\Publishing\RedisClientFactory; use Laravel\Reverb\Servers\Reverb\Publishing\RedisPubSubProvider; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; +use React\Promise\Promise; it('resubscribes to the scaling channel on unsubscribe event', function () { - $channel = 'reverb'; $subscribingClient = Mockery::mock(Client::class); @@ -22,6 +23,10 @@ ->with('message', Mockery::any()) ->zeroOrMoreTimes(); + $subscribingClient->shouldReceive('on') + ->with('close', Mockery::any()) + ->zeroOrMoreTimes(); + $subscribingClient->shouldReceive('subscribe') ->twice() ->with($channel); @@ -31,14 +36,95 @@ // The first call to make() will return a publishing client $clientFactory->shouldReceive('make') ->once() - ->andReturn(Mockery::mock(Client::class)); + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); $clientFactory->shouldReceive('make') ->once() - ->andReturn($subscribingClient); + ->andReturn(new Promise(fn (callable $resolve) => $resolve($subscribingClient))); $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), $channel); $provider->connect(Mockery::mock(LoopInterface::class)); +}); + +it('can successfully reconnect', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + $loop = Mockery::mock(LoopInterface::class); + + $loop->shouldReceive('addTimer') + ->once() + ->with(1, Mockery::any()); + + // Publisher client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn () => throw new Exception)); + + // Subscriber client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb'); + $provider->connect($loop); +}); + +it('can timeout and fail when unable to reconnect', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + $loop = Loop::get(); + + // Publisher client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn () => throw new Exception)); + + // Subscriber client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb', ['host' => 'localhost', 'port' => 6379, 'timeout' => 1]); + $provider->connect($loop); + + $loop->run(); +})->throws(Exception::class, 'Failed to reconnect to Redis connection [publisher] within 1 second limit'); + +it('queues subscription events', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + + $clientFactory->shouldReceive('make') + ->twice() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb'); + $provider->connect(Mockery::mock(LoopInterface::class)); $provider->subscribe(); + + $subscribingClient = (new ReflectionProperty($provider, 'subscribingClient'))->getValue($provider); + $queuedSubscriptionEvents = (new ReflectionProperty($subscribingClient, 'queuedSubscriptionEvents'))->getValue($subscribingClient); + + expect(array_keys($queuedSubscriptionEvents))->toBe(['subscribe', 'on']); +}); + +it('can process queued subscription events', function () {})->todo(); + +it('queues publish events', function () { + $clientFactory = Mockery::mock(RedisClientFactory::class); + + $clientFactory->shouldReceive('make') + ->twice() + ->andReturn(new Promise(fn (callable $resolve) => $resolve)); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), 'reverb'); + $provider->connect(Mockery::mock(LoopInterface::class)); + $provider->publish(['event' => 'first test']); + $provider->publish(['event' => 'second test']); + + $publishingClient = (new ReflectionProperty($provider, 'publishingClient'))->getValue($provider); + $queuedPublishEvents = (new ReflectionProperty($publishingClient, 'queuedPublishEvents'))->getValue($publishingClient); + + expect($queuedPublishEvents)->toBe([['event' => 'first test'], ['event' => 'second test']]); }); + +it('can process queued publish events', function () {})->todo(); + +it('does not attempt to reconnect after a controlled disconnection', function () {})->todo();