Skip to content

Commit

Permalink
[GROW-3599] don't call RedisCluster.initialize on empty ConnectionPool (
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-iee committed Sep 15, 2023
1 parent fa17c77 commit 871ca8f
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 8 deletions.
19 changes: 15 additions & 4 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ConnectionError,
DataError,
MasterDownError,
MaxConnectionsError,
MovedError,
RedisClusterException,
RedisError,
Expand Down Expand Up @@ -386,7 +387,12 @@ class AbstractRedisCluster:
list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
)

ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)
ERRORS_ALLOW_RETRY = (
ClusterDownError,
ConnectionError,
MaxConnectionsError,
TimeoutError,
)

def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
"""Replace the default cluster node.
Expand Down Expand Up @@ -1142,7 +1148,7 @@ def _execute_command(self, target_node, *args, **kwargs):
response, **kwargs
)
return response
except AuthenticationError:
except (AuthenticationError, MaxConnectionsError):
raise
except (ConnectionError, TimeoutError) as e:
# Connection retries are being handled in the node's
Expand Down Expand Up @@ -2034,7 +2040,12 @@ def send_cluster_commands(
allow_redirections=allow_redirections,
attempts_count=self.cluster_error_retry_attempts - retry_attempts,
)
except (ClusterDownError, ConnectionError, TimeoutError) as e:
except (
ClusterDownError,
ConnectionError,
MaxConnectionsError,
TimeoutError,
) as e:
if retry_attempts > 0:
# Try again with the new cluster setup. All other errors
# should be raised.
Expand Down Expand Up @@ -2109,7 +2120,7 @@ def _send_cluster_commands(
backoff = self.retry._backoff.compute(attempts_count)
if backoff > 0:
time.sleep(backoff)
if isinstance(e, (ConnectionError, TimeoutError)):
if type(e) in (ConnectionError, TimeoutError):
self.nodes_manager.initialize()
if is_default_node:
self.replace_default_node()
Expand Down
5 changes: 3 additions & 2 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DataError,
ExecAbortError,
InvalidResponse,
MaxConnectionsError,
ModuleError,
NoPermissionError,
NoScriptError,
Expand Down Expand Up @@ -1471,7 +1472,7 @@ def get_encoder(self):
def make_connection(self):
"Create a new connection"
if self._created_connections >= self.max_connections:
raise ConnectionError("Too many connections")
raise MaxConnectionsError("Too many connections")
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)

Expand Down Expand Up @@ -1631,7 +1632,7 @@ def get_connection(self, command_name, *keys, **options):
except Empty:
# Note that this is not caught by the redis client and will be
# raised unless handled by application code. If you want never to
raise ConnectionError("No connection available.")
raise MaxConnectionsError("No connection available.")

# If the ``connection`` is actually ``None`` then that's a cue to make
# a new connection to add to the pool.
Expand Down
5 changes: 5 additions & 0 deletions redis/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,9 @@ class SlotNotCoveredError(RedisClusterException):


class MaxConnectionsError(ConnectionError):
"""
Indicates that a connection pool ran out of connections and
can't create more due to maximum connection count limitation
"""

...
59 changes: 59 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
from redis.crc import key_slot
from redis.exceptions import (
AskError,
AuthenticationError,
ClusterDownError,
ConnectionError,
DataError,
MaxConnectionsError,
MovedError,
NoPermissionError,
RedisClusterException,
Expand Down Expand Up @@ -821,6 +823,21 @@ def raise_error(target_node, *args, **kwargs):
rc.get("bar")
assert compute.call_count == rc.cluster_error_retry_attempts

@pytest.mark.parametrize("error", [AuthenticationError, MaxConnectionsError])
def test_skip_initialize(self, r, error):
for n in r.nodes_manager.nodes_cache.values():
n.redis_connection.connection_pool.max_connections = 3
for _ in range(0, n.redis_connection.connection_pool.max_connections):
n.redis_connection.connection_pool.get_connection("GET")

with patch.object(NodesManager, "initialize") as i:
with pytest.raises(MaxConnectionsError):
r.get("a")
assert i.call_count == 0

for n in r.nodes_manager.nodes_cache.values():
n.redis_connection.connection_pool.reset()

@pytest.mark.parametrize("reinitialize_steps", [2, 10, 99])
def test_recover_slot_not_covered_error(self, request, reinitialize_steps):
rc = _get_client(RedisCluster, request, reinitialize_steps=reinitialize_steps)
Expand Down Expand Up @@ -3079,7 +3096,49 @@ def test_empty_stack(self, r):
result = p.execute()
assert result == []

@pytest.mark.parametrize("error", [AuthenticationError, MaxConnectionsError])
def test_error_does_not_trigger_initialize(self, r, error):
with patch("redis.cluster.get_connection") as get_connection:

def raise_error(target_node, *args, **kwargs):
get_connection.failed_calls += 1
raise error("mocked error")

get_connection.side_effect = raise_error

r.set_retry(Retry(ConstantBackoff(0.1), 5))
pipeline = r.pipeline()

with patch.object(NodesManager, "initialize") as i:
with pytest.raises(error):
pipeline.get("bar")
pipeline.get("bar")
pipeline.execute()
assert i.call_count == 0

@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
def test_error_trigger_initialize(self, r, error):
with patch("redis.cluster.get_connection") as get_connection:

def raise_error(target_node, *args, **kwargs):
get_connection.failed_calls += 1
raise error("mocked error")

get_connection.side_effect = raise_error

r.set_retry(Retry(ConstantBackoff(0.1), 5))
pipeline = r.pipeline()

with patch.object(NodesManager, "initialize") as i:
with pytest.raises(error):
pipeline.get("bar")
pipeline.get("bar")
pipeline.execute()
assert i.call_count == r.cluster_error_retry_attempts + 1

@pytest.mark.parametrize(
"error", [ConnectionError, TimeoutError, MaxConnectionsError]
)
def test_additional_backoff_cluster_pipeline(self, r, error):
with patch.object(ConstantBackoff, "compute") as compute:

Expand Down
5 changes: 3 additions & 2 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import redis
from redis.connection import ssl_available, to_bool
from redis.exceptions import MaxConnectionsError

from .conftest import _get_client, skip_if_redis_enterprise, skip_if_server_version_lt
from .test_pubsub import wait_for_message
Expand Down Expand Up @@ -63,7 +64,7 @@ def test_max_connections(self, master_host):
pool = self.get_pool(max_connections=2, connection_kwargs=connection_kwargs)
pool.get_connection("_")
pool.get_connection("_")
with pytest.raises(redis.ConnectionError):
with pytest.raises(MaxConnectionsError):
pool.get_connection("_")

def test_reuse_previously_released_connection(self, master_host):
Expand Down Expand Up @@ -142,7 +143,7 @@ def test_connection_pool_blocks_until_timeout(self, master_host):
pool.get_connection("_")

start = time.time()
with pytest.raises(redis.ConnectionError):
with pytest.raises(MaxConnectionsError):
pool.get_connection("_")
# we should have waited at least 0.1 seconds
assert time.time() - start >= 0.1
Expand Down

0 comments on commit 871ca8f

Please sign in to comment.