From 847295b6ae6d10b276dbecf0ce5c04bb3a68cf2a Mon Sep 17 00:00:00 2001 From: Andrea Borghi Date: Fri, 11 Oct 2024 15:52:36 +0200 Subject: [PATCH] Use service endpoints from services.endpoints instead of hardcoded ones --- geoservercloud/geoservercloud.py | 252 +++++++++++++++------------ geoservercloud/services/__init__.py | 9 +- geoservercloud/services/endpoints.py | 174 ++++++++++++++++++ tests/test_role.py | 16 +- tests/test_style.py | 4 +- tests/test_user.py | 9 +- 6 files changed, 341 insertions(+), 123 deletions(-) create mode 100644 geoservercloud/services/endpoints.py diff --git a/geoservercloud/geoservercloud.py b/geoservercloud/geoservercloud.py index 8370f9c..62833d3 100644 --- a/geoservercloud/geoservercloud.py +++ b/geoservercloud/geoservercloud.py @@ -10,7 +10,13 @@ from requests import Response from geoservercloud import utils -from geoservercloud.services import RestService +from geoservercloud.services import ( + AclEndpoints, + GwcEndpoints, + OwsEndpoints, + RestEndpoints, + RestService, +) from geoservercloud.templates import Templates @@ -27,15 +33,15 @@ def __init__( self.password: str = password self.auth: tuple[str, str] = (user, password) self.rest_service: RestService = RestService(url, self.auth) + self.acl_endpoints: AclEndpoints = AclEndpoints() + self.gwc_endpoints: GwcEndpoints = GwcEndpoints() + self.ows_endpoints: OwsEndpoints = OwsEndpoints() + self.rest_endpoints: RestEndpoints = RestEndpoints() self.wms: WebMapService_1_3_0 | None = None self.wmts: WebMapTileService | None = None self.default_workspace: str | None = None self.default_datastore: str | None = None - @staticmethod - def workspace_wms_settings_path(workspace: str) -> str: - return f"/rest/services/wms/workspaces/{workspace}/settings.json" - @staticmethod def get_wmts_layer_bbox( url: str, layer_name: str @@ -48,9 +54,9 @@ def get_wmts_layer_bbox( def create_wms(self) -> None: if self.default_workspace: - path: str = f"/{self.default_workspace}/wms" + path: str = self.ows_endpoints.workspace_wms(self.default_workspace) else: - path = "/wms" + path = self.ows_endpoints.wms() self.wms = WebMapService_1_3_0( f"{self.url}{path}", username=self.user, @@ -59,7 +65,7 @@ def create_wms(self) -> None: ) def create_wmts(self) -> None: - path = "/gwc/service/wmts" + path = self.ows_endpoints.wmts() self.wmts = WebMapTileService( f"{self.url}{path}", version="1.0.0", @@ -83,10 +89,12 @@ def create_workspace( "isolated": isolated, } } - response: Response = self.post_request("/rest/workspaces.json", json=payload) + response: Response = self.post_request( + self.rest_endpoints.workspaces(), json=payload + ) if response.status_code == 409: response = self.put_request( - f"/rest/workspaces/{workspace}.json", json=payload + self.rest_endpoints.workspace(workspace), json=payload ) if set_default_workspace: self.default_workspace = workspace @@ -96,8 +104,9 @@ def delete_workspace(self, workspace: str) -> Response: """ Delete a GeoServer workspace (recursively) """ - path: str = f"/rest/workspaces/{workspace}.json?recurse=true" - response: Response = self.delete_request(path) + response: Response = self.delete_request( + self.rest_endpoints.workspace(workspace), params={"recurse": "true"} + ) if self.default_workspace == workspace: self.default_workspace = None self.wms = None @@ -119,10 +128,10 @@ def publish_workspace(self, workspace) -> Response: """ Publish the WMS service for a given workspace """ - path: str = f"{self.workspace_wms_settings_path(workspace)}" - data: dict[str, dict[str, Any]] = Templates.workspace_wms(workspace) - return self.put_request(path, json=data) + return self.put_request( + self.rest_endpoints.workspace_wms_settings(workspace), json=data + ) def set_default_locale_for_service( self, workspace: str, locale: str | None @@ -130,13 +139,14 @@ def set_default_locale_for_service( """ Set a default language for localized WMS requests """ - path: str = self.workspace_wms_settings_path(workspace) data: dict[str, dict[str, Any]] = { "wms": { "defaultLocale": locale, } } - return self.put_request(path, json=data) + return self.put_request( + self.rest_endpoints.workspace_wms_settings(workspace), json=data + ) def unset_default_locale_for_service(self, workspace) -> None: """ @@ -160,8 +170,6 @@ def create_pg_datastore( Create a PostGIS datastore from the DB connection parameters, or update it if it already exist. """ response: None | Response = None - path = f"/rest/workspaces/{workspace}/datastores.json" - resource_path = f"/rest/workspaces/{workspace}/datastores/{datastore}.json" payload: dict[str, dict[str, Any]] = Templates.postgis_data_store( datastore=datastore, pg_host=pg_host, @@ -172,10 +180,16 @@ def create_pg_datastore( namespace=f"http://{workspace}", pg_schema=pg_schema, ) - if not self.resource_exists(resource_path): - response = self.post_request(path, json=payload) + if not self.resource_exists( + self.rest_endpoints.datastore(workspace, datastore) + ): + response = self.post_request( + self.rest_endpoints.datastores(workspace), json=payload + ) else: - response = self.put_request(resource_path, json=payload) + response = self.put_request( + self.rest_endpoints.datastore(workspace, datastore), json=payload + ) if set_default_datastore: self.default_datastore = datastore @@ -195,8 +209,6 @@ def create_jndi_datastore( Create a PostGIS datastore from JNDI resource, or update it if it already exist. """ response: None | Response = None - path = f"/rest/workspaces/{workspace}/datastores.json" - resource_path = f"/rest/workspaces/{workspace}/datastores/{datastore}.json" payload: dict[str, dict[str, Any]] = Templates.postgis_jndi_data_store( datastore=datastore, jndi_reference=jndi_reference, @@ -204,10 +216,16 @@ def create_jndi_datastore( pg_schema=pg_schema, description=description, ) - if not self.resource_exists(resource_path): - response = self.post_request(path, json=payload) + if not self.resource_exists( + self.rest_endpoints.datastore(workspace, datastore) + ): + response = self.post_request( + self.rest_endpoints.datastores(workspace), json=payload + ) else: - response = self.put_request(resource_path, json=payload) + response = self.put_request( + self.rest_endpoints.datastore(workspace, datastore), json=payload + ) if set_default_datastore: self.default_datastore = datastore @@ -223,13 +241,15 @@ def create_wmts_store( """ Create a cascaded WMTS store, or update it if it already exist. """ - path = f"/rest/workspaces/{workspace}/wmtsstores.json" - resource_path = f"/rest/workspaces/{workspace}/wmtsstores/{name}.json" payload = Templates.wmts_store(workspace, name, capabilities) - if not self.resource_exists(resource_path): - return self.post_request(path, json=payload) + if not self.resource_exists(self.rest_endpoints.wmtsstore(workspace, name)): + return self.post_request( + self.rest_endpoints.wmtsstores(workspace), json=payload + ) else: - return self.put_request(resource_path, json=payload) + return self.put_request( + self.rest_endpoints.wmtsstore(workspace, name), json=payload + ) def create_feature_type( self, @@ -250,12 +270,6 @@ def create_feature_type( datastore = datastore or self.default_datastore if not datastore: raise ValueError("Datastore not provided") - path: str = ( - f"/rest/workspaces/{workspace}/datastores/{datastore}/featuretypes.json" - ) - resource_path: str = ( - f"/rest/workspaces/{workspace}/datastores/{datastore}/featuretypes/{layer}.json" - ) payload: dict[str, dict[str, Any]] = Templates.feature_type( layer=layer, workspace=workspace, @@ -272,10 +286,17 @@ def create_feature_type( else: payload["featureType"]["abstract"] = abstract - if not self.resource_exists(resource_path): - return self.post_request(path, json=payload) + if not self.resource_exists( + self.rest_endpoints.featuretype(workspace, datastore, layer) + ): + return self.post_request( + self.rest_endpoints.featuretypes(workspace, datastore), json=payload + ) else: - return self.put_request(resource_path, json=payload) + return self.put_request( + self.rest_endpoints.featuretype(workspace, datastore, layer), + json=payload, + ) def create_layer_group( self, @@ -293,8 +314,6 @@ def create_layer_group( workspace = workspace or self.default_workspace if not workspace: raise ValueError("Workspace not provided") - path: str = f"/rest/workspaces/{workspace}/layergroups.json" - resource_path: str = f"/rest/workspaces/{workspace}/layergroups/{group}.json" payload: dict[str, dict[str, Any]] = Templates.layer_group( group=group, layers=layers, @@ -304,10 +323,14 @@ def create_layer_group( epsg=epsg, mode=mode, ) - if not self.resource_exists(resource_path): - return self.post_request(path, json=payload) + if not self.resource_exists(self.rest_endpoints.layergroup(workspace, group)): + return self.post_request( + self.rest_endpoints.layergroups(workspace), json=payload + ) else: - return self.put_request(resource_path, json=payload) + return self.put_request( + self.rest_endpoints.layergroup(workspace, group), json=payload + ) def create_wmts_layer( self, @@ -324,19 +347,21 @@ def create_wmts_layer( """ if not published_layer: published_layer = native_layer - resource_path = f"/rest/workspaces/{workspace}/wmtsstores/{wmts_store}/layers/{published_layer}.json" - if self.resource_exists(resource_path): - self.delete_request(resource_path, params={"recurse": "true"}) - wmts_store_path = f"/rest/workspaces/{workspace}/wmtsstores/{wmts_store}.json" + if self.resource_exists( + self.rest_endpoints.wmtslayer(workspace, wmts_store, published_layer) + ): + self.delete_request( + self.rest_endpoints.wmtslayer(workspace, wmts_store, published_layer), + params={"recurse": "true"}, + ) capabilities_url = ( - self.get_request(wmts_store_path) + self.get_request(self.rest_endpoints.wmtsstore(workspace, wmts_store)) .json() .get("wmtsStore") .get("capabilitiesURL") ) wgs84_bbox = self.get_wmts_layer_bbox(capabilities_url, native_layer) - path = f"/rest/workspaces/{workspace}/wmtsstores/{wmts_store}/layers.json" payload = Templates.wmts_layer( published_layer, native_layer, @@ -346,11 +371,12 @@ def create_wmts_layer( international_abstract=international_abstract, ) - return self.post_request(path, json=payload) + return self.post_request( + self.rest_endpoints.wmtslayers(workspace, wmts_store), json=payload + ) def get_gwc_layer(self, workspace: str, layer: str) -> dict[str, Any] | None: - path = f"/gwc/rest/layers/{workspace}:{layer}.json" - response = self.get_request(path) + response = self.get_request(self.gwc_endpoints.layer(workspace, layer)) if response.status_code == 404: return None return response.json() @@ -360,7 +386,7 @@ def publish_gwc_layer( ) -> Response | None: # Reload config to make sure GWC is aware of GeoServer layers self.post_request( - "/gwc/rest/reload", + self.gwc_endpoints.reload(), headers={"Content-Type": "application/json"}, data="reload_configuration=1", # type: ignore ) @@ -369,7 +395,7 @@ def publish_gwc_layer( return None payload = Templates.gwc_layer(workspace, layer, f"EPSG:{epsg}") return self.put_request( - f"/gwc/rest/layers/{workspace}:{layer}.json", + self.gwc_endpoints.layer(workspace, layer), json=payload, ) @@ -382,12 +408,14 @@ def create_style_from_file( """Create a style from a file, or update it if it already exists. Supported file extensions are .sld and .zip.""" path = ( - "/rest/styles" if not workspace else f"/rest/workspaces/{workspace}/styles" + self.rest_endpoints.styles() + if not workspace + else self.rest_endpoints.workspace_styles(workspace) ) resource_path = ( - f"/rest/styles/{style}.json" + self.rest_endpoints.style(style) if not workspace - else f"/rest/workspaces/{workspace}/styles/{style}.json" + else self.rest_endpoints.workspace_style(workspace, style) ) file_ext = os.path.splitext(file)[1] @@ -409,14 +437,15 @@ def create_style_from_file( def set_default_layer_style( self, layer: str, workspace: str, style: str ) -> Response: - path = f"/rest/layers/{workspace}:{layer}.json" data = {"layer": {"defaultStyle": {"name": style}}} - return self.put_request(path, json=data) + return self.put_request( + self.rest_endpoints.workspace_layer(workspace, layer), json=data + ) def get_wms_capabilities( self, workspace: str, accept_languages=None ) -> dict[str, Any]: - path: str = f"/{workspace}/wms" + path: str = self.ows_endpoints.workspace_wms(workspace) params: dict[str, str] = { "service": "WMS", "version": "1.3.0", @@ -439,13 +468,14 @@ def get_wms_layers( return capabilities def get_wfs_capabilities(self, workspace: str) -> dict[str, Any]: - path: str = f"/{workspace}/wfs" params: dict[str, str] = { "service": "WFS", "version": "1.1.0", "request": "GetCapabilities", } - response: Response = self.get_request(path, params=params) + response: Response = self.get_request( + self.ows_endpoints.workspace_wfs(workspace), params=params + ) return xmltodict.parse(response.content) def get_wfs_layers(self, workspace: str) -> Any | dict[str, Any]: @@ -534,9 +564,9 @@ def get_legend_graphic( """ path: str if not workspace: - path = "/wms" + path = self.ows_endpoints.wms() else: - path = f"/{workspace}/wms" + path = self.ows_endpoints.workspace_wms(workspace) params: dict[str, Any] = { "service": "WMS", "version": "1.3.0", @@ -572,8 +602,8 @@ def get_tile( def get_feature( self, - workspace: str, - type_name: str, + workspace: str | None = None, + type_name: str | None = None, feature_id: int | None = None, max_feature: int | None = None, format: str = "application/json", @@ -581,7 +611,10 @@ def get_feature( """WFS GetFeature request Return the feature(s) as dict if found, otherwise return the raw response content as bytes """ - path = f"/{workspace}/wfs" + if not workspace: + path = self.ows_endpoints.wfs() + else: + path = self.ows_endpoints.workspace_wfs(workspace) params = { "service": "WFS", "version": "1.1.0", @@ -601,14 +634,17 @@ def get_feature( def describe_feature_type( self, - workspace: str, + workspace: str | None = None, type_name: str | None = None, format: str = "application/json", ) -> dict[str, Any] | bytes: """WFS DescribeFeatureType request Return the feature type(s) as dict if found, otherwise return the raw response content as bytes """ - path = f"/{workspace}/wfs" + if not workspace: + path = self.ows_endpoints.wfs() + else: + path = self.ows_endpoints.workspace_wfs(workspace) params = { "service": "WFS", "version": "1.1.0", @@ -625,15 +661,18 @@ def describe_feature_type( def get_property_value( self, - workspace: str, - type_name: str, - property: str, + workspace: str | None = None, + type_name: str | None = None, + property: str | None = None, ) -> dict | list | bytes: """WFS GetPropertyValue request Return the properties as dict (if one feature was found), a list (if multiple features were found) or an empty dict if no feature was found. Otherwise throw a requests.exceptions.HTTPError """ - path = f"/{workspace}/wfs" + if not workspace: + path = self.ows_endpoints.wfs() + else: + path = self.ows_endpoints.workspace_wfs(workspace) params = { "service": "WFS", "version": "2.0.0", @@ -661,7 +700,7 @@ def create_user(self, user: str, password: str, enabled: bool = True) -> Respons } } return self.post_request( - "/rest/security/usergroup/users", json=payload, headers=headers + self.rest_endpoints.users(), json=payload, headers=headers ) def update_user( @@ -677,26 +716,26 @@ def update_user( if enabled is not None: payload["user"]["enabled"] = enabled return self.post_request( - f"/rest/security/usergroup/user/{user}", json=payload, headers=headers + self.rest_endpoints.user(user), json=payload, headers=headers ) def delete_user(self, user: str) -> Response: """ Delete a GeoServer user """ - return self.delete_request(f"/rest/security/usergroup/user/{user}") + return self.delete_request(self.rest_endpoints.user(user)) def create_role(self, role_name: str) -> Response: """ Create a GeoServer role """ - return self.post_request(f"/rest/security/roles/role/{role_name}") + return self.post_request(self.rest_endpoints.role(role_name)) def delete_role(self, role_name: str) -> Response: """ Delete a GeoServer role """ - return self.delete_request(f"/rest/security/roles/role/{role_name}") + return self.delete_request(self.rest_endpoints.role(role_name)) def create_role_if_not_exists(self, role_name: str) -> Response | None: """ @@ -711,7 +750,7 @@ def role_exists(self, role_name: str) -> bool: Check if a GeoServer role exists """ response = self.get_request( - f"/rest/security/roles", headers={"Accept": "application/json"} + self.rest_endpoints.roles(), headers={"Accept": "application/json"} ) roles = response.json().get("roles", []) return role_name in roles @@ -720,7 +759,7 @@ def get_user_roles(self, user: str) -> list[str] | Response: """ Get all roles assigned to a GeoServer user """ - response = self.get_request(f"/rest/security/roles/user/{user}.json") + response = self.get_request(self.rest_endpoints.user_roles(user)) try: return response.json().get("roles") except JSONDecodeError: @@ -730,13 +769,13 @@ def assign_role_to_user(self, user: str, role: str) -> Response: """ Assign a role to a GeoServer user """ - return self.post_request(f"/rest/security/roles/role/{role}/user/{user}") + return self.post_request(self.rest_endpoints.role_user(role, user)) def remove_role_from_user(self, user: str, role: str) -> Response: """ Remove a role from a GeoServer user """ - return self.delete_request(f"/rest/security/roles/role/{role}/user/{user}") + return self.delete_request(self.rest_endpoints.role_user(role, user)) def create_acl_admin_rule( self, @@ -749,9 +788,8 @@ def create_acl_admin_rule( """ Create a GeoServer ACL admin rule """ - path = "/acl/api/adminrules" return self.post_request( - path, + self.acl_endpoints.adminrules(), json={ "priority": priority, "access": access, @@ -765,22 +803,19 @@ def delete_acl_admin_rule(self, id: int) -> Response: """ Delete a GeoServer ACL admin rule by id """ - path = f"/acl/api/adminrules/id/{id}" - return self.delete_request(path) + return self.delete_request(self.acl_endpoints.adminrule(id)) def delete_all_acl_admin_rules(self) -> Response: """ Delete all existing GeoServer ACL admin rules """ - path = "/acl/api/adminrules" - return self.delete_request(path) + return self.delete_request(self.acl_endpoints.adminrules()) def get_acl_rules(self) -> dict[str, Any]: """ Return all GeoServer ACL data rules """ - path = "/acl/api/rules" - response = self.get_request(path) + response = self.get_request(self.acl_endpoints.rules()) return response.json() def create_acl_rules_for_requests( @@ -822,7 +857,6 @@ def create_acl_rule( """ Create a GeoServer ACL data rule """ - path = "/acl/api/rules" json = {"priority": priority, "access": access} if role: json["role"] = role @@ -834,31 +868,20 @@ def create_acl_rule( json["request"] = request if workspace: json["workspace"] = workspace - return self.post_request(path, json=json) + return self.post_request(self.acl_endpoints.rules(), json=json) def delete_all_acl_rules(self) -> Response: """ Delete all existing GeoServer ACL data rules """ - path = "/acl/api/rules" - return self.delete_request(path) - - def create_or_update_resource(self, path, resource_path, payload) -> Response: - """ - Create a GeoServer resource or update it if it already exists - """ - if not self.resource_exists(resource_path): - return self.post_request(path, json=payload) - else: - return self.put_request(resource_path, json=payload) + return self.delete_request(self.acl_endpoints.rules()) def create_gridset(self, epsg: int) -> Response | None: """ Create a gridset for GeoWebCache for a given projection Supported EPSG codes are 2056, 21781 and 3857 """ - resource_path: str = f"/gwc/rest/gridsets/EPSG:{epsg}.xml" - if self.resource_exists(resource_path): + if self.resource_exists(self.gwc_endpoints.gridset(epsg)): return None file_path: Path = Path(__file__).parent / "gridsets" / f"{epsg}.xml" headers: dict[str, str] = {"Content-Type": "application/xml"} @@ -866,7 +889,18 @@ def create_gridset(self, epsg: int) -> Response | None: data: bytes = file_path.read_bytes() except FileNotFoundError: raise ValueError(f"No gridset definition found for EPSG:{epsg}") - return self.put_request(resource_path, data=data, headers=headers) + return self.put_request( + self.gwc_endpoints.gridset(epsg), data=data, headers=headers + ) + + def create_or_update_resource(self, path, resource_path, payload) -> Response: + """ + Create a GeoServer resource or update it if it already exists + """ + if not self.resource_exists(resource_path): + return self.post_request(path, json=payload) + else: + return self.put_request(resource_path, json=payload) def resource_exists(self, path: str) -> bool: """ diff --git a/geoservercloud/services/__init__.py b/geoservercloud/services/__init__.py index 62d1db2..d885ea3 100644 --- a/geoservercloud/services/__init__.py +++ b/geoservercloud/services/__init__.py @@ -1,3 +1,10 @@ +from .endpoints import AclEndpoints, GwcEndpoints, OwsEndpoints, RestEndpoints from .restservice import RestService -__all__ = ["RestService"] +__all__ = [ + "RestService", + "AclEndpoints", + "OwsEndpoints", + "GwcEndpoints", + "RestEndpoints", +] diff --git a/geoservercloud/services/endpoints.py b/geoservercloud/services/endpoints.py new file mode 100644 index 0000000..4150fd5 --- /dev/null +++ b/geoservercloud/services/endpoints.py @@ -0,0 +1,174 @@ +class AclEndpoints: + def __init__(self, base_url: str = "/acl") -> None: + self.base_url: str = base_url + + def adminrules(self) -> str: + return f"{self.base_url}/api/adminrules" + + def adminrule(self, id: str) -> str: + return f"{self.base_url}/api/adminrules/id/{id}" + + def rules(self) -> str: + return f"{self.base_url}/api/rules" + + +class GwcEndpoints: + def __init__(self, base_url: str = "/gwc/rest") -> None: + self.base_url: str = base_url + + def reload(self) -> str: + return f"{self.base_url}/reload" + + def layers(self, workspace_name: str) -> str: + return f"{self.base_url}/layers.json" + + def layer(self, workspace_name: str, layer_name: str) -> str: + return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" + + def gridsets(self) -> str: + return f"{self.base_url}/gridsets.json" + + def gridset(self, epsg: int) -> str: + return f"{self.base_url}/gridsets/EPSG:{str(epsg)}.xml" + + +class OwsEndpoints: + def __init__(self, base_url: str = "") -> None: + self.base_url: str = base_url + + def ows(self) -> str: + return f"{self.base_url}/ows" + + def wms(self) -> str: + return f"{self.base_url}/wms" + + def wfs(self) -> str: + return f"{self.base_url}/wfs" + + def wcs(self) -> str: + return f"{self.base_url}/wcs" + + def wmts(self) -> str: + return f"{self.base_url}/gwc/service/wmts" + + def workspace_ows(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/ows" + + def workspace_wms(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wms" + + def workspace_wfs(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wfs" + + def workspace_wcs(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wcs" + + +class RestEndpoints: + def __init__(self, base_url: str = "/rest") -> None: + self.base_url: str = base_url + + def styles(self) -> str: + return f"{self.base_url}/styles.json" + + def style(self, style_name: str) -> str: + return f"{self.base_url}/styles/{style_name}.json" + + def workspaces(self) -> str: + return f"{self.base_url}/workspaces.json" + + def workspace(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}.json" + + def workspace_styles(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/styles.json" + + def workspace_style(self, workspace_name: str, style_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/styles/{style_name}.json" + + def workspace_layer(self, workspace_name: str, layer_name: str) -> str: + return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" + + def workspace_wms_settings(self, workspace_name: str) -> str: + return f"{self.base_url}/services/wms/workspaces/{workspace_name}/settings.json" + + def workspace_wfs_settings(self, workspace_name: str) -> str: + return f"{self.base_url}/services/wfs/workspaces/{workspace_name}/settings.json" + + def datastores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores.json" + + def datastore(self, workspace_name: str, datastore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}.json" + + def featuretypes(self, workspace_name: str, datastore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes.json" + + def featuretype( + self, workspace_name: str, datastore_name: str, featuretype_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes/{featuretype_name}.json" + + def layergroup(self, workspace_name: str, layergroup_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/layergroups/{layergroup_name}.json" + + def layergroups(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/layergroups.json" + + def coveragestores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores.json" + + def coveragestore(self, workspace_name: str, coveragestore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}.json" + + def coverages(self, workspace_name: str, coveragestore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages.json" + + def coverage( + self, workspace_name: str, coveragestore_name: str, coverage_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages/{coverage_name}.json" + + def wmsstores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmsstores.json" + + def wmsstore(self, workspace_name: str, wmsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmsstores/{wmsstore_name}.json" + + def wmtsstores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores.json" + + def wmtsstore(self, workspace_name: str, wmtsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}.json" + + def wmtslayers(self, workspace_name: str, wmtsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers.json" + + def wmtslayer( + self, workspace_name: str, wmtsstore_name: str, wmtslayer_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers/{wmtslayer_name}.json" + + def namespaces(self) -> str: + return f"{self.base_url}/namespaces.json" + + def namespace(self, namespace_name: str) -> str: + return f"{self.base_url}/namespaces/{namespace_name}.json" + + def users(self) -> str: + return f"{self.base_url}/security/usergroup/users.json" + + def user(self, username: str) -> str: + return f"{self.base_url}/security/usergroup/user/{username}.json" + + def roles(self) -> str: + return f"{self.base_url}/security/roles.json" + + def user_roles(self, username: str) -> str: + return f"{self.base_url}/security/roles/user/{username}.json" + + def role(self, role_name: str) -> str: + return f"{self.base_url}/security/roles/role/{role_name}.json" + + def role_user(self, role_name: str, username: str) -> str: + return f"{self.base_url}/security/roles/role/{role_name}/user/{username}.json" diff --git a/tests/test_role.py b/tests/test_role.py index 06c1808..9633655 100644 --- a/tests/test_role.py +++ b/tests/test_role.py @@ -7,7 +7,7 @@ def test_create_role(geoserver: GeoServerCloud) -> None: role = "test_role" with responses.RequestsMock() as rsps: rsps.post( - url=f"{geoserver.url}/rest/security/roles/role/{role}", + url=f"{geoserver.url}/rest/security/roles/role/{role}.json", status=201, ) @@ -20,10 +20,12 @@ def test_create_role_if_not_exists_case_true(geoserver: GeoServerCloud) -> None: role = "test_role" with responses.RequestsMock() as rsps: rsps.get( - url=f"{geoserver.url}/rest/security/roles", status=200, json={"roles": []} + url=f"{geoserver.url}/rest/security/roles.json", + status=200, + json={"roles": []}, ) rsps.post( - url=f"{geoserver.url}/rest/security/roles/role/{role}", + url=f"{geoserver.url}/rest/security/roles/role/{role}.json", status=201, ) @@ -36,7 +38,7 @@ def test_create_role_if_not_exists_case_false(geoserver: GeoServerCloud) -> None role = "test_role" with responses.RequestsMock() as rsps: rsps.get( - url=f"{geoserver.url}/rest/security/roles", + url=f"{geoserver.url}/rest/security/roles.json", status=200, json={"roles": [role]}, ) @@ -49,7 +51,7 @@ def test_delete_role(geoserver: GeoServerCloud) -> None: role = "test_role" with responses.RequestsMock() as rsps: rsps.delete( - url=f"{geoserver.url}/rest/security/roles/role/{role}", + url=f"{geoserver.url}/rest/security/roles/role/{role}.json", status=200, ) @@ -78,7 +80,7 @@ def test_assign_role_to_user(geoserver: GeoServerCloud) -> None: role = "test_role" with responses.RequestsMock() as rsps: rsps.post( - url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}", + url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}.json", status=200, ) @@ -92,7 +94,7 @@ def test_remove_role_from_user(geoserver: GeoServerCloud) -> None: role = "test_role" with responses.RequestsMock() as rsps: rsps.delete( - url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}", + url=f"{geoserver.url}/rest/security/roles/role/{role}/user/{user}.json", status=200, ) diff --git a/tests/test_style.py b/tests/test_style.py index 482faf3..20025fd 100644 --- a/tests/test_style.py +++ b/tests/test_style.py @@ -17,7 +17,7 @@ def test_create_style(geoserver: GeoServerCloud) -> None: status=404, ) rsps.post( - url=f"{GEOSERVER_URL}/rest/styles", + url=f"{GEOSERVER_URL}/rest/styles.json", status=201, ) @@ -57,7 +57,7 @@ def test_create_style_zip(geoserver: GeoServerCloud) -> None: status=404, ) rsps.post( - url=f"{GEOSERVER_URL}/rest/styles", + url=f"{GEOSERVER_URL}/rest/styles.json", status=201, ) diff --git a/tests/test_user.py b/tests/test_user.py index 1d833da..c7b12ab 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -9,7 +9,7 @@ def test_create_user(geoserver: GeoServerCloud) -> None: with responses.RequestsMock() as rsps: rsps.post( - url=f"{geoserver.url}/rest/security/usergroup/users", + url=f"{geoserver.url}/rest/security/usergroup/users.json", status=201, match=[ responses.matchers.json_params_matcher( @@ -30,7 +30,7 @@ def test_create_user(geoserver: GeoServerCloud) -> None: def test_update_user_password(geoserver: GeoServerCloud) -> None: with responses.RequestsMock() as rsps: rsps.post( - url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", + url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}.json", status=200, match=[ responses.matchers.json_params_matcher( @@ -49,7 +49,7 @@ def test_update_user_password(geoserver: GeoServerCloud) -> None: def test_update_user_enabled(geoserver: GeoServerCloud) -> None: with responses.RequestsMock() as rsps: rsps.post( - url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", + url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}.json", status=200, match=[ responses.matchers.json_params_matcher( @@ -68,7 +68,8 @@ def test_update_user_enabled(geoserver: GeoServerCloud) -> None: def test_delete_user(geoserver: GeoServerCloud) -> None: with responses.RequestsMock() as rsps: rsps.delete( - url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}", status=200 + url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}.json", + status=200, ) response = geoserver.delete_user(TEST_USER) assert response.status_code == 200