From 65631f1214dfa3573ccc4cdff1f6a929649b027d Mon Sep 17 00:00:00 2001 From: "Md. Ashiquzzaman" Date: Fri, 4 Oct 2024 20:06:00 +0600 Subject: [PATCH] [1.x] Re-subscribes to the scaling channel when the underlying connection is lost (#251) * Re-subscribe to the scaling channel when the connection drops * add tests * Update RedisPubSubProvider.php --------- Co-authored-by: Taylor Otwell --- .../Reverb/Publishing/RedisPubSubProvider.php | 6 +++ .../Publishing/RedisPubSubProviderTest.php | 44 +++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php diff --git a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php index dde3438d..731cab79 100644 --- a/src/Servers/Reverb/Publishing/RedisPubSubProvider.php +++ b/src/Servers/Reverb/Publishing/RedisPubSubProvider.php @@ -56,6 +56,12 @@ public function subscribe(): void $this->subscribingClient->on('message', function (string $channel, string $payload) { $this->messageHandler->handle($payload); }); + + $this->subscribingClient->on('unsubscribe', function (string $channel) { + if ($this->channel === $channel) { + $this->subscribingClient->subscribe($channel); + } + }); } /** diff --git a/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php new file mode 100644 index 00000000..2441dd8f --- /dev/null +++ b/tests/Unit/Servers/Reverb/Publishing/RedisPubSubProviderTest.php @@ -0,0 +1,44 @@ +shouldReceive('on') + ->with('unsubscribe', Mockery::on(function ($callback) use ($channel) { + $callback($channel); + return true; + }))->once(); + + $subscribingClient->shouldReceive('on') + ->with('message', Mockery::any()) + ->zeroOrMoreTimes(); + + $subscribingClient->shouldReceive('subscribe') + ->twice() + ->with($channel); + + + $clientFactory = Mockery::mock(RedisClientFactory::class); + + // The first call to make() will return a publishing client + $clientFactory->shouldReceive('make') + ->once() + ->andReturn(Mockery::mock(Client::class)); + + $clientFactory->shouldReceive('make') + ->once() + ->andReturn($subscribingClient); + + $provider = new RedisPubSubProvider($clientFactory, Mockery::mock(PubSubIncomingMessageHandler::class), $channel); + $provider->connect(Mockery::mock(LoopInterface::class)); + + $provider->subscribe(); +});