Skip to content

Commit

Permalink
Rename RedisChannel to RedisConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Aug 20, 2023
1 parent b52acc4 commit 5fb48b3
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 87 deletions.
12 changes: 6 additions & 6 deletions src/Connection/Authenticator.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ public function __construct(
) {
}

public function connect(?Cancellation $cancellation = null): RedisChannel
public function connect(?Cancellation $cancellation = null): RedisConnection
{
$channel = $this->connector->connect($cancellation);
$connection = $this->connector->connect($cancellation);

$channel->send('AUTH', $this->password);
$connection->send('AUTH', $this->password);

if (!($channel->receive()?->unwrap())) {
throw new RedisException('Failed to authenticate to redis instance: ' . $channel->getName());
if (!($connection->receive()?->unwrap())) {
throw new RedisException('Failed to authenticate to redis instance: ' . $connection->getName());
}

return $channel;
return $connection;
}
}
12 changes: 6 additions & 6 deletions src/Connection/DatabaseSelector.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ public function __construct(
) {
}

public function connect(?Cancellation $cancellation = null): RedisChannel
public function connect(?Cancellation $cancellation = null): RedisConnection
{
$channel = $this->connector->connect($cancellation);
$connection = $this->connector->connect($cancellation);

$channel->send('SELECT', (string) $this->database);
$connection->send('SELECT', (string) $this->database);

if (!($channel->receive()?->unwrap())) {
throw new RedisException('Failed to select database: ' . $channel->getName());
if (!($connection->receive()?->unwrap())) {
throw new RedisException('Failed to select database: ' . $connection->getName());
}

return $channel;
return $connection;
}
}
34 changes: 17 additions & 17 deletions src/Connection/ReconnectingRedisLink.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class ReconnectingRedisLink implements RedisLink

private bool $running = false;

private ?RedisChannel $channel = null;
private ?RedisConnection $connection = null;

public function __construct(private readonly RedisConnector $connector)
{
Expand All @@ -32,7 +32,7 @@ public function __construct(private readonly RedisConnector $connector)
public function __destruct()
{
$this->running = false;
$this->channel?->close();
$this->connection?->close();
}

public function execute(string $command, array $parameters): RedisResponse
Expand All @@ -47,7 +47,7 @@ public function execute(string $command, array $parameters): RedisResponse
$response = $this->enqueue($command, $parameters)->await();
} finally {
if (\strcasecmp($command, 'quit') === 0) {
$this->channel?->close();
$this->connection?->close();
}
}

Expand All @@ -68,12 +68,12 @@ private function enqueue(string $command, array $parameters): Future
$deferred = new DeferredFuture();
$this->queue->push([$deferred, $command, $parameters]);

$this->channel?->reference();
$this->connection?->reference();

try {
$this->channel?->send($command, ...$parameters);
$this->connection?->send($command, ...$parameters);
} catch (RedisException) {
$this->channel = null;
$this->connection = null;
}

return $deferred->getFuture();
Expand All @@ -84,43 +84,43 @@ private function run(): void
$connector = $this->connector;
$queue = $this->queue;
$running = &$this->running;
$channel = &$this->channel;
$connection = &$this->connection;
$database = &$this->database;

EventLoop::queue(static function () use (&$channel, &$running, &$database, $queue, $connector): void {
EventLoop::queue(static function () use (&$connection, &$running, &$database, $queue, $connector): void {
try {
while ($running) {
if ($database !== null) {
$channel = (new DatabaseSelector($database, $connector))->connect();
$connection = (new DatabaseSelector($database, $connector))->connect();
} else {
$channel = $connector->connect();
$connection = $connector->connect();
}

$channel->unreference();
$connection->unreference();

try {
foreach ($queue as [$deferred, $command, $parameters]) {
$channel->reference();
$channel->send($command, ...$parameters);
$connection->reference();
$connection->send($command, ...$parameters);
}

while ($response = $channel->receive()) {
while ($response = $connection->receive()) {
/** @var DeferredFuture $deferred */
[$deferred] = $queue->shift();
if ($queue->isEmpty()) {
$channel->unreference();
$connection->unreference();
}

$deferred->complete($response);
}
} catch (RedisException) {
// Attempt to reconnect after failure.
} finally {
$channel = null;
$connection = null;
}
}
} catch (\Throwable $exception) {
$exception = new RedisChannelException($exception->getMessage(), 0, $exception);
$exception = new RedisConnectionException($exception->getMessage(), 0, $exception);

while (!$queue->isEmpty()) {
/** @var DeferredFuture $deferred */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
use Amp\Redis\RedisException;

/**
* A RedisChannel allows sending and receiving values, but does not contain any reconnect logic or linking responses
* A RedisConnection allows sending and receiving values, but does not contain any reconnect logic or linking responses
* to requests.
*/
interface RedisChannel extends Closable
interface RedisConnection extends Closable
{
/**
* @throws RedisException If reading from the channel fails.
* @throws RedisException If reading from the connection fails.
*/
public function receive(): ?RedisResponse;

/**
* @throws RedisException If writing to the channel fails.
* @throws RedisException If writing to the connection fails.
*/
public function send(string ...$args): void;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

use Amp\Redis\RedisException;

final class RedisChannelException extends RedisException
final class RedisConnectionException extends RedisException
{
}
2 changes: 1 addition & 1 deletion src/Connection/RedisConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ interface RedisConnector
/**
* @throws RedisException
*/
public function connect(?Cancellation $cancellation = null): RedisChannel;
public function connect(?Cancellation $cancellation = null): RedisConnection;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use Amp\Socket\Socket;
use Revolt\EventLoop;

final class SocketRedisChannel implements RedisChannel
final class SocketRedisConnection implements RedisConnection
{
use ForbidCloning;
use ForbidSerialization;
Expand Down Expand Up @@ -64,7 +64,7 @@ public function receive(): ?RedisResponse
public function send(string ...$args): void
{
if ($this->socket->isClosed()) {
throw new RedisChannelException('Redis connection already closed');
throw new RedisConnectionException('Redis connection already closed');
}

$payload = \implode("\r\n", \array_map(fn (string $arg) => '$' . \strlen($arg) . "\r\n" . $arg, $args));
Expand All @@ -73,7 +73,7 @@ public function send(string ...$args): void
try {
$this->socket->write($payload);
} catch (StreamException $e) {
throw new RedisChannelException($e->getMessage(), 0, $e);
throw new RedisConnectionException($e->getMessage(), 0, $e);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/Connection/SocketRedisConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ public function __construct(
/**
* @throws CancelledException
* @throws RedisException
* @throws RedisChannelException
* @throws RedisConnectionException
*/
public function connect(?Cancellation $cancellation = null): RedisChannel
public function connect(?Cancellation $cancellation = null): RedisConnection
{
try {
$socketConnector = $this->socketConnector ?? Socket\socketConnector();
$socket = $socketConnector->connect($this->uri, $this->connectContext, $cancellation);
} catch (Socket\SocketException $e) {
throw new RedisChannelException(
throw new RedisConnectionException(
'Failed to connect to redis instance (' . $this->uri . ')',
0,
$e
);
}

return new SocketRedisChannel($socket);
return new SocketRedisConnection($socket);
}
}
36 changes: 18 additions & 18 deletions src/RedisSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
use Amp\ForbidSerialization;
use Amp\Future;
use Amp\Pipeline\Queue;
use Amp\Redis\Connection\RedisChannel;
use Amp\Redis\Connection\RedisChannelException;
use Amp\Redis\Connection\RedisConnection;
use Amp\Redis\Connection\RedisConnectionException;
use Amp\Redis\Connection\RedisConnector;
use Revolt\EventLoop;
use function Amp\async;
Expand All @@ -17,7 +17,7 @@ final class RedisSubscriber
use ForbidCloning;
use ForbidSerialization;

private ?RedisChannel $channel = null;
private ?RedisConnection $connection = null;

private bool $running = false;

Expand All @@ -35,7 +35,7 @@ public function __construct(
public function __destruct()
{
$this->running = false;
$this->channel?->close();
$this->connection?->close();
}

public function subscribe(string $channel): RedisSubscription
Expand All @@ -51,7 +51,7 @@ public function subscribe(string $channel): RedisSubscription

if ($subscribe) {
try {
$this->channel?->send('subscribe', $channel);
$this->connection?->send('subscribe', $channel);
} catch (\Throwable $e) {
$this->unloadEmitter($queue, $channel);

Expand All @@ -75,7 +75,7 @@ public function subscribeToPattern(string $pattern): RedisSubscription

if ($subscribe) {
try {
$this->channel?->send('psubscribe', $pattern);
$this->connection?->send('psubscribe', $pattern);
} catch (\Throwable $e) {
$this->unloadPatternEmitter($queue, $pattern);

Expand All @@ -90,31 +90,31 @@ private function run(): void
{
$connector = $this->connector;
$running = &$this->running;
$channel = &$this->channel;
$connection = &$this->connection;
$queues = &$this->queues;
$patternQueues = &$this->patternQueues;

EventLoop::queue(static function () use (
&$running,
&$channel,
&$connection,
&$queues,
&$patternQueues,
$connector
): void {
try {
while ($running) {
$channel = $connector->connect();
$connection = $connector->connect();

try {
foreach (\array_keys($queues) as $queue) {
$channel->send('subscribe', $queue);
$connection->send('subscribe', $queue);
}

foreach (\array_keys($patternQueues) as $pattern) {
$channel->send('psubscribe', $pattern);
$connection->send('psubscribe', $pattern);
}

while ($response = $channel->receive()?->unwrap()) {
while ($response = $connection->receive()?->unwrap()) {
/** @psalm-suppress RedundantCondition */
\assert(
\is_array($response) && \array_is_list($response),
Expand All @@ -134,11 +134,11 @@ private function run(): void
} catch (RedisException) {
// Attempt to reconnect after failure.
} finally {
$channel = null;
$connection = null;
}
}
} catch (\Throwable $exception) {
$exception = new RedisChannelException($exception->getMessage(), 0, $exception);
$exception = new RedisConnectionException($exception->getMessage(), 0, $exception);

$queueGroups = \array_merge($queues, $patternQueues);

Expand Down Expand Up @@ -178,11 +178,11 @@ private function unloadEmitter(Queue $queue, string $channel): void
async(function () use ($channel): void {
try {
if (empty($this->queues[$channel])) {
$this->channel?->send('unsubscribe', $channel);
$this->connection?->send('unsubscribe', $channel);
}

if ($this->isIdle()) {
$this->channel?->close();
$this->connection?->close();
}
} catch (RedisException) {
// if there's an exception, the unsubscribe is implicitly successful, because the connection broke
Expand All @@ -207,11 +207,11 @@ private function unloadPatternEmitter(Queue $queue, string $pattern): void
async(function () use ($pattern): void {
try {
if (empty($this->patternQueues[$pattern])) {
$this->channel?->send('punsubscribe', $pattern);
$this->connection?->send('punsubscribe', $pattern);
}

if ($this->isIdle()) {
$this->channel?->close();
$this->connection?->close();
}
} catch (RedisException) {
// if there's an exception, the unsubscribe is implicitly successful, because the connection broke
Expand Down
4 changes: 2 additions & 2 deletions test/AuthenticatorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use Amp\PHPUnit\AsyncTestCase;
use Amp\Process\Process;
use Amp\Redis\Connection\RedisChannelException;
use Amp\Redis\Connection\RedisConnectionException;
use function Amp\delay;

class AuthenticatorTest extends AsyncTestCase
Expand Down Expand Up @@ -47,7 +47,7 @@ public function testFailure(): void
{
$redis = createRedisClient(\sprintf(self::URI_FORMAT, self::PORT, 'wrong'));

$this->expectException(RedisChannelException::class);
$this->expectException(RedisConnectionException::class);

$this->expectExceptionMessage('invalid');

Expand Down
Loading

0 comments on commit 5fb48b3

Please sign in to comment.