diff --git a/geoservercloud/geoservercloud.py b/geoservercloud/geoservercloud.py index c5ded87..a20ecab 100644 --- a/geoservercloud/geoservercloud.py +++ b/geoservercloud/geoservercloud.py @@ -648,6 +648,44 @@ def get_property_value( else: return value_collection.get("wfs:member", {}) + def create_user(self, user: str, password: str, enabled: bool = True) -> Response: + """ + Create a GeoServer user + """ + headers: dict[str, str] = {"Content-Type": "application/json"} + payload: dict[str, dict[str, Any]] = { + "user": { + "userName": user, + "password": password, + "enabled": enabled, + } + } + return self.post_request( + "/rest/security/usergroup/users", json=payload, headers=headers + ) + + def update_user( + self, user: str, password: str | None = None, enabled: bool | None = None + ) -> Response: + """ + Update a GeoServer user + """ + headers: dict[str, str] = {"Content-Type": "application/json"} + payload: dict[str, dict[str, Any]] = {"user": {}} + if password: + payload["user"]["password"] = password + if enabled is not None: + payload["user"]["enabled"] = enabled + return self.post_request( + f"/rest/security/usergroup/user/{user}", json=payload, headers=headers + ) + + def delete_user(self, user: str) -> Response: + """ + Delete a GeoServer user + """ + return self.delete_request(f"/rest/security/usergroup/user/{user}") + def create_role(self, role_name: str) -> Response: """ Create a GeoServer role @@ -678,6 +716,28 @@ def role_exists(self, role_name: str) -> bool: roles = response.json().get("roles", []) return role_name in roles + def get_user_roles(self, user: str) -> list[str] | Response: + """ + Get all roles assigned to a GeoServer user + """ + response = self.get_request(f"/rest/security/roles/user/{user}.json") + try: + return response.json().get("roles") + except JSONDecodeError: + return response + + def assign_role_to_user(self, user: str, role: str) -> Response: + """ + Assign a role to a GeoServer user + """ + return self.post_request(f"/rest/security/roles/role/{role}/user/{user}") + + def remove_role_from_user(self, user: str, role: str) -> Response: + """ + Remove a role from a GeoServer user + """ + return self.delete_request(f"/rest/security/roles/role/{role}/user/{user}") + def create_acl_admin_rule( self, priority: int = 0, diff --git a/tests/test_security.py b/tests/test_role.py similarity index 57% rename from tests/test_security.py rename to tests/test_role.py index e15e00c..06c1808 100644 --- a/tests/test_security.py +++ b/tests/test_role.py @@ -56,3 +56,46 @@ def test_delete_role(geoserver: GeoServerCloud) -> None: response = geoserver.delete_role(role) assert response.status_code == 200 + + +def test_get_user_roles(geoserver: GeoServerCloud) -> None: + user = "test_user" + roles = ["test_role1", "test_role2"] + with responses.RequestsMock() as rsps: + rsps.get( + url=f"{geoserver.url}/rest/security/roles/user/{user}.json", + status=200, + json={"roles": roles}, + ) + + response = geoserver.get_user_roles(user) + + assert response == roles + + +def test_assign_role_to_user(geoserver: GeoServerCloud) -> None: + user = "test_user" + role = "test_role" + with responses.RequestsMock() as rsps: + rsps.post( + url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}", + status=200, + ) + + response = geoserver.assign_role_to_user(user, role) + + assert response.status_code == 200 + + +def test_remove_role_from_user(geoserver: GeoServerCloud) -> None: + user = "test_user" + role = "test_role" + with responses.RequestsMock() as rsps: + rsps.delete( + url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}", + status=200, + ) + + response = geoserver.remove_role_from_user(user, role) + + assert response.status_code == 200 diff --git a/tests/test_user.py b/tests/test_user.py new file mode 100644 index 0000000..1d833da --- /dev/null +++ b/tests/test_user.py @@ -0,0 +1,74 @@ +import responses +import responses.matchers + +from geoservercloud.geoservercloud import GeoServerCloud + +TEST_USER = "test_user" + + +def test_create_user(geoserver: GeoServerCloud) -> None: + with responses.RequestsMock() as rsps: + rsps.post( + url=f"{geoserver.url}/rest/security/usergroup/users", + status=201, + match=[ + responses.matchers.json_params_matcher( + { + "user": { + "userName": TEST_USER, + "password": "test_password", + "enabled": True, + } + } + ) + ], + ) + response = geoserver.create_user(TEST_USER, "test_password") + assert response.status_code == 201 + + +def test_update_user_password(geoserver: GeoServerCloud) -> None: + with responses.RequestsMock() as rsps: + rsps.post( + url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", + status=200, + match=[ + responses.matchers.json_params_matcher( + { + "user": { + "password": "new_password", + } + } + ) + ], + ) + response = geoserver.update_user(TEST_USER, password="new_password") + assert response.status_code == 200 + + +def test_update_user_enabled(geoserver: GeoServerCloud) -> None: + with responses.RequestsMock() as rsps: + rsps.post( + url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", + status=200, + match=[ + responses.matchers.json_params_matcher( + { + "user": { + "enabled": False, + } + } + ) + ], + ) + response = geoserver.update_user(TEST_USER, enabled=False) + assert response.status_code == 200 + + +def test_delete_user(geoserver: GeoServerCloud) -> None: + with responses.RequestsMock() as rsps: + rsps.delete( + url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", status=200 + ) + response = geoserver.delete_user(TEST_USER) + assert response.status_code == 200