Skip to content

Commit

Permalink
[DE-666] Cluster Rebalance (#287)
Browse files Browse the repository at this point in the history
* Adding cluster-rebalance methods

* Adding cluster-rebalance tests

* Updating cluster docs with rebalance example

* Stating ArangoDB version in the docs.
  • Loading branch information
apetenchea authored Sep 18, 2023
1 parent e5ff4d7 commit 0729f20
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 1 deletion.
177 changes: 176 additions & 1 deletion arango/cluster.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
__all__ = ["Cluster"]

from typing import List
from typing import List, Optional

from arango.api import ApiGroup
from arango.exceptions import (
ClusterEndpointsError,
ClusterHealthError,
ClusterMaintenanceModeError,
ClusterRebalanceError,
ClusterServerCountError,
ClusterServerEngineError,
ClusterServerIDError,
Expand Down Expand Up @@ -195,3 +196,177 @@ def response_handler(resp: Response) -> List[str]:
return [item["endpoint"] for item in resp.body["endpoints"]]

return self._execute(request, response_handler)

def calculate_imbalance(self) -> Result[Json]:
    """Report how unevenly the cluster is currently balanced.

    The report also includes the number of ongoing and pending
    move shard operations.

    :return: Cluster imbalance information.
    :rtype: dict
    :raise arango.exceptions.ClusterRebalanceError: If retrieval fails.
    """
    imbalance_request = Request(
        endpoint="/_admin/cluster/rebalance",
        method="get",
    )

    def parse_imbalance(resp: Response) -> Json:
        # Happy path first: the payload of interest sits under "result".
        if resp.is_success:
            imbalance: Json = resp.body["result"]
            return imbalance
        raise ClusterRebalanceError(resp, imbalance_request)

    return self._execute(imbalance_request, parse_imbalance)

def rebalance(
    self,
    version: int = 1,
    max_moves: Optional[int] = None,
    leader_changes: Optional[bool] = None,
    move_leaders: Optional[bool] = None,
    move_followers: Optional[bool] = None,
    pi_factor: Optional[float] = None,
    exclude_system_collections: Optional[bool] = None,
    databases_excluded: Optional[List[str]] = None,
) -> Result[Json]:
    """Compute a cluster rebalance plan and execute it in one request.

    Every optional argument left as None is omitted from the request body.

    :param version: Must be set to 1.
    :type version: int
    :param max_moves: Maximum number of moves to be computed.
    :type max_moves: int | None
    :param leader_changes: Allow leader changes without moving data.
    :type leader_changes: bool | None
    :param move_leaders: Allow moving shard leaders.
    :type move_leaders: bool | None
    :param move_followers: Allow moving shard followers.
    :type move_followers: bool | None
    :param pi_factor: A weighting factor that should remain untouched.
    :type pi_factor: float | None
    :param exclude_system_collections: Ignore system collections in the
        rebalance plan.
    :type exclude_system_collections: bool | None
    :param databases_excluded: List of database names to be excluded
        from the analysis.
    :type databases_excluded: [str] | None
    :return: Cluster rebalance plan that has been executed.
    :rtype: dict
    :raise arango.exceptions.ClusterRebalanceError: If the request fails.
    """
    # Request-body attribute name paired with the caller-supplied value.
    # The order here is the order in which keys are added to the payload.
    optional_settings = (
        ("maximumNumberOfMoves", max_moves),
        ("leaderChanges", leader_changes),
        ("moveLeaders", move_leaders),
        ("moveFollowers", move_followers),
        ("piFactor", pi_factor),
        ("excludeSystemCollections", exclude_system_collections),
        ("databasesExcluded", databases_excluded),
    )

    payload: Json = {"version": version}
    payload.update(
        (name, value) for name, value in optional_settings if value is not None
    )

    request = Request(
        method="put",
        endpoint="/_admin/cluster/rebalance",
        data=payload,
    )

    def extract_plan(resp: Response) -> Json:
        if resp.is_success:
            plan: Json = resp.body["result"]
            return plan
        raise ClusterRebalanceError(resp, request)

    return self._execute(request, extract_plan)

def calculate_rebalance_plan(
    self,
    version: int = 1,
    max_moves: Optional[int] = None,
    leader_changes: Optional[bool] = None,
    move_leaders: Optional[bool] = None,
    move_followers: Optional[bool] = None,
    pi_factor: Optional[float] = None,
    exclude_system_collections: Optional[bool] = None,
    databases_excluded: Optional[List[str]] = None,
) -> Result[Json]:
    """Compute a cluster rebalance plan.

    Every optional argument left as None is omitted from the request body.

    :param version: Must be set to 1.
    :type version: int
    :param max_moves: Maximum number of moves to be computed.
    :type max_moves: int | None
    :param leader_changes: Allow leader changes without moving data.
    :type leader_changes: bool | None
    :param move_leaders: Allow moving shard leaders.
    :type move_leaders: bool | None
    :param move_followers: Allow moving shard followers.
    :type move_followers: bool | None
    :param pi_factor: A weighting factor that should remain untouched.
    :type pi_factor: float | None
    :param exclude_system_collections: Ignore system collections in the
        rebalance plan.
    :type exclude_system_collections: bool | None
    :param databases_excluded: List of database names to be excluded
        from the analysis.
    :type databases_excluded: [str] | None
    :return: Cluster rebalance plan.
    :rtype: dict
    :raise arango.exceptions.ClusterRebalanceError: If the request fails.
    """
    # Maps each request-body attribute to the value supplied by the caller.
    # Dict insertion order fixes the key order of the resulting payload.
    candidates = {
        "maximumNumberOfMoves": max_moves,
        "leaderChanges": leader_changes,
        "moveLeaders": move_leaders,
        "moveFollowers": move_followers,
        "piFactor": pi_factor,
        "excludeSystemCollections": exclude_system_collections,
        "databasesExcluded": databases_excluded,
    }
    body: Json = {
        "version": version,
        **{key: val for key, val in candidates.items() if val is not None},
    }

    plan_request = Request(
        method="post", endpoint="/_admin/cluster/rebalance", data=body
    )

    def extract_plan(resp: Response) -> Json:
        if resp.is_success:
            plan: Json = resp.body["result"]
            return plan
        raise ClusterRebalanceError(resp, plan_request)

    return self._execute(plan_request, extract_plan)

def execute_rebalance_plan(
    self, moves: List[Json], version: int = 1
) -> Result[bool]:
    """Submit a set of move shard operations for execution.

    Such operations can be obtained from
    :meth:`Cluster.calculate_rebalance_plan`, which calculates them to
    improve the balance of shards, leader shards, and follower shards.

    :param moves: List of move shard operations.
    :type moves: [dict]
    :param version: Must be set to 1.
    :type version: int
    :return: True if the response body reports code 202, i.e. the
        operations have been accepted and scheduled for execution.
    :rtype: bool
    :raise arango.exceptions.ClusterRebalanceError: If request fails.
    """
    execute_request = Request(
        method="post",
        endpoint="/_admin/cluster/rebalance/execute",
        data={"version": version, "moves": moves},
    )

    def was_accepted(resp: Response) -> bool:
        if resp.is_success:
            # Only code 202 counts as accepted; any other successful
            # code yields False.
            accepted: bool = resp.body["code"] == 202
            return accepted
        raise ClusterRebalanceError(resp, execute_request)

    return self._execute(execute_request, was_accepted)
4 changes: 4 additions & 0 deletions arango/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,10 @@ class ClusterServerCountError(ArangoServerError):
"""Failed to retrieve cluster server count."""


class ClusterRebalanceError(ArangoServerError):
    """Failed to perform a cluster rebalance operation.

    Raised when calculating the cluster imbalance, computing a rebalance
    plan, executing a rebalance plan, or rebalancing in one go fails.
    """


##################
# JWT Exceptions #
##################
Expand Down
3 changes: 3 additions & 0 deletions docs/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ Below is an example on how to manage clusters using python-arango.
cluster.toggle_maintenance_mode('on')
cluster.toggle_maintenance_mode('off')
# Rebalance the distribution of shards. Available with ArangoDB 3.10+.
cluster.rebalance()
See :ref:`ArangoClient` and :ref:`Cluster` for API specification.
54 changes: 54 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import pytest
from packaging import version

from arango.errno import DATABASE_NOT_FOUND, FORBIDDEN
from arango.exceptions import (
ClusterEndpointsError,
ClusterHealthError,
ClusterMaintenanceModeError,
ClusterRebalanceError,
ClusterServerCountError,
ClusterServerEngineError,
ClusterServerIDError,
Expand Down Expand Up @@ -134,3 +136,55 @@ def test_cluster_server_count(db, bad_db, cluster):
with assert_raises(ClusterServerCountError) as err:
bad_db.cluster.server_count()
assert err.value.error_code in {FORBIDDEN, DATABASE_NOT_FOUND}


def test_cluster_rebalance(sys_db, bad_db, cluster, db_version):
    """Exercise the imbalance, plan, execute and one-shot rebalance calls.

    Each call is made once against ``sys_db`` (expected to succeed) and once
    against ``bad_db`` (expected to raise ClusterRebalanceError / FORBIDDEN).
    """
    if not cluster:
        pytest.skip("Only tested in a cluster setup")

    minimum_version = version.parse("3.10.0")
    if db_version < minimum_version:
        pytest.skip("Only tested on ArangoDB 3.10+")

    plan_keys = ("imbalanceBefore", "imbalanceAfter", "moves")

    # Imbalance retrieval
    report = sys_db.cluster.calculate_imbalance()
    for key in ("leader", "shards"):
        assert key in report
    for counter in ("pendingMoveShards", "todoMoveShards"):
        assert report[counter] == 0

    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.calculate_imbalance()
    assert err.value.error_code == FORBIDDEN

    # Rebalance plan computation
    plan = sys_db.cluster.calculate_rebalance_plan(
        max_moves=3,
        leader_changes=True,
        move_leaders=True,
        move_followers=True,
        pi_factor=1234.5,
        databases_excluded=["_system"],
    )
    for key in plan_keys:
        assert key in plan

    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.calculate_rebalance_plan()
    assert err.value.error_code == FORBIDDEN

    # Rebalance plan execution
    accepted = sys_db.cluster.execute_rebalance_plan(plan["moves"])
    assert accepted is True
    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.execute_rebalance_plan(plan["moves"])
    assert err.value.error_code == FORBIDDEN

    # Compute and execute in a single call
    executed = sys_db.cluster.rebalance()
    for key in plan_keys:
        assert key in executed

    with assert_raises(ClusterRebalanceError) as err:
        bad_db.cluster.rebalance()
    assert err.value.error_code == FORBIDDEN

0 comments on commit 0729f20

Please sign in to comment.