Skip to content

Commit

Permalink
Add user and role management
Browse files Browse the repository at this point in the history
  • Loading branch information
vuilleumierc committed Oct 10, 2024
1 parent 211af87 commit 283be30
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 0 deletions.
60 changes: 60 additions & 0 deletions geoservercloud/geoservercloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,44 @@ def get_property_value(
else:
return value_collection.get("wfs:member", {})

def create_user(self, user: str, password: str, enabled: bool = True) -> Response:
"""
Create a GeoServer user
"""
headers: dict[str, str] = {"Content-Type": "application/json"}
payload: dict[str, dict[str, Any]] = {
"user": {
"userName": user,
"password": password,
"enabled": enabled,
}
}
return self.post_request(
"/rest/security/usergroup/users", json=payload, headers=headers
)

def update_user(
self, user: str, password: str | None = None, enabled: bool | None = None
) -> Response:
"""
Update a GeoServer user
"""
headers: dict[str, str] = {"Content-Type": "application/json"}
payload: dict[str, dict[str, Any]] = {"user": {}}
if password:
payload["user"]["password"] = password
if enabled is not None:
payload["user"]["enabled"] = enabled
return self.post_request(
f"/rest/security/usergroup/user/{user}", json=payload, headers=headers
)

def delete_user(self, user: str) -> Response:
"""
Delete a GeoServer user
"""
return self.delete_request(f"/rest/security/usergroup/user/{user}")

def create_role(self, role_name: str) -> Response:
"""
Create a GeoServer role
Expand Down Expand Up @@ -678,6 +716,28 @@ def role_exists(self, role_name: str) -> bool:
roles = response.json().get("roles", [])
return role_name in roles

def get_user_roles(self, user: str) -> list[str] | Response:
"""
Get all roles assigned to a GeoServer user
"""
response = self.get_request(f"/rest/security/roles/user/{user}.json")
try:
return response.json().get("roles")
except JSONDecodeError:
return response

def assign_role_to_user(self, user: str, role: str) -> Response:
"""
Assign a role to a GeoServer user
"""
return self.post_request(f"/rest/security/roles/role/{role}/user/{user}")

def remove_role_from_user(self, user: str, role: str) -> Response:
"""
Remove a role from a GeoServer user
"""
return self.delete_request(f"/rest/security/roles/role/{role}/user/{user}")

def create_acl_admin_rule(
self,
priority: int = 0,
Expand Down
43 changes: 43 additions & 0 deletions tests/test_security.py → tests/test_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,46 @@ def test_delete_role(geoserver: GeoServerCloud) -> None:
response = geoserver.delete_role(role)

assert response.status_code == 200


def test_get_user_roles(geoserver: GeoServerCloud) -> None:
user = "test_user"
roles = ["test_role1", "test_role2"]
with responses.RequestsMock() as rsps:
rsps.get(
url=f"{geoserver.url}/rest/security/roles/user/{user}.json",
status=200,
json={"roles": roles},
)

response = geoserver.get_user_roles(user)

assert response == roles


def test_assign_role_to_user(geoserver: GeoServerCloud) -> None:
user = "test_user"
role = "test_role"
with responses.RequestsMock() as rsps:
rsps.post(
url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}",
status=200,
)

response = geoserver.assign_role_to_user(user, role)

assert response.status_code == 200


def test_remove_role_from_user(geoserver: GeoServerCloud) -> None:
user = "test_user"
role = "test_role"
with responses.RequestsMock() as rsps:
rsps.delete(
url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}",
status=200,
)

response = geoserver.remove_role_from_user(user, role)

assert response.status_code == 200
74 changes: 74 additions & 0 deletions tests/test_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import responses
import responses.matchers

from geoservercloud.geoservercloud import GeoServerCloud

TEST_USER = "test_user"


def test_create_user(geoserver: GeoServerCloud) -> None:
with responses.RequestsMock() as rsps:
rsps.post(
url=f"{geoserver.url}/rest/security/usergroup/users",
status=201,
match=[
responses.matchers.json_params_matcher(
{
"user": {
"userName": TEST_USER,
"password": "test_password",
"enabled": True,
}
}
)
],
)
response = geoserver.create_user(TEST_USER, "test_password")
assert response.status_code == 201


def test_update_user_password(geoserver: GeoServerCloud) -> None:
with responses.RequestsMock() as rsps:
rsps.post(
url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}",
status=200,
match=[
responses.matchers.json_params_matcher(
{
"user": {
"password": "new_password",
}
}
)
],
)
response = geoserver.update_user(TEST_USER, password="new_password")
assert response.status_code == 200


def test_update_user_enabled(geoserver: GeoServerCloud) -> None:
with responses.RequestsMock() as rsps:
rsps.post(
url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}",
status=200,
match=[
responses.matchers.json_params_matcher(
{
"user": {
"enabled": False,
}
}
)
],
)
response = geoserver.update_user(TEST_USER, enabled=False)
assert response.status_code == 200


def test_delete_user(geoserver: GeoServerCloud) -> None:
with responses.RequestsMock() as rsps:
rsps.delete(
url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", status=200
)
response = geoserver.delete_user(TEST_USER)
assert response.status_code == 200

0 comments on commit 283be30

Please sign in to comment.