From 0729f201b841a9c9229fcb6abc0e425336a52d08 Mon Sep 17 00:00:00 2001
From: Alex Petenchea
Date: Mon, 18 Sep 2023 19:23:54 +0300
Subject: [PATCH] [DE-666] Cluster Rebalance (#287)

* Adding cluster-rebalance methods

* Adding cluster-rebalance tests

* Updating cluster docs with rebalance example

* Stating ArangoDB version in the docs.
---
 arango/cluster.py     | 177 +++++++++++++++++++++++++++++++++++++++++-
 arango/exceptions.py  |   4 +
 docs/cluster.rst      |   3 +
 tests/test_cluster.py |  54 +++++++++++++
 4 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/arango/cluster.py b/arango/cluster.py
index 04affe3c..a272f50c 100644
--- a/arango/cluster.py
+++ b/arango/cluster.py
@@ -1,12 +1,13 @@
 __all__ = ["Cluster"]
 
-from typing import List
+from typing import List, Optional
 
 from arango.api import ApiGroup
 from arango.exceptions import (
     ClusterEndpointsError,
     ClusterHealthError,
     ClusterMaintenanceModeError,
+    ClusterRebalanceError,
     ClusterServerCountError,
     ClusterServerEngineError,
     ClusterServerIDError,
@@ -195,3 +196,177 @@ def response_handler(resp: Response) -> List[str]:
             return [item["endpoint"] for item in resp.body["endpoints"]]
 
         return self._execute(request, response_handler)
+
+    def calculate_imbalance(self) -> Result[Json]:
+        """Compute the current cluster imbalance, including
+        the amount of ongoing and pending move shard operations.
+
+        :return: Cluster imbalance information.
+        :rtype: dict
+        :raise: arango.exceptions.ClusterRebalanceError: If retrieval fails.
+        """
+        request = Request(method="get", endpoint="/_admin/cluster/rebalance")
+
+        def response_handler(resp: Response) -> Json:
+            if not resp.is_success:
+                raise ClusterRebalanceError(resp, request)
+            result: Json = resp.body["result"]
+            return result
+
+        return self._execute(request, response_handler)
+
+    def rebalance(
+        self,
+        version: int = 1,
+        max_moves: Optional[int] = None,
+        leader_changes: Optional[bool] = None,
+        move_leaders: Optional[bool] = None,
+        move_followers: Optional[bool] = None,
+        pi_factor: Optional[float] = None,
+        exclude_system_collections: Optional[bool] = None,
+        databases_excluded: Optional[List[str]] = None,
+    ) -> Result[Json]:
+        """Compute and execute a cluster rebalance plan.
+
+        :param version: Must be set to 1.
+        :type version: int
+        :param max_moves: Maximum number of moves to be computed.
+        :type max_moves: int | None
+        :param leader_changes: Allow leader changes without moving data.
+        :type leader_changes: bool | None
+        :param move_leaders: Allow moving shard leaders.
+        :type move_leaders: bool | None
+        :param move_followers: Allow moving shard followers.
+        :type move_followers: bool | None
+        :param pi_factor: A weighting factor that should remain untouched.
+        :type pi_factor: float | None
+        :param exclude_system_collections: Ignore system collections in the
+            rebalance plan.
+        :type exclude_system_collections: bool | None
+        :param databases_excluded: List of database names to be excluded
+            from the analysis.
+        :type databases_excluded: [str] | None
+        :return: Cluster rebalance plan that has been executed.
+        :rtype: dict
+        :raise: arango.exceptions.ClusterRebalanceError: If request fails.
+        """
+        data: Json = dict(version=version)
+        if max_moves is not None:
+            data["maximumNumberOfMoves"] = max_moves
+        if leader_changes is not None:
+            data["leaderChanges"] = leader_changes
+        if move_leaders is not None:
+            data["moveLeaders"] = move_leaders
+        if move_followers is not None:
+            data["moveFollowers"] = move_followers
+        if pi_factor is not None:
+            data["piFactor"] = pi_factor
+        if exclude_system_collections is not None:
+            data["excludeSystemCollections"] = exclude_system_collections
+        if databases_excluded is not None:
+            data["databasesExcluded"] = databases_excluded
+
+        request = Request(method="put", endpoint="/_admin/cluster/rebalance", data=data)
+
+        def response_handler(resp: Response) -> Json:
+            if not resp.is_success:
+                raise ClusterRebalanceError(resp, request)
+            result: Json = resp.body["result"]
+            return result
+
+        return self._execute(request, response_handler)
+
+    def calculate_rebalance_plan(
+        self,
+        version: int = 1,
+        max_moves: Optional[int] = None,
+        leader_changes: Optional[bool] = None,
+        move_leaders: Optional[bool] = None,
+        move_followers: Optional[bool] = None,
+        pi_factor: Optional[float] = None,
+        exclude_system_collections: Optional[bool] = None,
+        databases_excluded: Optional[List[str]] = None,
+    ) -> Result[Json]:
+        """Compute the cluster rebalance plan.
+
+        :param version: Must be set to 1.
+        :type version: int
+        :param max_moves: Maximum number of moves to be computed.
+        :type max_moves: int | None
+        :param leader_changes: Allow leader changes without moving data.
+        :type leader_changes: bool | None
+        :param move_leaders: Allow moving shard leaders.
+        :type move_leaders: bool | None
+        :param move_followers: Allow moving shard followers.
+        :type move_followers: bool | None
+        :param pi_factor: A weighting factor that should remain untouched.
+        :type pi_factor: float | None
+        :param exclude_system_collections: Ignore system collections in the
+            rebalance plan.
+        :type exclude_system_collections: bool | None
+        :param databases_excluded: List of database names to be excluded
+            from the analysis.
+        :type databases_excluded: [str] | None
+        :return: Cluster rebalance plan.
+        :rtype: dict
+        :raise: arango.exceptions.ClusterRebalanceError: If retrieval fails.
+        """
+        data: Json = dict(version=version)
+        if max_moves is not None:
+            data["maximumNumberOfMoves"] = max_moves
+        if leader_changes is not None:
+            data["leaderChanges"] = leader_changes
+        if move_leaders is not None:
+            data["moveLeaders"] = move_leaders
+        if move_followers is not None:
+            data["moveFollowers"] = move_followers
+        if pi_factor is not None:
+            data["piFactor"] = pi_factor
+        if exclude_system_collections is not None:
+            data["excludeSystemCollections"] = exclude_system_collections
+        if databases_excluded is not None:
+            data["databasesExcluded"] = databases_excluded
+
+        request = Request(
+            method="post", endpoint="/_admin/cluster/rebalance", data=data
+        )
+
+        def response_handler(resp: Response) -> Json:
+            if not resp.is_success:
+                raise ClusterRebalanceError(resp, request)
+            result: Json = resp.body["result"]
+            return result
+
+        return self._execute(request, response_handler)
+
+    def execute_rebalance_plan(
+        self, moves: List[Json], version: int = 1
+    ) -> Result[bool]:
+        """Execute the given set of move shard operations.
+
+        You can use :meth:`Cluster.calculate_rebalance_plan` to calculate
+        these operations to improve the balance of shards, leader shards,
+        and follower shards.
+
+        :param moves: List of move shard operations.
+        :type moves: [dict]
+        :param version: Must be set to 1.
+        :type version: int
+        :return: True if the moves have been accepted and scheduled
+            for execution.
+        :rtype: bool
+        :raise: arango.exceptions.ClusterRebalanceError: If request fails.
+        """
+        data: Json = dict(version=version, moves=moves)
+
+        request = Request(
+            method="post", endpoint="/_admin/cluster/rebalance/execute", data=data
+        )
+
+        def response_handler(resp: Response) -> bool:
+            if not resp.is_success:
+                raise ClusterRebalanceError(resp, request)
+            result: bool = resp.body["code"] == 202
+            return result
+
+        return self._execute(request, response_handler)
diff --git a/arango/exceptions.py b/arango/exceptions.py
index 2db3a3ee..fbdc2fb7 100644
--- a/arango/exceptions.py
+++ b/arango/exceptions.py
@@ -991,6 +991,10 @@ class ClusterServerCountError(ArangoServerError):
     """Failed to retrieve cluster server count."""
 
 
+class ClusterRebalanceError(ArangoServerError):
+    """Failed to execute cluster re-balancing operation (load/set)."""
+
+
 ##################
 # JWT Exceptions #
 ##################
diff --git a/docs/cluster.rst b/docs/cluster.rst
index 28f80ed9..fbb3bb5e 100644
--- a/docs/cluster.rst
+++ b/docs/cluster.rst
@@ -91,4 +91,7 @@ Below is an example on how to manage clusters using python-arango.
     cluster.toggle_maintenance_mode('on')
     cluster.toggle_maintenance_mode('off')
 
+    # Rebalance the distribution of shards. Available with ArangoDB 3.10+.
+    cluster.rebalance()
+
 See :ref:`ArangoClient` and :ref:`Cluster` for API specification.
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 3534895a..5702f877 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -1,10 +1,12 @@
 import pytest
+from packaging import version
 
 from arango.errno import DATABASE_NOT_FOUND, FORBIDDEN
 from arango.exceptions import (
     ClusterEndpointsError,
     ClusterHealthError,
     ClusterMaintenanceModeError,
+    ClusterRebalanceError,
     ClusterServerCountError,
     ClusterServerEngineError,
     ClusterServerIDError,
@@ -134,3 +136,55 @@ def test_cluster_server_count(db, bad_db, cluster):
     with assert_raises(ClusterServerCountError) as err:
         bad_db.cluster.server_count()
     assert err.value.error_code in {FORBIDDEN, DATABASE_NOT_FOUND}
+
+
+def test_cluster_rebalance(sys_db, bad_db, cluster, db_version):
+    if not cluster:
+        pytest.skip("Only tested in a cluster setup")
+
+    if db_version < version.parse("3.10.0"):
+        pytest.skip("Only tested on ArangoDB 3.10+")
+
+    # Test imbalance retrieval
+    imbalance = sys_db.cluster.calculate_imbalance()
+    assert "leader" in imbalance
+    assert "shards" in imbalance
+    assert imbalance["pendingMoveShards"] == 0
+    assert imbalance["todoMoveShards"] == 0
+
+    with assert_raises(ClusterRebalanceError) as err:
+        bad_db.cluster.calculate_imbalance()
+    assert err.value.error_code == FORBIDDEN
+
+    # Test rebalance computation
+    rebalance = sys_db.cluster.calculate_rebalance_plan(
+        max_moves=3,
+        leader_changes=True,
+        move_leaders=True,
+        move_followers=True,
+        pi_factor=1234.5,
+        databases_excluded=["_system"],
+    )
+    assert "imbalanceBefore" in rebalance
+    assert "imbalanceAfter" in rebalance
+    assert "moves" in rebalance
+
+    with assert_raises(ClusterRebalanceError) as err:
+        bad_db.cluster.calculate_rebalance_plan()
+    assert err.value.error_code == FORBIDDEN
+
+    # Test rebalance execution
+    assert sys_db.cluster.execute_rebalance_plan(rebalance["moves"]) is True
+    with assert_raises(ClusterRebalanceError) as err:
+        bad_db.cluster.execute_rebalance_plan(rebalance["moves"])
+    assert err.value.error_code == FORBIDDEN
+
+    # Rebalance cluster in one go
+    rebalance = sys_db.cluster.rebalance()
+    assert "imbalanceBefore" in rebalance
+    assert "imbalanceAfter" in rebalance
+    assert "moves" in rebalance
+
+    with assert_raises(ClusterRebalanceError) as err:
+        bad_db.cluster.rebalance()
+    assert err.value.error_code == FORBIDDEN