diff --git a/src/Connection/Authenticator.php b/src/Connection/Authenticator.php index fc8b043..f26bc90 100644 --- a/src/Connection/Authenticator.php +++ b/src/Connection/Authenticator.php @@ -18,16 +18,16 @@ public function __construct( ) { } - public function connect(?Cancellation $cancellation = null): RedisChannel + public function connect(?Cancellation $cancellation = null): RedisConnection { - $channel = $this->connector->connect($cancellation); + $connection = $this->connector->connect($cancellation); - $channel->send('AUTH', $this->password); + $connection->send('AUTH', $this->password); - if (!($channel->receive()?->unwrap())) { - throw new RedisException('Failed to authenticate to redis instance: ' . $channel->getName()); + if (!($connection->receive()?->unwrap())) { + throw new RedisException('Failed to authenticate to redis instance: ' . $connection->getName()); } - return $channel; + return $connection; } } diff --git a/src/Connection/DatabaseSelector.php b/src/Connection/DatabaseSelector.php index 9395ae9..1005d6e 100644 --- a/src/Connection/DatabaseSelector.php +++ b/src/Connection/DatabaseSelector.php @@ -18,16 +18,16 @@ public function __construct( ) { } - public function connect(?Cancellation $cancellation = null): RedisChannel + public function connect(?Cancellation $cancellation = null): RedisConnection { - $channel = $this->connector->connect($cancellation); + $connection = $this->connector->connect($cancellation); - $channel->send('SELECT', (string) $this->database); + $connection->send('SELECT', (string) $this->database); - if (!($channel->receive()?->unwrap())) { - throw new RedisException('Failed to select database: ' . $channel->getName()); + if (!($connection->receive()?->unwrap())) { + throw new RedisException('Failed to select database: ' . $connection->getName()); } - return $channel; + return $connection; } } diff --git a/src/Connection/ReconnectingRedisLink.php b/src/Connection/ReconnectingRedisLink.php index 1cc22fd..4a4c8b9 100644 --- a/src/Connection/ReconnectingRedisLink.php +++ b/src/Connection/ReconnectingRedisLink.php @@ -22,7 +22,7 @@ final class ReconnectingRedisLink implements RedisLink private bool $running = false; - private ?RedisChannel $channel = null; + private ?RedisConnection $connection = null; public function __construct(private readonly RedisConnector $connector) { @@ -32,7 +32,7 @@ public function __construct(private readonly RedisConnector $connector) public function __destruct() { $this->running = false; - $this->channel?->close(); + $this->connection?->close(); } public function execute(string $command, array $parameters): RedisResponse @@ -47,7 +47,7 @@ public function execute(string $command, array $parameters): RedisResponse $response = $this->enqueue($command, $parameters)->await(); } finally { if (\strcasecmp($command, 'quit') === 0) { - $this->channel?->close(); + $this->connection?->close(); } } @@ -68,12 +68,12 @@ private function enqueue(string $command, array $parameters): Future $deferred = new DeferredFuture(); $this->queue->push([$deferred, $command, $parameters]); - $this->channel?->reference(); + $this->connection?->reference(); try { - $this->channel?->send($command, ...$parameters); + $this->connection?->send($command, ...$parameters); } catch (RedisException) { - $this->channel = null; + $this->connection = null; } return $deferred->getFuture(); @@ -84,31 +84,31 @@ private function run(): void $connector = $this->connector; $queue = $this->queue; $running = &$this->running; - $channel = &$this->channel; + $connection = &$this->connection; $database = &$this->database; - EventLoop::queue(static function () use (&$channel, &$running, &$database, $queue, $connector): void { + EventLoop::queue(static function () use (&$connection, &$running, &$database, $queue, $connector): void { try { while ($running) { if ($database !== null) { - $channel = (new DatabaseSelector($database, $connector))->connect(); + $connection = (new DatabaseSelector($database, $connector))->connect(); } else { - $channel = $connector->connect(); + $connection = $connector->connect(); } - $channel->unreference(); + $connection->unreference(); try { foreach ($queue as [$deferred, $command, $parameters]) { - $channel->reference(); - $channel->send($command, ...$parameters); + $connection->reference(); + $connection->send($command, ...$parameters); } - while ($response = $channel->receive()) { + while ($response = $connection->receive()) { /** @var DeferredFuture $deferred */ [$deferred] = $queue->shift(); if ($queue->isEmpty()) { - $channel->unreference(); + $connection->unreference(); } $deferred->complete($response); @@ -116,11 +116,11 @@ private function run(): void } catch (RedisException) { // Attempt to reconnect after failure. } finally { - $channel = null; + $connection = null; } } } catch (\Throwable $exception) { - $exception = new RedisChannelException($exception->getMessage(), 0, $exception); + $exception = new RedisConnectionException($exception->getMessage(), 0, $exception); while (!$queue->isEmpty()) { /** @var DeferredFuture $deferred */ diff --git a/src/Connection/RedisChannel.php b/src/Connection/RedisConnection.php similarity index 63% rename from src/Connection/RedisChannel.php rename to src/Connection/RedisConnection.php index 7eb11bb..bc49b54 100644 --- a/src/Connection/RedisChannel.php +++ b/src/Connection/RedisConnection.php @@ -7,18 +7,18 @@ use Amp\Redis\RedisException; /** - * A RedisChannel allows sending and receiving values, but does not contain any reconnect logic or linking responses + * A RedisConnection allows sending and receiving values, but does not contain any reconnect logic or linking responses * to requests. */ -interface RedisChannel extends Closable +interface RedisConnection extends Closable { /** - * @throws RedisException If reading from the channel fails. + * @throws RedisException If reading from the connection fails. */ public function receive(): ?RedisResponse; /** - * @throws RedisException If writing to the channel fails. + * @throws RedisException If writing to the connection fails. */ public function send(string ...$args): void; diff --git a/src/Connection/RedisChannelException.php b/src/Connection/RedisConnectionException.php similarity index 62% rename from src/Connection/RedisChannelException.php rename to src/Connection/RedisConnectionException.php index ecdf91d..9b17aca 100644 --- a/src/Connection/RedisChannelException.php +++ b/src/Connection/RedisConnectionException.php @@ -4,6 +4,6 @@ use Amp\Redis\RedisException; -final class RedisChannelException extends RedisException +final class RedisConnectionException extends RedisException { } diff --git a/src/Connection/RedisConnector.php b/src/Connection/RedisConnector.php index 1b7bb14..0dc4a3a 100644 --- a/src/Connection/RedisConnector.php +++ b/src/Connection/RedisConnector.php @@ -10,5 +10,5 @@ interface RedisConnector /** * @throws RedisException */ - public function connect(?Cancellation $cancellation = null): RedisChannel; + public function connect(?Cancellation $cancellation = null): RedisConnection; } diff --git a/src/Connection/SocketRedisChannel.php b/src/Connection/SocketRedisConnection.php similarity index 92% rename from src/Connection/SocketRedisChannel.php rename to src/Connection/SocketRedisConnection.php index f7d719f..36f0492 100644 --- a/src/Connection/SocketRedisChannel.php +++ b/src/Connection/SocketRedisConnection.php @@ -14,7 +14,7 @@ use Amp\Socket\Socket; use Revolt\EventLoop; -final class SocketRedisChannel implements RedisChannel +final class SocketRedisConnection implements RedisConnection { use ForbidCloning; use ForbidSerialization; @@ -64,7 +64,7 @@ public function receive(): ?RedisResponse public function send(string ...$args): void { if ($this->socket->isClosed()) { - throw new RedisChannelException('Redis connection already closed'); + throw new RedisConnectionException('Redis connection already closed'); } $payload = \implode("\r\n", \array_map(fn (string $arg) => '$' . \strlen($arg) . "\r\n" . $arg, $args)); @@ -73,7 +73,7 @@ public function send(string ...$args): void try { $this->socket->write($payload); } catch (StreamException $e) { - throw new RedisChannelException($e->getMessage(), 0, $e); + throw new RedisConnectionException($e->getMessage(), 0, $e); } } diff --git a/src/Connection/SocketRedisConnector.php b/src/Connection/SocketRedisConnector.php index 927dd24..b86c96d 100644 --- a/src/Connection/SocketRedisConnector.php +++ b/src/Connection/SocketRedisConnector.php @@ -29,21 +29,21 @@ public function __construct( /** * @throws CancelledException * @throws RedisException - * @throws RedisChannelException + * @throws RedisConnectionException */ - public function connect(?Cancellation $cancellation = null): RedisChannel + public function connect(?Cancellation $cancellation = null): RedisConnection { try { $socketConnector = $this->socketConnector ?? Socket\socketConnector(); $socket = $socketConnector->connect($this->uri, $this->connectContext, $cancellation); } catch (Socket\SocketException $e) { - throw new RedisChannelException( + throw new RedisConnectionException( 'Failed to connect to redis instance (' . $this->uri . ')', 0, $e ); } - return new SocketRedisChannel($socket); + return new SocketRedisConnection($socket); } } diff --git a/src/RedisSubscriber.php b/src/RedisSubscriber.php index dec72ed..3af7e73 100644 --- a/src/RedisSubscriber.php +++ b/src/RedisSubscriber.php @@ -6,8 +6,8 @@ use Amp\ForbidSerialization; use Amp\Future; use Amp\Pipeline\Queue; -use Amp\Redis\Connection\RedisChannel; -use Amp\Redis\Connection\RedisChannelException; +use Amp\Redis\Connection\RedisConnection; +use Amp\Redis\Connection\RedisConnectionException; use Amp\Redis\Connection\RedisConnector; use Revolt\EventLoop; use function Amp\async; @@ -17,7 +17,7 @@ final class RedisSubscriber use ForbidCloning; use ForbidSerialization; - private ?RedisChannel $channel = null; + private ?RedisConnection $connection = null; private bool $running = false; @@ -35,7 +35,7 @@ public function __construct( public function __destruct() { $this->running = false; - $this->channel?->close(); + $this->connection?->close(); } public function subscribe(string $channel): RedisSubscription @@ -51,7 +51,7 @@ public function subscribe(string $channel): RedisSubscription if ($subscribe) { try { - $this->channel?->send('subscribe', $channel); + $this->connection?->send('subscribe', $channel); } catch (\Throwable $e) { $this->unloadEmitter($queue, $channel); @@ -75,7 +75,7 @@ public function subscribeToPattern(string $pattern): RedisSubscription if ($subscribe) { try { - $this->channel?->send('psubscribe', $pattern); + $this->connection?->send('psubscribe', $pattern); } catch (\Throwable $e) { $this->unloadPatternEmitter($queue, $pattern); @@ -90,31 +90,31 @@ private function run(): void { $connector = $this->connector; $running = &$this->running; - $channel = &$this->channel; + $connection = &$this->connection; $queues = &$this->queues; $patternQueues = &$this->patternQueues; EventLoop::queue(static function () use ( &$running, - &$channel, + &$connection, &$queues, &$patternQueues, $connector ): void { try { while ($running) { - $channel = $connector->connect(); + $connection = $connector->connect(); try { foreach (\array_keys($queues) as $queue) { - $channel->send('subscribe', $queue); + $connection->send('subscribe', $queue); } foreach (\array_keys($patternQueues) as $pattern) { - $channel->send('psubscribe', $pattern); + $connection->send('psubscribe', $pattern); } - while ($response = $channel->receive()?->unwrap()) { + while ($response = $connection->receive()?->unwrap()) { /** @psalm-suppress RedundantCondition */ \assert( \is_array($response) && \array_is_list($response), @@ -134,11 +134,11 @@ private function run(): void } catch (RedisException) { // Attempt to reconnect after failure. } finally { - $channel = null; + $connection = null; } } } catch (\Throwable $exception) { - $exception = new RedisChannelException($exception->getMessage(), 0, $exception); + $exception = new RedisConnectionException($exception->getMessage(), 0, $exception); $queueGroups = \array_merge($queues, $patternQueues); @@ -178,11 +178,11 @@ private function unloadEmitter(Queue $queue, string $channel): void async(function () use ($channel): void { try { if (empty($this->queues[$channel])) { - $this->channel?->send('unsubscribe', $channel); + $this->connection?->send('unsubscribe', $channel); } if ($this->isIdle()) { - $this->channel?->close(); + $this->connection?->close(); } } catch (RedisException) { // if there's an exception, the unsubscribe is implicitly successful, because the connection broke @@ -207,11 +207,11 @@ private function unloadPatternEmitter(Queue $queue, string $pattern): void async(function () use ($pattern): void { try { if (empty($this->patternQueues[$pattern])) { - $this->channel?->send('punsubscribe', $pattern); + $this->connection?->send('punsubscribe', $pattern); } if ($this->isIdle()) { - $this->channel?->close(); + $this->connection?->close(); } } catch (RedisException) { // if there's an exception, the unsubscribe is implicitly successful, because the connection broke diff --git a/test/AuthenticatorTest.php b/test/AuthenticatorTest.php index d53a00d..d378d88 100644 --- a/test/AuthenticatorTest.php +++ b/test/AuthenticatorTest.php @@ -4,7 +4,7 @@ use Amp\PHPUnit\AsyncTestCase; use Amp\Process\Process; -use Amp\Redis\Connection\RedisChannelException; +use Amp\Redis\Connection\RedisConnectionException; use function Amp\delay; class AuthenticatorTest extends AsyncTestCase @@ -47,7 +47,7 @@ public function testFailure(): void { $redis = createRedisClient(\sprintf(self::URI_FORMAT, self::PORT, 'wrong')); - $this->expectException(RedisChannelException::class); + $this->expectException(RedisConnectionException::class); $this->expectExceptionMessage('invalid'); diff --git a/test/BasicTest.php b/test/BasicTest.php index a8f607f..80d87e2 100644 --- a/test/BasicTest.php +++ b/test/BasicTest.php @@ -19,18 +19,18 @@ public function testRaw(): void $this->setTimeout(5); $this->connector = createRedisConnector($this->getUri()); - $channel = $this->connector->connect(); + $connection = $this->connector->connect(); - $channel->send('PING'); + $connection->send('PING'); - $this->assertSame('PONG', $channel->receive()->unwrap()); + $this->assertSame('PONG', $connection->receive()->unwrap()); - $channel->close(); + $connection->close(); $this->expectException(RedisException::class); $this->expectExceptionMessage('Redis connection already closed'); - $channel->send('PING'); + $connection->send('PING'); } public function testRawCloseReadRemote(): void @@ -40,13 +40,13 @@ public function testRawCloseReadRemote(): void $config = RedisConfig::fromUri($this->getUri()); $this->connector = createRedisConnector($this->getUri()); - $channel = $this->connector->connect(); + $connection = $this->connector->connect(); - $channel->send('QUIT'); + $connection->send('QUIT'); - $this->assertSame('OK', $channel->receive()->unwrap()); + $this->assertSame('OK', $connection->receive()->unwrap()); - $this->assertNull($channel->receive()); + $this->assertNull($connection->receive()); } public function testRawCloseReadLocal(): void @@ -56,15 +56,15 @@ public function testRawCloseReadLocal(): void $config = RedisConfig::fromUri($this->getUri()); $this->connector = createRedisConnector($this->getUri()); - $channel = $this->connector->connect(); + $connection = $this->connector->connect(); - $channel->send('QUIT'); + $connection->send('QUIT'); - $this->assertSame('OK', $channel->receive()->unwrap()); + $this->assertSame('OK', $connection->receive()->unwrap()); - $channel->close(); + $connection->close(); - $this->assertNull($channel->receive()); + $this->assertNull($connection->receive()); } public function testRawCloseWriteRemote(): void @@ -74,17 +74,17 @@ public function testRawCloseWriteRemote(): void $config = RedisConfig::fromUri($this->getUri()); $this->connector = createRedisConnector($this->getUri()); - $channel = $this->connector->connect(); + $connection = $this->connector->connect(); - $channel->send('QUIT'); + $connection->send('QUIT'); - $this->assertSame('OK', $channel->receive()->unwrap()); + $this->assertSame('OK', $connection->receive()->unwrap()); delay(0); $this->expectException(RedisException::class); - $channel->send('PING'); + $connection->send('PING'); } public function testRawCloseWriteLocal(): void @@ -94,20 +94,20 @@ public function testRawCloseWriteLocal(): void $config = RedisConfig::fromUri($this->getUri()); $this->connector = createRedisConnector($this->getUri()); - $channel = $this->connector->connect(); + $connection = $this->connector->connect(); - $channel->send('QUIT'); + $connection->send('QUIT'); - $this->assertSame('OK', $channel->receive()->unwrap()); + $this->assertSame('OK', $connection->receive()->unwrap()); delay(0); - $channel->close(); + $connection->close(); $this->expectException(RedisException::class); $this->expectExceptionMessage('Redis connection already closed'); - $channel->send('PING'); + $connection->send('PING'); } public function testConnect(): void diff --git a/test/DownTest.php b/test/DownTest.php index b6e1c62..7b5feac 100644 --- a/test/DownTest.php +++ b/test/DownTest.php @@ -3,13 +3,13 @@ namespace Amp\Redis; use Amp\PHPUnit\AsyncTestCase; -use Amp\Redis\Connection\RedisChannelException; +use Amp\Redis\Connection\RedisConnectionException; class DownTest extends AsyncTestCase { public function test(): void { - $this->expectException(RedisChannelException::class); + $this->expectException(RedisConnectionException::class); $redis = createRedisClient('tcp://127.0.0.1:25325'); $redis->ping();