From 1217f94f632da49c4c374d202b6efa69517d06ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9cile=20Vuilleumier?= Date: Wed, 16 Oct 2024 11:20:53 +0200 Subject: [PATCH] Add new service layer and use it for workspace, datastore, WMS, WFS, WMTS, user, role (WIP) --- geoservercloud/geoservercloud.py | 578 ++++++------------------ geoservercloud/models/__init__.py | 5 +- geoservercloud/models/common.py | 30 +- geoservercloud/models/datastore.py | 100 +++-- geoservercloud/models/datastores.py | 39 +- geoservercloud/models/featuretype.py | 6 +- geoservercloud/models/featuretypes.py | 5 +- geoservercloud/models/style.py | 4 +- geoservercloud/models/styles.py | 5 +- geoservercloud/models/workspace.py | 34 +- geoservercloud/models/workspaces.py | 40 +- geoservercloud/services/__init__.py | 7 +- geoservercloud/services/endpoints.py | 174 -------- geoservercloud/services/owsservice.py | 214 +++++++++ geoservercloud/services/restclient.py | 98 +++++ geoservercloud/services/restservice.py | 581 +++++++++++++++++++++---- geoservercloud/templates.py | 1 - tests/models/test_common.py | 5 +- tests/models/test_datastore.py | 96 ++-- tests/models/test_datastores.py | 55 +-- tests/models/test_workspace.py | 13 +- tests/models/test_workspaces.py | 34 +- tests/resources/test_resource.py | 43 -- tests/test_cascaded_wmts.py | 29 +- tests/test_datastore.py | 122 ++++-- tests/test_feature_type.py | 52 +++ tests/test_gwc.py | 21 +- tests/test_role.py | 35 +- tests/test_user.py | 24 +- tests/test_wms.py | 3 +- tests/test_workspace.py | 121 +++-- 31 files changed, 1476 insertions(+), 1098 deletions(-) delete mode 100644 geoservercloud/services/endpoints.py create mode 100644 geoservercloud/services/owsservice.py create mode 100644 geoservercloud/services/restclient.py delete mode 100644 tests/resources/test_resource.py diff --git a/geoservercloud/geoservercloud.py b/geoservercloud/geoservercloud.py index 2b81c1c..d0c43d4 100644 --- a/geoservercloud/geoservercloud.py +++ b/geoservercloud/geoservercloud.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Any -import xmltodict from owslib.map.wms130 import WebMapService_1_3_0 from owslib.util import ResponseWrapper from owslib.wmts import WebMapTileService @@ -11,27 +10,31 @@ from geoservercloud import utils from geoservercloud.models import ( - DataStores, FeatureType, FeatureTypes, - KeyDollarListDict, PostGisDataStore, Style, Styles, Workspace, - Workspaces, -) -from geoservercloud.services import ( - AclEndpoints, - GwcEndpoints, - OwsEndpoints, - RestEndpoints, - RestService, ) +from geoservercloud.services import OwsService, RestService from geoservercloud.templates import Templates class GeoServerCloud: + """ + Facade class allowing CRUD operations on GeoServer resources + + Attributes + ---------- + url : str + base GeoServer URL + user : str + GeoServer username + password : str + GeoServer password + """ + def __init__( self, url: str = "http://localhost:9090/geoserver/cloud", @@ -44,120 +47,98 @@ def __init__( self.password: str = password self.auth: tuple[str, str] = (user, password) self.rest_service: RestService = RestService(url, self.auth) - self.acl_endpoints: AclEndpoints = AclEndpoints() - self.gwc_endpoints: GwcEndpoints = GwcEndpoints() - self.ows_endpoints: OwsEndpoints = OwsEndpoints() - self.rest_endpoints: RestEndpoints = RestEndpoints() + self.ows_service: OwsService = OwsService(url, self.auth) self.wms: WebMapService_1_3_0 | None = None self.wmts: WebMapTileService | None = None self.default_workspace: str | None = None self.default_datastore: str | None = None - @staticmethod - def get_wmts_layer_bbox( - url: str, layer_name: str - ) -> tuple[float, float, float, float] | None: - wmts = WebMapTileService(url) - try: - return wmts[layer_name].boundingBoxWGS84 - except (KeyError, AttributeError): - return None - def create_wms(self) -> None: if self.default_workspace: - path: str = self.ows_endpoints.workspace_wms(self.default_workspace) + self.wms = self.ows_service.create_wms(self.default_workspace) else: - path = self.ows_endpoints.wms() - self.wms = WebMapService_1_3_0( - f"{self.url}{path}", - username=self.user, - password=self.password, - timeout=240, - ) + self.wms = self.ows_service.create_wms() def create_wmts(self) -> None: - path = self.ows_endpoints.wmts() - self.wmts = WebMapTileService( - f"{self.url}{path}", - version="1.0.0", - username=self.user, - password=self.password, - ) + self.wmts = self.ows_service.create_wmts() - def get_workspaces(self) -> Workspaces: - response: Response = self.get_request(self.rest_endpoints.workspaces()) - workspaces = Workspaces.from_dict(response.json()) - return workspaces + def get_workspaces(self) -> tuple[list[dict[str, str]] | str, int]: + """ + Get all GeoServer workspaces + """ + workspaces, status_code = self.rest_service.get_workspaces() + if isinstance(workspaces, str): + return workspaces, status_code + return workspaces.aslist(), status_code + + def get_workspace(self, workspace_name: str) -> tuple[dict[str, str] | str, int]: + """ + Get a workspace by name + """ + workspace, status_code = self.rest_service.get_workspace(workspace_name) + if isinstance(workspace, str): + return workspace, status_code + return workspace.asdict(), status_code def create_workspace( self, workspace_name: str, isolated: bool = False, set_default_workspace: bool = False, - ) -> Response: + ) -> tuple[str, int]: """ Create a workspace in GeoServer, if it does not already exist. It if exists, update it """ - response: Response = self.post_request( - self.rest_endpoints.workspaces(), - json=Workspace(workspace_name, isolated).post_payload(), - ) - if response.status_code == 409: - response = self.put_request( - self.rest_endpoints.workspace(workspace_name), - json=Workspace(workspace_name, isolated).put_payload(), - ) + workspace = Workspace(workspace_name, isolated) + content, status_code = self.rest_service.create_workspace(workspace) if set_default_workspace: self.default_workspace = workspace_name - return response + return content, status_code - def delete_workspace(self, workspace_name: str) -> Response: + def delete_workspace(self, workspace_name: str) -> tuple[str, int]: """ Delete a GeoServer workspace (recursively) """ - response: Response = self.delete_request( - self.rest_endpoints.workspace(workspace_name), params={"recurse": "true"} + content, status_code = self.rest_service.delete_workspace( + Workspace(workspace_name) ) if self.default_workspace == workspace_name: self.default_workspace = None self.wms = None self.wmts = None - return response + return content, status_code def recreate_workspace( - self, workspace_name: str, set_default_workspace: bool = False - ) -> Response: + self, + workspace_name: str, + isolated: bool = False, + set_default_workspace: bool = False, + ) -> tuple[str, int]: """ Create a workspace in GeoServer, and first delete it if it already exists. """ self.delete_workspace(workspace_name) return self.create_workspace( - workspace_name, set_default_workspace=set_default_workspace + workspace_name, + isolated=isolated, + set_default_workspace=set_default_workspace, ) - def publish_workspace(self, workspace_name) -> Response: + def publish_workspace(self, workspace_name) -> tuple[str, int]: """ Publish the WMS service for a given workspace """ - data: dict[str, dict[str, Any]] = Templates.workspace_wms(workspace_name) - return self.put_request( - self.rest_endpoints.workspace_wms_settings(workspace_name), json=data - ) + return self.rest_service.publish_workspace(Workspace(workspace_name)) def set_default_locale_for_service( self, workspace_name: str, locale: str | None - ) -> Response: + ) -> None: """ Set a default language for localized WMS requests """ - data: dict[str, dict[str, Any]] = { - "wms": { - "defaultLocale": locale, - } - } - return self.put_request( - self.rest_endpoints.workspace_wms_settings(workspace_name), json=data + self.rest_service.set_default_locale_for_service( + Workspace(workspace_name), locale ) def unset_default_locale_for_service(self, workspace_name) -> None: @@ -166,26 +147,29 @@ def unset_default_locale_for_service(self, workspace_name) -> None: """ self.set_default_locale_for_service(workspace_name, None) - def get_datastores(self, workspace_name: str) -> dict[str, Any]: + def get_datastores( + self, workspace_name: str + ) -> tuple[list[dict[str, str]] | str, int]: """ Get all datastores for a given workspace """ - response = self.get_request(self.rest_endpoints.datastores(workspace_name)) - return DataStores.from_dict(response.json()).datastores + datastores, status_code = self.rest_service.get_datastores(workspace_name) + if isinstance(datastores, str): + return datastores, status_code + return datastores.aslist(), status_code - def get_postgis_datastore( + def get_pg_datastore( self, workspace_name: str, datastore_name: str - ) -> dict[str, Any] | None: + ) -> tuple[dict[str, Any] | str, int]: """ - Get a specific datastore + Get a datastore by workspace and name """ - response = self.get_request( - self.rest_endpoints.datastore(workspace_name, datastore_name) + datastore, status_code = self.rest_service.get_pg_datastore( + workspace_name, datastore_name ) - if response.status_code == 404: - return None - else: - return PostGisDataStore.from_dict(response.json()) + if isinstance(datastore, str): + return datastore, status_code + return datastore.asdict(), status_code def create_pg_datastore( self, @@ -199,11 +183,10 @@ def create_pg_datastore( pg_schema: str = "public", description: str | None = None, set_default_datastore: bool = False, - ) -> Response | None: + ) -> tuple[str, int]: """ Create a PostGIS datastore from the DB connection parameters, or update it if it already exist. """ - response: None | Response = None datastore = PostGisDataStore( workspace_name, datastore_name, @@ -218,27 +201,17 @@ def create_pg_datastore( "namespace": f"http://{workspace_name}", "Expose primary keys": "true", }, - data_store_type="PostGIS", + type="PostGIS", description=description, ) - payload = datastore.put_payload() - - if not self.resource_exists( - self.rest_endpoints.datastore(workspace_name, datastore_name) - ): - response = self.post_request( - self.rest_endpoints.datastores(workspace_name), json=payload - ) - else: - response = self.put_request( - self.rest_endpoints.datastore(workspace_name, datastore_name), - json=payload, - ) + content, status_code = self.rest_service.create_pg_datastore( + workspace_name, datastore + ) if set_default_datastore: self.default_datastore = datastore_name - return response + return content, status_code def create_jndi_datastore( self, @@ -248,11 +221,10 @@ def create_jndi_datastore( pg_schema: str = "public", description: str | None = None, set_default_datastore: bool = False, - ) -> Response | None: + ) -> tuple[str, int]: """ Create a PostGIS datastore from JNDI resource, or update it if it already exist. """ - response: None | Response = None datastore = PostGisDataStore( workspace_name, datastore_name, @@ -263,47 +235,28 @@ def create_jndi_datastore( "namespace": f"http://{workspace_name}", "Expose primary keys": "true", }, - data_store_type="PostGIS (JNDI)", + type="PostGIS (JNDI)", description=description, ) - payload = datastore.put_payload() - if not self.resource_exists( - self.rest_endpoints.datastore(workspace_name, datastore_name) - ): - response = self.post_request( - self.rest_endpoints.datastores(workspace_name), json=payload - ) - else: - response = self.put_request( - self.rest_endpoints.datastore(workspace_name, datastore_name), - json=payload, - ) + content, code = self.rest_service.create_jndi_datastore( + workspace_name, datastore + ) if set_default_datastore: self.default_datastore = datastore_name - return response + return content, code def create_wmts_store( self, workspace_name: str, name: str, capabilities: str, - ) -> Response: + ) -> tuple[str, int]: """ Create a cascaded WMTS store, or update it if it already exist. """ - payload = Templates.wmts_store(workspace_name, name, capabilities) - if not self.resource_exists( - self.rest_endpoints.wmtsstore(workspace_name, name) - ): - return self.post_request( - self.rest_endpoints.wmtsstores(workspace_name), json=payload - ) - else: - return self.put_request( - self.rest_endpoints.wmtsstore(workspace_name, name), json=payload - ) + return self.rest_service.create_wmts_store(workspace_name, name, capabilities) # TODO: add a test for this method def get_feature_types( @@ -340,7 +293,7 @@ def create_feature_type( abstract: str | dict = "Default abstract", attributes: dict = Templates.geom_point_attribute(), # TODO: remove default value, because if should be None epsg: int = 4326, - ) -> Response: + ) -> tuple[str, int]: """ Create a feature type or update it if it already exist. """ @@ -350,35 +303,9 @@ def create_feature_type( datastore = datastore or self.default_datastore if not datastore: raise ValueError("Datastore not provided") - # TODO: use FeatureType.post_payload() - payload: dict[str, dict[str, Any]] = Templates.feature_type( - layer=layer, - workspace=workspace_name, - datastore=datastore, - attributes=utils.convert_attributes(attributes), - epsg=epsg, + return self.rest_service.create_feature_type( + layer, workspace_name, datastore, title, abstract, attributes, epsg ) - if type(title) is dict: - payload["featureType"]["internationalTitle"] = title - else: - payload["featureType"]["title"] = title - if type(abstract) is dict: - payload["featureType"]["internationalAbstract"] = abstract - else: - payload["featureType"]["abstract"] = abstract - - if not self.resource_exists( - self.rest_endpoints.featuretype(workspace_name, datastore, layer) - ): - return self.post_request( - self.rest_endpoints.featuretypes(workspace_name, datastore), - json=payload, - ) - else: - return self.put_request( - self.rest_endpoints.featuretype(workspace_name, datastore, layer), - json=payload, - ) def create_layer_group( self, @@ -425,65 +352,31 @@ def create_wmts_layer( epsg: int = 4326, international_title: dict[str, str] | None = None, international_abstract: dict[str, str] | None = None, - ) -> Response: + ) -> tuple[str, int]: """ Publish a remote WMTS layer (first delete it if it already exists) """ if not published_layer: published_layer = native_layer - if self.resource_exists( - self.rest_endpoints.wmtslayer(workspace_name, wmts_store, published_layer) - ): - self.delete_request( - self.rest_endpoints.wmtslayer( - workspace_name, wmts_store, published_layer - ), - params={"recurse": "true"}, - ) - capabilities_url = ( - self.get_request(self.rest_endpoints.wmtsstore(workspace_name, wmts_store)) - .json() - .get("wmtsStore") - .get("capabilitiesURL") - ) - wgs84_bbox = self.get_wmts_layer_bbox(capabilities_url, native_layer) - - payload = Templates.wmts_layer( - published_layer, + return self.rest_service.create_wmts_layer( + workspace_name, + wmts_store, native_layer, - wgs84_bbox=wgs84_bbox, - epsg=epsg, - international_title=international_title, - international_abstract=international_abstract, - ) - - return self.post_request( - self.rest_endpoints.wmtslayers(workspace_name, wmts_store), json=payload + published_layer, + epsg, + international_title, + international_abstract, ) - def get_gwc_layer(self, workspace_name: str, layer: str) -> dict[str, Any] | None: - response = self.get_request(self.gwc_endpoints.layer(workspace_name, layer)) - if response.status_code == 404: - return None - return response.json() + def get_gwc_layer( + self, workspace_name: str, layer: str + ) -> tuple[dict[str, Any] | str, int]: + return self.rest_service.get_gwc_layer(workspace_name, layer) def publish_gwc_layer( self, workspace_name: str, layer: str, epsg: int = 4326 - ) -> Response | None: - # Reload config to make sure GWC is aware of GeoServer layers - self.post_request( - self.gwc_endpoints.reload(), - headers={"Content-Type": "application/json"}, - data="reload_configuration=1", # type: ignore - ) - # Do not re-publish an existing layer - if self.get_gwc_layer(workspace_name, layer): - return None - payload = Templates.gwc_layer(workspace_name, layer, f"EPSG:{epsg}") - return self.put_request( - self.gwc_endpoints.layer(workspace_name, layer), - json=payload, - ) + ) -> tuple[str, int]: + return self.rest_service.publish_gwc_layer(workspace_name, layer, epsg) def get_styles(self, workspace_name: str | None = None) -> dict[str, Any]: """ @@ -554,48 +447,13 @@ def set_default_layer_style( self.rest_endpoints.workspace_layer(workspace_name, layer), json=data ) - def get_wms_capabilities( - self, workspace_name: str, accept_languages=None - ) -> dict[str, Any]: - path: str = self.ows_endpoints.workspace_wms(workspace_name) - params: dict[str, str] = { - "service": "WMS", - "version": "1.3.0", - "request": "GetCapabilities", - } - if accept_languages: - params["AcceptLanguages"] = accept_languages - response: Response = self.get_request(path, params=params) - return xmltodict.parse(response.content) - def get_wms_layers( self, workspace_name: str, accept_languages: str | None = None ) -> Any | dict[str, Any]: - capabilities: dict[str, Any] = self.get_wms_capabilities( - workspace_name, accept_languages - ) - try: - return capabilities["WMS_Capabilities"]["Capability"]["Layer"] - except KeyError: - return capabilities - - def get_wfs_capabilities(self, workspace_name: str) -> dict[str, Any]: - params: dict[str, str] = { - "service": "WFS", - "version": "1.1.0", - "request": "GetCapabilities", - } - response: Response = self.get_request( - self.ows_endpoints.workspace_wfs(workspace_name), params=params - ) - return xmltodict.parse(response.content) + return self.ows_service.get_wms_layers(workspace_name, accept_languages) def get_wfs_layers(self, workspace_name: str) -> Any | dict[str, Any]: - capabilities: dict[str, Any] = self.get_wfs_capabilities(workspace_name) - try: - return capabilities["wfs:WFS_Capabilities"]["FeatureTypeList"] - except KeyError: - return capabilities + return self.ows_service.get_wfs_layers(workspace_name) def get_map( self, @@ -674,24 +532,9 @@ def get_legend_graphic( """ WMS GetLegendGraphic request """ - path: str - if not workspace_name: - path = self.ows_endpoints.wms() - else: - path = self.ows_endpoints.workspace_wms(workspace_name) - params: dict[str, Any] = { - "service": "WMS", - "version": "1.3.0", - "request": "GetLegendGraphic", - "format": format, - "layer": layer, - } - if language: - params["language"] = language - if style: - params["style"] = style - response: Response = self.get_request(path, params=params) - return response + return self.ows_service.get_legend_graphic( + layer, format, language, style, workspace_name + ) def get_tile( self, layer, format, tile_matrix_set, tile_matrix, row, column @@ -719,159 +562,78 @@ def get_feature( feature_id: int | None = None, max_feature: int | None = None, format: str = "application/json", - ) -> dict[str, Any] | bytes: + ) -> dict[str, Any] | str: """WFS GetFeature request - Return the feature(s) as dict if found, otherwise return the raw response content as bytes + Return the feature(s) as dict if found, otherwise return the response content as string """ # FIXME: we should consider also the global wfs endpoint - path = self.ows_endpoints.workspace_wfs(workspace_name) - params = { - "service": "WFS", - "version": "1.1.0", - "request": "GetFeature", - "typeName": type_name, - "outputFormat": format, - } - if feature_id: - params["featureID"] = str(feature_id) - if max_feature: - params["maxFeatures"] = str(max_feature) - response = self.get_request(path, params=params) - try: - return response.json() - except JSONDecodeError: - return response.content + return self.ows_service.get_feature( + workspace_name, type_name, feature_id, max_feature, format + ) def describe_feature_type( self, workspace_name: str | None = None, type_name: str | None = None, format: str = "application/json", - ) -> dict[str, Any] | bytes: + ) -> dict[str, Any] | str: """WFS DescribeFeatureType request - Return the feature type(s) as dict if found, otherwise return the raw response content as bytes + Return the feature type(s) as dict if found, otherwise return the response content as string """ - if not workspace_name: - path = self.ows_endpoints.wfs() - else: - path = self.ows_endpoints.workspace_wfs(workspace_name) - params = { - "service": "WFS", - "version": "1.1.0", - "request": "DescribeFeatureType", - "outputFormat": format, - } - if type_name: - params["typeName"] = type_name - response = self.get_request(path, params=params) - try: - return response.json() - except JSONDecodeError: - return response.content + return self.ows_service.describe_feature_type(workspace_name, type_name, format) def get_property_value( self, workspace_name: str, type_name: str, property: str, - ) -> dict | list | bytes: + ) -> dict | list | str: """WFS GetPropertyValue request - Return the properties as dict (if one feature was found), a list (if multiple features were found) - or an empty dict if no feature was found. Otherwise throw a requests.exceptions.HTTPError + Return the properties as dict (if one feature was found), a list (if multiple features were found), + an empty dict if no feature was found or the response content as string """ # FIXME: we should consider also the global wfs endpoint - path = self.ows_endpoints.workspace_wfs(workspace_name) - params = { - "service": "WFS", - "version": "2.0.0", - "request": "GetPropertyValue", - "typeNames": type_name, - "valueReference": property, - } - response = self.get_request(path, params=params) - value_collection = xmltodict.parse(response.content).get("wfs:ValueCollection") - if not value_collection: - return response.content - else: - return value_collection.get("wfs:member", {}) + return self.ows_service.get_property_value(workspace_name, type_name, property) - def create_user(self, user: str, password: str, enabled: bool = True) -> Response: + def create_user( + self, user: str, password: str, enabled: bool = True + ) -> tuple[str, int]: """ Create a GeoServer user """ - headers: dict[str, str] = {"Content-Type": "application/json"} - payload: dict[str, dict[str, Any]] = { - "user": { - "userName": user, - "password": password, - "enabled": enabled, - } - } - return self.post_request( - self.rest_endpoints.users(), json=payload, headers=headers - ) + return self.rest_service.create_user(user, password, enabled) def update_user( self, user: str, password: str | None = None, enabled: bool | None = None - ) -> Response: + ) -> tuple[str, int]: """ Update a GeoServer user """ - headers: dict[str, str] = {"Content-Type": "application/json"} - payload: dict[str, dict[str, Any]] = {"user": {}} - if password: - payload["user"]["password"] = password - if enabled is not None: - payload["user"]["enabled"] = enabled - return self.post_request( - self.rest_endpoints.user(user), json=payload, headers=headers - ) + return self.rest_service.update_user(user, password, enabled) - def delete_user(self, user: str) -> Response: + def delete_user(self, user: str) -> tuple[str, int]: """ Delete a GeoServer user """ - return self.delete_request(self.rest_endpoints.user(user)) + return self.rest_service.delete_user(user) - def create_role(self, role_name: str) -> Response: + def create_role(self, role_name: str) -> tuple[str, int]: """ - Create a GeoServer role + Create a GeoServer role if it does not already exist """ - return self.post_request(self.rest_endpoints.role(role_name)) + return self.rest_service.create_role_if_not_exists(role_name) - def delete_role(self, role_name: str) -> Response: + def delete_role(self, role_name: str) -> tuple[str, int]: """ Delete a GeoServer role """ - return self.delete_request(self.rest_endpoints.role(role_name)) + return self.rest_service.delete_role(role_name) - def create_role_if_not_exists(self, role_name: str) -> Response | None: - """ - Create a GeoServer role if it does not yet exist - """ - if self.role_exists(role_name): - return None - return self.create_role(role_name) - - def role_exists(self, role_name: str) -> bool: - """ - Check if a GeoServer role exists - """ - response = self.get_request( - self.rest_endpoints.roles(), headers={"Accept": "application/json"} - ) - roles = response.json().get("roles", []) - return role_name in roles - - def get_user_roles(self, user: str) -> list[str] | Response: + def get_user_roles(self, user: str) -> tuple[list[str] | str, int]: """ Get all roles assigned to a GeoServer user """ - response = self.get_request(self.rest_endpoints.user_roles(user)) - try: - return response.json().get("roles") - except JSONDecodeError: - return response + return self.rest_service.get_user_roles(user) def assign_role_to_user(self, user: str, role: str) -> Response: """ @@ -984,77 +746,9 @@ def delete_all_acl_rules(self) -> Response: """ return self.delete_request(self.acl_endpoints.rules()) - def create_gridset(self, epsg: int) -> Response | None: + def create_gridset(self, epsg: int) -> tuple[str, int]: """ Create a gridset for GeoWebCache for a given projection Supported EPSG codes are 2056, 21781 and 3857 """ - if self.resource_exists(self.gwc_endpoints.gridset(epsg)): - return None - file_path: Path = Path(__file__).parent / "gridsets" / f"{epsg}.xml" - headers: dict[str, str] = {"Content-Type": "application/xml"} - try: - data: bytes = file_path.read_bytes() - except FileNotFoundError: - raise ValueError(f"No gridset definition found for EPSG:{epsg}") - return self.put_request( - self.gwc_endpoints.gridset(epsg), data=data, headers=headers - ) - - def create_or_update_resource(self, path, resource_path, payload) -> Response: - """ - Create a GeoServer resource or update it if it already exists - """ - if not self.resource_exists(resource_path): - return self.post_request(path, json=payload) - else: - return self.put_request(resource_path, json=payload) - - def resource_exists(self, path: str) -> bool: - """ - Check if a resource (given its path) exists in GeoServer - """ - # GeoServer raises a 500 when posting to a datastore or feature type that already exists, so first do - # a get request - response = self.get_request(path) - return response.status_code == 200 - - def get_request( - self, - path, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - ) -> Response: - return self.rest_service.get(path, params=params, headers=headers) - - def post_request( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - json: dict[str, Any] | None = None, - data: bytes | None = None, - ) -> Response: - return self.rest_service.post( - path, params=params, headers=headers, json=json, data=data - ) - - def put_request( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - json: dict[str, dict[str, Any]] | None = None, - data: bytes | None = None, - ) -> Response: - return self.rest_service.put( - path, params=params, headers=headers, json=json, data=data - ) - - def delete_request( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - ) -> Response: - return self.rest_service.delete(path, params=params, headers=headers) + return self.rest_service.create_gridset(epsg) diff --git a/geoservercloud/models/__init__.py b/geoservercloud/models/__init__.py index c28cea9..12f9802 100644 --- a/geoservercloud/models/__init__.py +++ b/geoservercloud/models/__init__.py @@ -1,4 +1,4 @@ -from .common import I18N, KeyDollarListDict +from .common import BaseModel, EntityModel, ListModel, I18N, KeyDollarListDict from .datastore import PostGisDataStore from .datastores import DataStores from .featuretype import FeatureType @@ -9,8 +9,11 @@ from .workspaces import Workspaces __all__ = [ + "BaseModel", "DataStores", + "EntityModel", "KeyDollarListDict", + "ListModel", "FeatureType", "FeatureTypes", "I18N", diff --git a/geoservercloud/models/common.py b/geoservercloud/models/common.py index 3028ed7..ebada34 100644 --- a/geoservercloud/models/common.py +++ b/geoservercloud/models/common.py @@ -1,11 +1,34 @@ import json -import logging from typing import Any -log = logging.getLogger() + +class BaseModel: + @classmethod + def from_get_response_payload(cls, content: dict): + raise NotImplementedError + + +class EntityModel(BaseModel): + def asdict(self) -> dict[str, Any]: + raise NotImplementedError + + def post_payload(self) -> dict[str, Any]: + raise NotImplementedError + + def put_payload(self) -> dict[str, Any]: + raise NotImplementedError + + +class ListModel(BaseModel): + def aslist(self) -> list: + raise NotImplementedError class KeyDollarListDict(dict): + + key_prefix: str = "@key" + value_prefix: str = "$" + def __init__( self, input_list: list | None = None, @@ -14,13 +37,10 @@ def __init__( **kwargs ): super().__init__(*args, **kwargs) - self.key_prefix = "@key" - self.value_prefix = "$" if input_list: self.deserialize(input_list) if input_dict: self.update(input_dict) - log.debug(self) def deserialize(self, input_list: list): for item in input_list: diff --git a/geoservercloud/models/datastore.py b/geoservercloud/models/datastore.py index 107721a..0e8c36b 100644 --- a/geoservercloud/models/datastore.py +++ b/geoservercloud/models/datastore.py @@ -1,74 +1,78 @@ import json -import logging +from typing import Any -from requests.models import Response +from geoservercloud.models import EntityModel, KeyDollarListDict -from . import KeyDollarListDict - -log = logging.getLogger() - - -class PostGisDataStore: +class PostGisDataStore(EntityModel): def __init__( self, workspace_name: str, - data_store_name: str, + name: str, connection_parameters: dict, - data_store_type: str = "PostGIS", + type: str = "PostGIS", enabled: bool = True, description: str | None = None, + default: bool | None = None, + disable_on_conn_failure: bool | None = None, ) -> None: - self.workspace_name = workspace_name - self.data_store_name = data_store_name + self.workspace_name: str = workspace_name + self._name: str = name self.connection_parameters = KeyDollarListDict(input_dict=connection_parameters) - self.data_store_type = data_store_type - self.description = description - self.enabled = enabled + self.type: str = type + self.description: str | None = description + self.enabled: bool = enabled + self._default: bool | None = default + self.disable_on_conn_failure: bool | None = disable_on_conn_failure @property - def name(self): - return self.data_store_name + def name(self) -> str: + return self._name - def post_payload(self): - payload = { - "dataStore": { - "name": self.data_store_name, - "type": self.data_store_type, - "connectionParameters": { - "entry": self.connection_parameters.serialize() - }, - } + def asdict(self) -> dict[str, Any]: + content: dict[str, Any] = { + "name": self._name, + "type": self.type, + "connectionParameters": {"entry": dict(self.connection_parameters)}, + "workspace": self.workspace_name, } if self.description: - payload["dataStore"]["description"] = self.description + content["description"] = self.description if self.enabled: - payload["dataStore"]["enabled"] = self.enabled - return payload + content["enabled"] = self.enabled + if self._default is not None: + content["_default"] = self._default + if self.disable_on_conn_failure is not None: + content["disableOnConnFailure"] = self.disable_on_conn_failure + return content + + def post_payload(self) -> dict[str, Any]: + content = self.asdict() + content["connectionParameters"] = { + "entry": self.connection_parameters.serialize() + } + content["workspace"] = {"name": self.workspace_name} + return {"dataStore": content} - def put_payload(self): - payload = self.post_payload() - return payload + def put_payload(self) -> dict[str, Any]: + return self.post_payload() @classmethod - def from_dict(cls, content: dict): - connection_parameters = cls.parse_connection_parameters(content) + def from_get_response_payload(cls, content: dict): + data_store = content["dataStore"] + connection_parameters = KeyDollarListDict( + input_list=data_store["connectionParameters"]["entry"] + ) return cls( - content.get("dataStore", {}).get("workspace", {}).get("name", None), - content.get("dataStore", {}).get("name", None), + data_store["workspace"]["name"], + data_store["name"], connection_parameters, - content.get("dataStore", {}).get("type", "PostGIS"), - content.get("dataStore", {}).get("enabled", True), - content.get("dataStore", {}).get("description", None), - ) - - @classmethod - def parse_connection_parameters(cls, content): - return KeyDollarListDict( - content.get("dataStore", {}) - .get("connectionParameters", {}) - .get("entry", []) + data_store.get("type", "PostGIS"), + data_store.get("enabled", True), + data_store.get("description", None), + data_store.get("_default", None), + data_store.get("disableOnConnFailure", None), ) - def __repr__(self): + def __repr__(self) -> str: return json.dumps(self.put_payload(), indent=4) diff --git a/geoservercloud/models/datastores.py b/geoservercloud/models/datastores.py index fdd8811..b682e17 100644 --- a/geoservercloud/models/datastores.py +++ b/geoservercloud/models/datastores.py @@ -1,32 +1,19 @@ -import logging +from geoservercloud.models import ListModel -from requests.models import Response -log = logging.getLogger() - - -class DataStores: - - def __init__(self, workspace_name: str, datastores: list[str] = []) -> None: - self.workspace_name = workspace_name - self._datastores = datastores - - @property - def datastores(self): - return self._datastores +class DataStores(ListModel): + def __init__(self, datastores: list[dict[str, str]] = []) -> None: + self._datastores: list[dict[str, str]] = datastores @classmethod - def from_dict(cls, content: dict): - datastores = [] - workspace_name = ( - content.get("dataStores", {}).get("workspace", {}).get("name", None) - ) + def from_get_response_payload(cls, content: dict): + datastores: str | dict = content["dataStores"] + if not datastores: + return cls() + return cls(datastores["dataStore"]) # type: ignore - for store in content.get("dataStores", {}).get("dataStore", []): - datastores.append(store["name"]) - for data_store_name in datastores: - log.debug(f"Name: {data_store_name}") - return cls(workspace_name, datastores) + def __repr__(self) -> str: + return str(self._datastores) - def __repr__(self): - return str(self.datastores) + def aslist(self) -> list[dict[str, str]]: + return self._datastores diff --git a/geoservercloud/models/featuretype.py b/geoservercloud/models/featuretype.py index 3ad5712..bd94b29 100644 --- a/geoservercloud/models/featuretype.py +++ b/geoservercloud/models/featuretype.py @@ -1,12 +1,10 @@ import json -from requests.models import Response - -from geoservercloud.models import I18N +from geoservercloud.models import EntityModel, I18N # TODO: import more default values from Templates -class FeatureType: +class FeatureType(EntityModel): def __init__( self, namespace_name: str, diff --git a/geoservercloud/models/featuretypes.py b/geoservercloud/models/featuretypes.py index 463ddad..e27ea2c 100644 --- a/geoservercloud/models/featuretypes.py +++ b/geoservercloud/models/featuretypes.py @@ -1,10 +1,9 @@ import json -from requests.models import Response +from geoservercloud.models import ListModel -class FeatureTypes: - +class FeatureTypes(ListModel): def __init__(self, featuretypes: list = []) -> None: self._featuretypes = featuretypes diff --git a/geoservercloud/models/style.py b/geoservercloud/models/style.py index af67889..fbde64e 100644 --- a/geoservercloud/models/style.py +++ b/geoservercloud/models/style.py @@ -1,11 +1,11 @@ import json import xmltodict -from requests.models import Response +from geoservercloud.models import EntityModel -class Style: +class Style(EntityModel): def __init__( self, name: str, diff --git a/geoservercloud/models/styles.py b/geoservercloud/models/styles.py index 92af7ec..26c1aaa 100644 --- a/geoservercloud/models/styles.py +++ b/geoservercloud/models/styles.py @@ -1,8 +1,7 @@ -from requests.models import Response +from geoservercloud.models import ListModel -class Styles: - +class Styles(ListModel): def __init__(self, styles: list[str], workspace: str | None = None) -> None: self._workspace = workspace self._styles = styles diff --git a/geoservercloud/models/workspace.py b/geoservercloud/models/workspace.py index 29448f0..504b03b 100644 --- a/geoservercloud/models/workspace.py +++ b/geoservercloud/models/workspace.py @@ -1,31 +1,31 @@ import json -import logging +from typing import Any -from requests.models import Response +from geoservercloud.models import EntityModel -log = logging.getLogger() - - -class Workspace: +class Workspace(EntityModel): def __init__(self, name: str, isolated: bool = False) -> None: - self.name = name - self.isolated = isolated + self.name: str = name + self.isolated: bool = isolated + + def asdict(self) -> dict[str, Any]: + return { + "name": self.name, + "isolated": self.isolated, + } - def put_payload(self): - payload = {"workspace": {"name": self.name}} - if self.isolated: - payload["workspace"]["isolated"] = self.isolated - return payload + def put_payload(self) -> dict[str, dict[str, Any]]: + return {"workspace": self.asdict()} - def post_payload(self): + def post_payload(self) -> dict[str, dict[str, str]]: return self.put_payload() @classmethod - def from_dict(cls, content: dict): + def from_get_response_payload(cls, content: dict): return cls( - content.get("workspace", {}).get("name", None), - content.get("workspace", {}).get("isolated", False), + content["workspace"]["name"], + content["workspace"]["isolated"], ) def __repr__(self): diff --git a/geoservercloud/models/workspaces.py b/geoservercloud/models/workspaces.py index 7d33561..c8f4a6b 100644 --- a/geoservercloud/models/workspaces.py +++ b/geoservercloud/models/workspaces.py @@ -1,33 +1,25 @@ -import logging +from geoservercloud.models import ListModel -from requests.models import Response -log = logging.getLogger() - - -class Workspaces: - - def __init__(self, workspaces: list = []) -> None: +class Workspaces(ListModel): + def __init__(self, workspaces: list[dict[str, str]] = []) -> None: self._workspaces = workspaces - def find(self, workspace_name: str): - return self.workspaces.get(workspace_name, None) - - @property - def workspaces(self): - return self._workspaces + def find(self, workspace_name: str) -> dict[str, str] | None: + for ws in self._workspaces: + if ws["name"] == workspace_name: + return ws @classmethod - def from_dict(cls, content: dict): + def from_get_response_payload(cls, content: dict): - workspaces = [] - # Map the response to a list of Workspace instances - for ws in content.get("workspaces", {}).get("workspace", []): - workspaces.append(ws["name"]) + workspaces: str | dict = content["workspaces"] + if not workspaces: + return cls() + return cls(workspaces["workspace"]) # type: ignore - # Now 'workspaces' is a list of Workspace instances - log.debug("Parsed Workspaces:") - for workspace in workspaces: - log.debug(f"Name: {workspace}") + def __repr__(self) -> str: + return str(self._workspaces) - return cls(workspaces) + def aslist(self) -> list[dict[str, str]]: + return self._workspaces diff --git a/geoservercloud/services/__init__.py b/geoservercloud/services/__init__.py index d885ea3..e408a06 100644 --- a/geoservercloud/services/__init__.py +++ b/geoservercloud/services/__init__.py @@ -1,10 +1,7 @@ -from .endpoints import AclEndpoints, GwcEndpoints, OwsEndpoints, RestEndpoints +from .owsservice import OwsService from .restservice import RestService __all__ = [ + "OwsService", "RestService", - "AclEndpoints", - "OwsEndpoints", - "GwcEndpoints", - "RestEndpoints", ] diff --git a/geoservercloud/services/endpoints.py b/geoservercloud/services/endpoints.py deleted file mode 100644 index 6a7f557..0000000 --- a/geoservercloud/services/endpoints.py +++ /dev/null @@ -1,174 +0,0 @@ -class AclEndpoints: - def __init__(self, base_url: str = "/acl") -> None: - self.base_url: str = base_url - - def adminrules(self) -> str: - return f"{self.base_url}/api/adminrules" - - def adminrule(self, id: int) -> str: - return f"{self.base_url}/api/adminrules/id/{id}" - - def rules(self) -> str: - return f"{self.base_url}/api/rules" - - -class GwcEndpoints: - def __init__(self, base_url: str = "/gwc/rest") -> None: - self.base_url: str = base_url - - def reload(self) -> str: - return f"{self.base_url}/reload" - - def layers(self, workspace_name: str) -> str: - return f"{self.base_url}/layers.json" - - def layer(self, workspace_name: str, layer_name: str) -> str: - return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" - - def gridsets(self) -> str: - return f"{self.base_url}/gridsets.json" - - def gridset(self, epsg: int) -> str: - return f"{self.base_url}/gridsets/EPSG:{str(epsg)}.xml" - - -class OwsEndpoints: - def __init__(self, base_url: str = "") -> None: - self.base_url: str = base_url - - def ows(self) -> str: - return f"{self.base_url}/ows" - - def wms(self) -> str: - return f"{self.base_url}/wms" - - def wfs(self) -> str: - return f"{self.base_url}/wfs" - - def wcs(self) -> str: - return f"{self.base_url}/wcs" - - def wmts(self) -> str: - return f"{self.base_url}/gwc/service/wmts" - - def workspace_ows(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/ows" - - def workspace_wms(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/wms" - - def workspace_wfs(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/wfs" - - def workspace_wcs(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/wcs" - - -class RestEndpoints: - def __init__(self, base_url: str = "/rest") -> None: - self.base_url: str = base_url - - def styles(self) -> str: - return f"{self.base_url}/styles.json" - - def style(self, style_name: str) -> str: - return f"{self.base_url}/styles/{style_name}.json" - - def workspaces(self) -> str: - return f"{self.base_url}/workspaces.json" - - def workspace(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}.json" - - def workspace_styles(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/styles.json" - - def workspace_style(self, workspace_name: str, style_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/styles/{style_name}.json" - - def workspace_layer(self, workspace_name: str, layer_name: str) -> str: - return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" - - def workspace_wms_settings(self, workspace_name: str) -> str: - return f"{self.base_url}/services/wms/workspaces/{workspace_name}/settings.json" - - def workspace_wfs_settings(self, workspace_name: str) -> str: - return f"{self.base_url}/services/wfs/workspaces/{workspace_name}/settings.json" - - def datastores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores.json" - - def datastore(self, workspace_name: str, datastore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}.json" - - def featuretypes(self, workspace_name: str, datastore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes.json" - - def featuretype( - self, workspace_name: str, datastore_name: str, featuretype_name: str - ) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes/{featuretype_name}.json" - - def layergroup(self, workspace_name: str, layergroup_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/layergroups/{layergroup_name}.json" - - def layergroups(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/layergroups.json" - - def coveragestores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores.json" - - def coveragestore(self, workspace_name: str, coveragestore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}.json" - - def coverages(self, workspace_name: str, coveragestore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages.json" - - def coverage( - self, workspace_name: str, coveragestore_name: str, coverage_name: str - ) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages/{coverage_name}.json" - - def wmsstores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmsstores.json" - - def wmsstore(self, workspace_name: str, wmsstore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmsstores/{wmsstore_name}.json" - - def wmtsstores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores.json" - - def wmtsstore(self, workspace_name: str, wmtsstore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}.json" - - def wmtslayers(self, workspace_name: str, wmtsstore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers.json" - - def wmtslayer( - self, workspace_name: str, wmtsstore_name: str, wmtslayer_name: str - ) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers/{wmtslayer_name}.json" - - def namespaces(self) -> str: - return f"{self.base_url}/namespaces.json" - - def namespace(self, namespace_name: str) -> str: - return f"{self.base_url}/namespaces/{namespace_name}.json" - - def users(self) -> str: - return f"{self.base_url}/security/usergroup/users.json" - - def user(self, username: str) -> str: - return f"{self.base_url}/security/usergroup/user/{username}.json" - - def roles(self) -> str: - return f"{self.base_url}/security/roles.json" - - def user_roles(self, username: str) -> str: - return f"{self.base_url}/security/roles/user/{username}.json" - - def role(self, role_name: str) -> str: - return f"{self.base_url}/security/roles/role/{role_name}.json" - - def role_user(self, role_name: str, username: str) -> str: - return f"{self.base_url}/security/roles/role/{role_name}/user/{username}.json" diff --git a/geoservercloud/services/owsservice.py b/geoservercloud/services/owsservice.py new file mode 100644 index 0000000..6e2596b --- /dev/null +++ b/geoservercloud/services/owsservice.py @@ -0,0 +1,214 @@ +from json import JSONDecodeError +from typing import Any +from owslib.map.wms130 import WebMapService_1_3_0 +from owslib.wmts import WebMapTileService +from requests import Response +import xmltodict + +from geoservercloud.services.restclient import RestClient + + +class OwsService: + def __init__(self, url: str, auth: tuple[str, str]) -> None: + self.url: str = url + self.auth: tuple[str, str] = auth + self.ows_endpoints = self.OwsEndpoints() + self.rest_client = RestClient(url, auth) + + def create_wms(self, workspace_name: str | None = None) -> WebMapService_1_3_0: + print(self.ows_endpoints.wms()) + if workspace_name is None: + return WebMapService_1_3_0( + f"{self.url}{self.ows_endpoints.wms()}", + username=self.auth[0], + password=self.auth[1], + ) + return WebMapService_1_3_0( + f"{self.url}{self.ows_endpoints.workspace_wms(workspace_name)}", + username=self.auth[0], + password=self.auth[1], + ) + + def create_wmts(self) -> WebMapTileService: + return WebMapTileService( + self.ows_endpoints.wmts(), + version="1.0.0", + username=self.auth[0], + password=self.auth[1], + ) + + def get_wms_capabilities( + self, workspace_name: str, accept_languages: str | None = None + ) -> dict[str, Any]: + path: str = self.ows_endpoints.workspace_wms(workspace_name) + params: dict[str, str] = { + "service": "WMS", + "version": "1.3.0", + "request": "GetCapabilities", + } + if accept_languages: + params["AcceptLanguages"] = accept_languages + response: Response = self.rest_client.get(path, params=params) + return xmltodict.parse(response.content) + + def get_wms_layers( + self, workspace_name: str, accept_languages: str | None = None + ) -> Any | dict[str, Any]: + + capabilities: dict[str, Any] = self.get_wms_capabilities( + workspace_name, accept_languages + ) + try: + return capabilities["WMS_Capabilities"]["Capability"]["Layer"] + except KeyError: + return capabilities + + def get_legend_graphic( + self, + layer: list[str], + format: str = "image/png", + language: str | None = None, + style: str | None = None, + workspace_name: str | None = None, + ) -> Response: + """ + WMS GetLegendGraphic request + """ + path: str + if not workspace_name: + path = self.ows_endpoints.wms() + else: + path = self.ows_endpoints.workspace_wms(workspace_name) + params: dict[str, Any] = { + "service": "WMS", + "version": "1.3.0", + "request": "GetLegendGraphic", + "format": format, + "layer": layer, + } + if language: + params["language"] = language + if style: + params["style"] = style + return self.rest_client.get(path, params=params) + + def get_wfs_capabilities(self, workspace_name: str) -> dict[str, Any]: + params: dict[str, str] = { + "service": "WFS", + "version": "1.1.0", + "request": "GetCapabilities", + } + print(self.ows_endpoints.workspace_wfs(workspace_name)) + response: Response = self.rest_client.get( + self.ows_endpoints.workspace_wfs(workspace_name), params=params + ) + return xmltodict.parse(response.content) + + def get_wfs_layers(self, workspace_name: str) -> Any | dict[str, Any]: + capabilities: dict[str, Any] = self.get_wfs_capabilities(workspace_name) + try: + return capabilities["wfs:WFS_Capabilities"]["FeatureTypeList"] + except KeyError: + return capabilities + + def get_feature( + self, + workspace_name: str, + type_name: str, + feature_id: int | None = None, + max_feature: int | None = None, + format: str = "application/json", + ) -> dict[str, Any] | str: + path = self.ows_endpoints.workspace_wfs(workspace_name) + params = { + "service": "WFS", + "version": "1.1.0", + "request": "GetFeature", + "typeName": type_name, + "outputFormat": format, + } + if feature_id: + params["featureID"] = str(feature_id) + if max_feature: + params["maxFeatures"] = str(max_feature) + response = self.rest_client.get(path, params=params) + try: + return response.json() + except JSONDecodeError: + return response.content.decode() + + def describe_feature_type( + self, + workspace_name: str | None = None, + type_name: str | None = None, + format: str = "application/json", + ) -> dict[str, Any] | str: + if not workspace_name: + path = self.ows_endpoints.wfs() + else: + path = self.ows_endpoints.workspace_wfs(workspace_name) + params = { + "service": "WFS", + "version": "1.1.0", + "request": "DescribeFeatureType", + "outputFormat": format, + } + if type_name: + params["typeName"] = type_name + response = self.rest_client.get(path, params=params) + try: + return response.json() + except JSONDecodeError: + return response.content.decode() + + def get_property_value( + self, + workspace_name: str, + type_name: str, + property: str, + ) -> dict | list | str: + path = self.ows_endpoints.workspace_wfs(workspace_name) + params = { + "service": "WFS", + "version": "2.0.0", + "request": "GetPropertyValue", + "typeNames": type_name, + "valueReference": property, + } + response = self.rest_client.get(path, params=params) + value_collection = xmltodict.parse(response.content).get("wfs:ValueCollection") + if not value_collection: + return response.content.decode() + else: + return value_collection.get("wfs:member", {}) + + class OwsEndpoints: + def __init__(self, base_url: str = "") -> None: + self.base_url: str = base_url + + def ows(self) -> str: + return f"{self.base_url}/ows" + + def wms(self) -> str: + return f"{self.base_url}/wms" + + def wfs(self) -> str: + return f"{self.base_url}/wfs" + + def wcs(self) -> str: + return f"{self.base_url}/wcs" + + def wmts(self) -> str: + return f"{self.base_url}/gwc/service/wmts" + + def workspace_ows(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/ows" + + def workspace_wms(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wms" + + def workspace_wfs(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wfs" + + def workspace_wcs(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wcs" diff --git a/geoservercloud/services/restclient.py b/geoservercloud/services/restclient.py new file mode 100644 index 0000000..34c8d92 --- /dev/null +++ b/geoservercloud/services/restclient.py @@ -0,0 +1,98 @@ +from typing import Any + +import requests + +TIMEOUT = 120 + + +class RestClient: + """ + HTTP client responsible for issuing requests + + Attributes + ---------- + url : str + base GeoServer URL + auth : tuple[str, str] + username and password for GeoServer + """ + + def __init__(self, url: str, auth: tuple[str, str]) -> None: + self.url: str = url + self.auth: tuple[str, str] = auth + + def get( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + ) -> requests.Response: + response: requests.Response = requests.get( + f"{self.url}{path}", + params=params, + headers=headers, + auth=self.auth, + timeout=TIMEOUT, + ) + if response.status_code != 404: + response.raise_for_status() + return response + + def post( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + json: dict[str, dict[str, Any]] | None = None, + data: bytes | None = None, + ) -> requests.Response: + + response: requests.Response = requests.post( + f"{self.url}{path}", + params=params, + headers=headers, + json=json, + data=data, + auth=self.auth, + timeout=TIMEOUT, + ) + if response.status_code != 409: + response.raise_for_status() + return response + + def put( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + json: dict[str, dict[str, Any]] | None = None, + data: bytes | None = None, + ) -> requests.Response: + response: requests.Response = requests.put( + f"{self.url}{path}", + params=params, + headers=headers, + json=json, + data=data, + auth=self.auth, + timeout=TIMEOUT, + ) + response.raise_for_status() + return response + + def delete( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + ) -> requests.Response: + response: requests.Response = requests.delete( + f"{self.url}{path}", + params=params, + headers=headers, + auth=self.auth, + timeout=TIMEOUT, + ) + if response.status_code != 404: + response.raise_for_status() + return response diff --git a/geoservercloud/services/restservice.py b/geoservercloud/services/restservice.py index 14e67b3..802fb6a 100644 --- a/geoservercloud/services/restservice.py +++ b/geoservercloud/services/restservice.py @@ -1,87 +1,518 @@ -from typing import Any +from json import JSONDecodeError +from pathlib import Path +from typing import Any, Type -import requests +from requests import Response +from owslib.wmts import WebMapTileService -TIMEOUT = 120 +from geoservercloud import utils +from geoservercloud.models import ( + BaseModel, + DataStores, + PostGisDataStore, + Workspace, + Workspaces, +) +from geoservercloud.services.restclient import RestClient +from geoservercloud.templates import Templates class RestService: + """ + Service responsible for serializing and deserializing payloads and routing requests to GeoServer REST API + + Attributes + ---------- + url : str + base GeoServer URL + auth : tuple[str, str] + username and password for GeoServer + """ + def __init__(self, url: str, auth: tuple[str, str]) -> None: self.url: str = url self.auth: tuple[str, str] = auth + self.rest_client = RestClient(url, auth) + self.acl_endpoints = self.AclEndpoints() + self.gwc_endpoints = self.GwcEndpoints() + self.rest_endpoints = self.RestEndpoints() - def get( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - ) -> requests.Response: - response: requests.Response = requests.get( - f"{self.url}{path}", - params=params, - headers=headers, - auth=self.auth, - timeout=TIMEOUT, - ) - if response.status_code != 404: - response.raise_for_status() - return response - - def post( + @staticmethod + def get_wmts_layer_bbox( + url: str, layer_name: str + ) -> tuple[float, float, float, float] | None: + wmts = WebMapTileService(url) + try: + return wmts[layer_name].boundingBoxWGS84 + except (KeyError, AttributeError): + return None + + def get_workspaces(self) -> tuple[Workspaces | str, int]: + response: Response = self.rest_client.get(self.rest_endpoints.workspaces()) + return self.deserialize_response(response, Workspaces) + + def get_workspace(self, name: str) -> tuple[Workspace | str, int]: + response: Response = self.rest_client.get(self.rest_endpoints.workspace(name)) + return self.deserialize_response(response, Workspace) + + def create_workspace(self, workspace: Workspace) -> tuple[str, int]: + path: str = self.rest_endpoints.workspaces() + response: Response = self.rest_client.post(path, json=workspace.post_payload()) + if response.status_code == 409: + path = self.rest_endpoints.workspace(workspace.name) + response = self.rest_client.put(path, json=workspace.put_payload()) + return response.content.decode(), response.status_code + + def delete_workspace(self, workspace: Workspace) -> tuple[str, int]: + path: str = self.rest_endpoints.workspace(workspace.name) + params: dict[str, str] = {"recurse": "true"} + response: Response = self.rest_client.delete(path, params=params) + return response.content.decode(), response.status_code + + def publish_workspace(self, workspace: Workspace) -> tuple[str, int]: + data: dict[str, dict[str, Any]] = Templates.workspace_wms(workspace.name) + response: Response = self.rest_client.put( + self.rest_endpoints.workspace_wms_settings(workspace.name), json=data + ) + return response.content.decode(), response.status_code + + def set_default_locale_for_service( + self, workspace: Workspace, locale: str | None + ) -> None: + data: dict[str, dict[str, Any]] = { + "wms": { + "defaultLocale": locale, + } + } + self.rest_client.put( + self.rest_endpoints.workspace_wms_settings(workspace.name), json=data + ) + + def get_datastores(self, workspace_name: str) -> tuple[DataStores | str, int]: + response: Response = self.rest_client.get( + self.rest_endpoints.datastores(workspace_name) + ) + return self.deserialize_response(response, DataStores) + + def get_pg_datastore( + self, workspace_name: str, datastore_name: str + ) -> tuple[PostGisDataStore | str, int]: + response: Response = self.rest_client.get( + self.rest_endpoints.datastore(workspace_name, datastore_name) + ) + return self.deserialize_response(response, PostGisDataStore) + + def create_pg_datastore( + self, workspace_name: str, datastore: PostGisDataStore + ) -> tuple[str, int]: + if not self.resource_exists( + self.rest_endpoints.datastore(workspace_name, datastore.name) + ): + response: Response = self.rest_client.post( + self.rest_endpoints.datastores(workspace_name), + json=datastore.post_payload(), + ) + else: + response: Response = self.rest_client.put( + self.rest_endpoints.datastore(workspace_name, datastore.name), + json=datastore.put_payload(), + ) + return response.content.decode(), response.status_code + + def create_jndi_datastore( + self, workspace_name: str, datastore: PostGisDataStore + ) -> tuple[str, int]: + if not self.resource_exists( + self.rest_endpoints.datastore(workspace_name, datastore.name) + ): + response: Response = self.rest_client.post( + self.rest_endpoints.datastores(workspace_name), + json=datastore.post_payload(), + ) + else: + response: Response = self.rest_client.put( + self.rest_endpoints.datastore(workspace_name, datastore.name), + json=datastore.put_payload(), + ) + return response.content.decode(), response.status_code + + def create_wmts_store( self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - json: dict[str, dict[str, Any]] | None = None, - data: bytes | None = None, - ) -> requests.Response: - - response: requests.Response = requests.post( - f"{self.url}{path}", - params=params, - headers=headers, - json=json, - data=data, - auth=self.auth, - timeout=TIMEOUT, - ) - if response.status_code != 409: - response.raise_for_status() - return response - - def put( + workspace_name: str, + name: str, + capabilities: str, + ) -> tuple[str, int]: + payload = Templates.wmts_store(workspace_name, name, capabilities) + if not self.resource_exists( + self.rest_endpoints.wmtsstore(workspace_name, name) + ): + response: Response = self.rest_client.post( + self.rest_endpoints.wmtsstores(workspace_name), json=payload + ) + else: + response: Response = self.rest_client.put( + self.rest_endpoints.wmtsstore(workspace_name, name), json=payload + ) + return response.content.decode(), response.status_code + + def create_wmts_layer( self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - json: dict[str, dict[str, Any]] | None = None, - data: bytes | None = None, - ) -> requests.Response: - response: requests.Response = requests.put( - f"{self.url}{path}", - params=params, - headers=headers, - json=json, - data=data, - auth=self.auth, - timeout=TIMEOUT, - ) - response.raise_for_status() - return response - - def delete( + workspace_name: str, + wmts_store: str, + native_layer: str, + published_layer: str, + epsg: int = 4326, + international_title: dict[str, str] | None = None, + international_abstract: dict[str, str] | None = None, + ) -> tuple[str, int]: + resource_path: str = self.rest_endpoints.wmtslayer( + workspace_name, wmts_store, published_layer + ) + if self.resource_exists(resource_path): + self.rest_client.delete( + resource_path, + params={"recurse": "true"}, + ) + capabilities_url: str = ( + self.rest_client.get( + self.rest_endpoints.wmtsstore(workspace_name, wmts_store) + ) + .json() + .get("wmtsStore") + .get("capabilitiesURL") + ) + wgs84_bbox: tuple[float, float, float, float] | None = self.get_wmts_layer_bbox( + capabilities_url, native_layer + ) + + payload: dict[str, dict[str, Any]] = Templates.wmts_layer( + published_layer, + native_layer, + wgs84_bbox=wgs84_bbox, + epsg=epsg, + international_title=international_title, + international_abstract=international_abstract, + ) + + response: Response = self.rest_client.post( + self.rest_endpoints.wmtslayers(workspace_name, wmts_store), json=payload + ) + return response.content.decode(), response.status_code + + def get_gwc_layer( + self, workspace_name: str, layer: str + ) -> tuple[dict[str, Any] | str, int]: + response: Response = self.rest_client.get( + self.gwc_endpoints.layer(workspace_name, layer) + ) + try: + content = response.json() + except: + content = response.content.decode() + return content, response.status_code + + def publish_gwc_layer( + self, workspace_name: str, layer: str, epsg: int + ) -> tuple[str, int]: + # Reload config to make sure GWC is aware of GeoServer layers + self.rest_client.post( + self.gwc_endpoints.reload(), + headers={"Content-Type": "application/json"}, + data="reload_configuration=1", # type: ignore + ) + # Do not re-publish an existing layer + # TODO: fix template so that we can PUT an existing layer (/!\ check with an OGC client that the + # layer is not corrupted after the second PUT) + content, code = self.get_gwc_layer(workspace_name, layer) + if code == 200: + return "", code + payload = Templates.gwc_layer(workspace_name, layer, f"EPSG:{epsg}") + response: Response = self.rest_client.put( + self.gwc_endpoints.layer(workspace_name, layer), + json=payload, + ) + return response.content.decode(), response.status_code + + def create_gridset(self, epsg: int) -> tuple[str, int]: + """ + Create a gridset for GeoWebCache for a given projection + Supported EPSG codes are 2056, 21781 and 3857 + """ + if self.resource_exists(self.gwc_endpoints.gridset(epsg)): + return "", 200 + file_path: Path = Path(__file__).parent.parent / "gridsets" / f"{epsg}.xml" + headers: dict[str, str] = {"Content-Type": "application/xml"} + try: + data: bytes = file_path.read_bytes() + except FileNotFoundError: + raise ValueError(f"No gridset definition found for EPSG:{epsg}") + response: Response = self.rest_client.put( + self.gwc_endpoints.gridset(epsg), data=data, headers=headers + ) + return response.content.decode(), response.status_code + + def create_feature_type( self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - ) -> requests.Response: - response: requests.Response = requests.delete( - f"{self.url}{path}", - params=params, - headers=headers, - auth=self.auth, - timeout=TIMEOUT, - ) - if response.status_code != 404: - response.raise_for_status() - return response + layer: str, + workspace_name: str, + datastore: str, + title: str | dict = "Default title", + abstract: str | dict = "Default abstract", + attributes: dict = Templates.geom_point_attribute(), + epsg: int = 4326, + ) -> tuple[str, int]: + # TODO: use FeatureType.post_payload() + payload: dict[str, dict[str, Any]] = Templates.feature_type( + layer=layer, + workspace=workspace_name, + datastore=datastore, + attributes=utils.convert_attributes(attributes), + epsg=epsg, + ) + if type(title) is dict: + payload["featureType"]["internationalTitle"] = title + else: + payload["featureType"]["title"] = title + if type(abstract) is dict: + payload["featureType"]["internationalAbstract"] = abstract + else: + payload["featureType"]["abstract"] = abstract + + if not self.resource_exists( + self.rest_endpoints.featuretype(workspace_name, datastore, layer) + ): + response: Response = self.rest_client.post( + self.rest_endpoints.featuretypes(workspace_name, datastore), + json=payload, + ) + else: + response: Response = self.rest_client.put( + self.rest_endpoints.featuretype(workspace_name, datastore, layer), + json=payload, + ) + return response.content.decode(), response.status_code + + def create_user( + self, user: str, password: str, enabled: bool = True + ) -> tuple[str, int]: + headers: dict[str, str] = {"Content-Type": "application/json"} + payload: dict[str, dict[str, Any]] = { + "user": { + "userName": user, + "password": password, + "enabled": enabled, + } + } + response: Response = self.rest_client.post( + self.rest_endpoints.users(), json=payload, headers=headers + ) + return response.content.decode(), response.status_code + + def update_user( + self, user: str, password: str | None = None, enabled: bool | None = None + ) -> tuple[str, int]: + headers: dict[str, str] = {"Content-Type": "application/json"} + payload: dict[str, dict[str, Any]] = {"user": {}} + if password: + payload["user"]["password"] = password + if enabled is not None: + payload["user"]["enabled"] = enabled + response: Response = self.rest_client.post( + self.rest_endpoints.user(user), json=payload, headers=headers + ) + return response.content.decode(), response.status_code + + def delete_user(self, user: str) -> tuple[str, int]: + response: Response = self.rest_client.delete(self.rest_endpoints.user(user)) + return response.content.decode(), response.status_code + + def create_role(self, role_name: str) -> tuple[str, int]: + response: Response = self.rest_client.post(self.rest_endpoints.role(role_name)) + return response.content.decode(), response.status_code + + def delete_role(self, role_name: str) -> tuple[str, int]: + response: Response = self.rest_client.delete( + self.rest_endpoints.role(role_name) + ) + return response.content.decode(), response.status_code + + def create_role_if_not_exists(self, role_name: str) -> tuple[str, int]: + if self.role_exists(role_name): + return "", 200 + return self.create_role(role_name) + + def role_exists(self, role_name: str) -> bool: + response: Response = self.rest_client.get( + self.rest_endpoints.roles(), headers={"Accept": "application/json"} + ) + roles = response.json().get("roles", []) + return role_name in roles + + def get_user_roles(self, user: str) -> tuple[list[str] | str, int]: + """ + Get all roles assigned to a GeoServer user + """ + response: Response = self.rest_client.get(self.rest_endpoints.user_roles(user)) + try: + content = response.json().get("roles", []) + except JSONDecodeError: + content = response.content.decode() + return content, response.status_code + + def resource_exists(self, path: str) -> bool: + response: Response = self.rest_client.get(path) + return response.status_code == 200 + + def deserialize_response( + self, response: Response, data_type: Type[BaseModel] + ) -> tuple[Any, int]: + try: + content = response.json() + except JSONDecodeError: + return response.content.decode(), response.status_code + return data_type.from_get_response_payload(content), response.status_code + + class AclEndpoints: + def __init__(self, base_url: str = "/acl") -> None: + self.base_url: str = base_url + + def adminrules(self) -> str: + return f"{self.base_url}/api/adminrules" + + def adminrule(self, id: int) -> str: + return f"{self.base_url}/api/adminrules/id/{id}" + + def rules(self) -> str: + return f"{self.base_url}/api/rules" + + class GwcEndpoints: + def __init__(self, base_url: str = "/gwc/rest") -> None: + self.base_url: str = base_url + + def reload(self) -> str: + return f"{self.base_url}/reload" + + def layers(self, workspace_name: str) -> str: + return f"{self.base_url}/layers.json" + + def layer(self, workspace_name: str, layer_name: str) -> str: + return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" + + def gridsets(self) -> str: + return f"{self.base_url}/gridsets.json" + + def gridset(self, epsg: int) -> str: + return f"{self.base_url}/gridsets/EPSG:{str(epsg)}.xml" + + class RestEndpoints: + def __init__(self, base_url: str = "/rest") -> None: + self.base_url: str = base_url + + def styles(self) -> str: + return f"{self.base_url}/styles.json" + + def style(self, style_name: str) -> str: + return f"{self.base_url}/styles/{style_name}.json" + + def workspaces(self) -> str: + return f"{self.base_url}/workspaces.json" + + def workspace(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}.json" + + def workspace_styles(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/styles.json" + + def workspace_style(self, workspace_name: str, style_name: str) -> str: + return ( + f"{self.base_url}/workspaces/{workspace_name}/styles/{style_name}.json" + ) + + def workspace_layer(self, workspace_name: str, layer_name: str) -> str: + return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" + + def workspace_wms_settings(self, workspace_name: str) -> str: + return f"{self.base_url}/services/wms/workspaces/{workspace_name}/settings.json" + + def workspace_wfs_settings(self, workspace_name: str) -> str: + return f"{self.base_url}/services/wfs/workspaces/{workspace_name}/settings.json" + + def datastores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores.json" + + def datastore(self, workspace_name: str, datastore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}.json" + + def featuretypes(self, workspace_name: str, datastore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes.json" + + def featuretype( + self, workspace_name: str, datastore_name: str, featuretype_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes/{featuretype_name}.json" + + def layergroup(self, workspace_name: str, layergroup_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/layergroups/{layergroup_name}.json" + + def layergroups(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/layergroups.json" + + def coveragestores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores.json" + + def coveragestore(self, workspace_name: str, coveragestore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}.json" + + def coverages(self, workspace_name: str, coveragestore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages.json" + + def coverage( + self, workspace_name: str, coveragestore_name: str, coverage_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages/{coverage_name}.json" + + def wmsstores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmsstores.json" + + def wmsstore(self, workspace_name: str, wmsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmsstores/{wmsstore_name}.json" + + def wmtsstores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores.json" + + def wmtsstore(self, workspace_name: str, wmtsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}.json" + + def wmtslayers(self, workspace_name: str, wmtsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers.json" + + def wmtslayer( + self, workspace_name: str, wmtsstore_name: str, wmtslayer_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers/{wmtslayer_name}.json" + + def namespaces(self) -> str: + return f"{self.base_url}/namespaces.json" + + def namespace(self, namespace_name: str) -> str: + return f"{self.base_url}/namespaces/{namespace_name}.json" + + def users(self) -> str: + return f"{self.base_url}/security/usergroup/users.json" + + def user(self, username: str) -> str: + return f"{self.base_url}/security/usergroup/user/{username}.json" + + def roles(self) -> str: + return f"{self.base_url}/security/roles.json" + + def user_roles(self, username: str) -> str: + return f"{self.base_url}/security/roles/user/{username}.json" + + def role(self, role_name: str) -> str: + return f"{self.base_url}/security/roles/role/{role_name}.json" + + def role_user(self, role_name: str, username: str) -> str: + return ( + f"{self.base_url}/security/roles/role/{role_name}/user/{username}.json" + ) diff --git a/geoservercloud/templates.py b/geoservercloud/templates.py index 992c161..db00b7f 100644 --- a/geoservercloud/templates.py +++ b/geoservercloud/templates.py @@ -37,7 +37,6 @@ class Templates: - @staticmethod def workspace_wms(workspace: str) -> dict[str, dict[str, Any]]: return { diff --git a/tests/models/test_common.py b/tests/models/test_common.py index 4d63de6..ebf6d4b 100644 --- a/tests/models/test_common.py +++ b/tests/models/test_common.py @@ -2,10 +2,7 @@ import pytest -from geoservercloud.models import ( - I18N, - KeyDollarListDict, -) +from geoservercloud.models import I18N, KeyDollarListDict def test_keydollarlistdict_initialization_with_input_list(): diff --git a/tests/models/test_datastore.py b/tests/models/test_datastore.py index 793e374..b50ce5c 100644 --- a/tests/models/test_datastore.py +++ b/tests/models/test_datastore.py @@ -1,7 +1,24 @@ -from geoservercloud.models import ( - KeyDollarListDict, - PostGisDataStore, -) +import pytest + +from geoservercloud.models import KeyDollarListDict, PostGisDataStore + + +@pytest.fixture(scope="module") +def pg_payload(): + yield { + "dataStore": { + "name": "test_datastore", + "type": "PostGIS", + "enabled": True, + "connectionParameters": { + "entry": [ + {"@key": "host", "$": "localhost"}, + {"@key": "port", "$": "5432"}, + ] + }, + "workspace": {"name": "test_workspace"}, + } + } def test_postgisdatastore_initialization(): @@ -14,12 +31,12 @@ def test_postgisdatastore_initialization(): ) assert datastore.workspace_name == "test_workspace" - assert datastore.data_store_name == "test_datastore" + assert datastore.name == "test_datastore" assert datastore.connection_parameters == connection_parameters - assert datastore.data_store_type == "PostGIS" + assert datastore.type == "PostGIS" -def test_postgisdatastore_put_payload(): +def test_postgisdatastore_put_payload(pg_payload): connection_parameters = KeyDollarListDict( [{"@key": "host", "$": "localhost"}, {"@key": "port", "$": "5432"}] ) @@ -28,21 +45,7 @@ def test_postgisdatastore_put_payload(): "test_workspace", "test_datastore", connection_parameters ) - expected_payload = { - "dataStore": { - "name": "test_datastore", - "type": "PostGIS", - "enabled": True, - "connectionParameters": { - "entry": [ - {"@key": "host", "$": "localhost"}, - {"@key": "port", "$": "5432"}, - ] - }, - } - } - - assert datastore.put_payload() == expected_payload + assert datastore.put_payload() == pg_payload def test_postgisdatastore_post_payload(): @@ -57,43 +60,30 @@ def test_postgisdatastore_post_payload(): assert datastore.post_payload() == datastore.put_payload() -def test_postgisdatastore_from_dict(): - mock_response = { - "dataStore": { - "name": "test_datastore", - "type": "PostGIS", - "connectionParameters": { - "entry": [ - {"@key": "host", "$": "localhost"}, - {"@key": "port", "$": "5432"}, - ] - }, - } - } +def test_postgisdatastore_from_get_response_payload(pg_payload): - datastore = PostGisDataStore.from_dict(mock_response) + datastore = PostGisDataStore.from_get_response_payload(pg_payload) - assert datastore.data_store_name == "test_datastore" - assert datastore.data_store_type == "PostGIS" + assert datastore.name == "test_datastore" + assert datastore.type == "PostGIS" + assert isinstance(datastore.connection_parameters, KeyDollarListDict) assert datastore.connection_parameters["host"] == "localhost" assert datastore.connection_parameters["port"] == "5432" -def test_postgisdatastore_parse_connection_parameters(): - content = { - "dataStore": { - "connectionParameters": { - "entry": [ - {"@key": "host", "$": "localhost"}, - {"@key": "port", "$": "5432"}, - ] +def test_postgisdatastore_asdict(pg_payload): + datastore = PostGisDataStore.from_get_response_payload(pg_payload) + + assert datastore.asdict() == { + "name": "test_datastore", + "type": "PostGIS", + "enabled": True, + "connectionParameters": { + "entry": { + "host": "localhost", + "port": "5432", } - } + }, + "workspace": "test_workspace", } - - connection_params = PostGisDataStore.parse_connection_parameters(content) - - assert isinstance(connection_params, KeyDollarListDict) - assert connection_params["host"] == "localhost" - assert connection_params["port"] == "5432" diff --git a/tests/models/test_datastores.py b/tests/models/test_datastores.py index 1dcf5a8..7e9ab53 100644 --- a/tests/models/test_datastores.py +++ b/tests/models/test_datastores.py @@ -1,44 +1,49 @@ -from geoservercloud.models import DataStores - +from pytest import fixture -def test_datastores_initialization(): - workspace_name = "test_workspace" - datastores = ["store1", "store2"] +from geoservercloud.models import DataStores - ds = DataStores(workspace_name, datastores) - assert ds.workspace_name == "test_workspace" - assert ds.datastores == datastores +@fixture(scope="module") +def mock_datastore(): + return { + "name": "DataStore1", + "href": "http://example.com/ds1", + } -def test_datastores_from_dict(): - mock_response = { +@fixture(scope="module") +def mock_response(mock_datastore): + return { "dataStores": { - "workspace": {"name": "test_workspace"}, - "dataStore": [{"name": "store1"}, {"name": "store2"}], + "dataStore": [mock_datastore], } } - ds = DataStores.from_dict(mock_response) - assert ds.workspace_name == "test_workspace" - assert ds.datastores == ["store1", "store2"] +def test_datastores_initialization(mock_datastore): + ds = DataStores([mock_datastore]) + assert ds.aslist() == [mock_datastore] -def test_datastores_from_dict_empty(): - mock_response = { - "dataStores": {"workspace": {"name": "empty_workspace"}, "dataStore": []} - } - ds = DataStores.from_dict(mock_response) +def test_datastores_from_get_response_payload(mock_datastore, mock_response): + + ds = DataStores.from_get_response_payload(mock_response) + + assert ds.aslist() == [mock_datastore] + + +def test_datastores_from_get_response_payload_empty(): + mock_response = {"dataStores": ""} + + ds = DataStores.from_get_response_payload(mock_response) - assert ds.workspace_name == "empty_workspace" - assert ds.datastores == [] + assert ds.aslist() == [] -def test_datastores_repr(): - ds = DataStores("test_workspace", ["store1", "store2"]) +def test_datastores_repr(mock_datastore): + ds = DataStores([mock_datastore]) - expected_repr = "['store1', 'store2']" + expected_repr = "[{'name': 'DataStore1', 'href': 'http://example.com/ds1'}]" assert repr(ds) == expected_repr diff --git a/tests/models/test_workspace.py b/tests/models/test_workspace.py index c60da78..a8900d7 100644 --- a/tests/models/test_workspace.py +++ b/tests/models/test_workspace.py @@ -24,19 +24,10 @@ def test_workspace_post_payload(): assert workspace.post_payload() == expected_payload -def test_workspace_from_dict_isolated(): +def test_workspace_from_get_response_payload(): mock_response = {"workspace": {"name": "test_workspace", "isolated": True}} - workspace = Workspace.from_dict(mock_response) + workspace = Workspace.from_get_response_payload(mock_response) assert workspace.name == "test_workspace" assert workspace.isolated is True - - -def test_workspace_from_dict_not_isolated(): - mock_response = {"workspace": {"name": "test_workspace"}} - - workspace = Workspace.from_dict(mock_response) - - assert workspace.name == "test_workspace" - assert workspace.isolated is False diff --git a/tests/models/test_workspaces.py b/tests/models/test_workspaces.py index c85bbb4..ea53694 100644 --- a/tests/models/test_workspaces.py +++ b/tests/models/test_workspaces.py @@ -1,29 +1,39 @@ +from pytest import fixture + from geoservercloud.models import Workspaces -def test_workspaces_initialization(): - initial_workspaces = {"Workspace1": "http://example.com/ws1"} +@fixture(scope="module") +def initial_workspaces(): + return [ + { + "name": "Workspace1", + "href": "http://example.com/ws1", + } + ] + + +def test_workspaces_initialization(initial_workspaces): workspaces = Workspaces(initial_workspaces) - assert workspaces.workspaces == initial_workspaces + assert workspaces.aslist() == initial_workspaces -def test_workspaces_find_existing(): - initial_workspaces = {"Workspace1": "http://example.com/ws1"} +def test_workspaces_find_existing(initial_workspaces): workspaces = Workspaces(initial_workspaces) - assert workspaces.find("Workspace1") == "http://example.com/ws1" + assert workspaces.find("Workspace1") == initial_workspaces[0] -def test_workspaces_find_non_existing(): - workspaces = Workspaces({"Workspace1": "http://example.com/ws1"}) +def test_workspaces_find_non_existing(initial_workspaces): + workspaces = Workspaces(initial_workspaces) assert workspaces.find("NonExistingWorkspace") is None -def test_workspaces_from_dict_empty(): - mock_response = {"workspaces": {}} +def test_workspaces_from_get_response_payload_empty(): + mock_response = {"workspaces": ""} - workspaces = Workspaces.from_dict(mock_response) + workspaces = Workspaces.from_get_response_payload(mock_response) - assert workspaces.workspaces == [] + assert workspaces.aslist() == [] diff --git a/tests/resources/test_resource.py b/tests/resources/test_resource.py deleted file mode 100644 index 029a215..0000000 --- a/tests/resources/test_resource.py +++ /dev/null @@ -1,43 +0,0 @@ -import responses - -from geoservercloud import GeoServerCloud - - -def test_create_resource(geoserver: GeoServerCloud) -> None: - path = "/new/resource" - resource_path = "/new/resource/resource.json" - payload = {"something": "new"} - with responses.RequestsMock() as rsps: - rsps.get( - url=f"{geoserver.url}{resource_path}", - status=404, - ) - rsps.post( - url=f"{geoserver.url}{path}", - status=201, - match=[responses.matchers.json_params_matcher(payload)], - ) - - response = geoserver.create_or_update_resource(path, resource_path, payload) - - assert response.status_code == 201 - - -def test_update_resource(geoserver: GeoServerCloud) -> None: - path = "/some/resource" - resource_path = "/some/resource/resource.json" - payload = {"something": "something"} - with responses.RequestsMock() as rsps: - rsps.get( - url=f"{geoserver.url}{resource_path}", - status=200, - ) - rsps.put( - url=f"{geoserver.url}{resource_path}", - status=200, - match=[responses.matchers.json_params_matcher(payload)], - ) - - response = geoserver.create_or_update_resource(path, resource_path, payload) - - assert response.status_code == 200 diff --git a/tests/test_cascaded_wmts.py b/tests/test_cascaded_wmts.py index 1bf8615..291f3c1 100644 --- a/tests/test_cascaded_wmts.py +++ b/tests/test_cascaded_wmts.py @@ -81,14 +81,16 @@ def test_create_wmts_store( f"{geoserver.url}/rest/workspaces/{WORKSPACE}/wmtsstores.json", match=[responses.matchers.json_params_matcher(wmts_store_payload)], status=201, + body=b"test_wmtsstore", ) - response = geoserver.create_wmts_store( + content, code = geoserver.create_wmts_store( workspace_name=WORKSPACE, name=STORE, capabilities=CAPABILITIES_URL, ) - assert response.status_code == 201 + assert content == STORE + assert code == 201 def test_update_wmts_store( @@ -103,14 +105,16 @@ def test_update_wmts_store( f"{geoserver.url}/rest/workspaces/{WORKSPACE}/wmtsstores/{STORE}.json", match=[responses.matchers.json_params_matcher(wmts_store_payload)], status=200, + body=b"", ) - response = geoserver.create_wmts_store( + content, code = geoserver.create_wmts_store( workspace_name=WORKSPACE, name=STORE, capabilities=CAPABILITIES_URL, ) - assert response.status_code == 200 + assert content == "" + assert code == 200 def test_create_wmts_layer( @@ -136,16 +140,17 @@ def test_create_wmts_layer( f"{geoserver.url}/rest/workspaces/{WORKSPACE}/wmtsstores/{STORE}/layers.json", match=[responses.matchers.json_params_matcher(wmts_layer_payload)], status=201, + body=b"test_layer", ) - response = geoserver.create_wmts_layer( + content, code = geoserver.create_wmts_layer( workspace_name=WORKSPACE, wmts_store=STORE, native_layer=NATIVE_LAYER, published_layer=LAYER, ) - assert response - assert response.status_code == 201 + assert content == LAYER + assert code == 201 def test_create_wmts_layer_already_exists( @@ -176,15 +181,17 @@ def test_create_wmts_layer_already_exists( f"{geoserver.url}/rest/workspaces/{WORKSPACE}/wmtsstores/{STORE}/layers.json", match=[responses.matchers.json_params_matcher(wmts_layer_payload)], status=201, + body=b"test_layer", ) - response = geoserver.create_wmts_layer( + content, code = geoserver.create_wmts_layer( workspace_name=WORKSPACE, wmts_store=STORE, native_layer=NATIVE_LAYER, published_layer=LAYER, ) - assert response.status_code == 201 + assert content == LAYER + assert code == 201 def test_create_wmts_layer_international_title( @@ -213,7 +220,7 @@ def test_create_wmts_layer_international_title( match=[responses.matchers.json_params_matcher(wmts_layer_payload)], status=201, ) - response = geoserver.create_wmts_layer( + content, code = geoserver.create_wmts_layer( workspace_name=WORKSPACE, wmts_store=STORE, native_layer=NATIVE_LAYER, @@ -222,4 +229,4 @@ def test_create_wmts_layer_international_title( international_abstract={"en": "Abstract"}, ) - assert response.status_code == 201 + assert code == 201 diff --git a/tests/test_datastore.py b/tests/test_datastore.py index 999130d..45e5d69 100644 --- a/tests/test_datastore.py +++ b/tests/test_datastore.py @@ -45,6 +45,7 @@ def pg_payload() -> Generator[dict[str, dict[str, Any]], Any, None]: {"@key": "Expose primary keys", "$": "true"}, ] }, + "workspace": {"name": WORKSPACE}, } } @@ -75,66 +76,110 @@ def jndi_payload() -> Generator[dict[str, dict[str, Any]], Any, None]: {"@key": "Expose primary keys", "$": "true"}, ] }, + "workspace": {"name": WORKSPACE}, } } @pytest.fixture(scope="module") -def datastores_response() -> Generator[dict[str, Any], Any, None]: +def datastore_get_response() -> Generator[dict[str, Any], Any, None]: + yield { + "dataStore": { + "name": STORE, + "description": DESCRIPTION, + "type": "PostGIS (JNDI)", + "enabled": True, + "workspace": { + "name": WORKSPACE, + "href": f"http://localhost:8080/geoserver/rest/workspaces/{WORKSPACE}.json", + }, + "connectionParameters": { + "entry": [ + {"@key": "schema", "$": SCHEMA}, + {"@key": "jndiReferenceName", "$": JNDI}, + {"@key": "Expose primary keys", "$": "true"}, + {"@key": "dbtype", "$": "postgis"}, + {"@key": "namespace", "$": "http://{WORKSPACE}"}, + ] + }, + "_default": False, + "disableOnConnFailure": False, + "featureTypes": f"http://localhost:8080/geoserver/rest/workspaces/{WORKSPACE}/datastores/{STORE}/featuretypes.json", + } + } + + +@pytest.fixture(scope="module") +def datastores_get_response() -> Generator[dict[str, Any], Any, None]: yield { "dataStores": { "dataStore": [ { "name": STORE, - "href": f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", + "href": f"http://localhost:8080/geoserver/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", } - ], + ] } } def test_get_datastores( - geoserver: GeoServerCloud, datastores_response: dict[str, Any] + geoserver: GeoServerCloud, datastores_get_response: dict[str, Any] ) -> None: with responses.RequestsMock() as rsps: rsps.get( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores.json", status=200, - json=datastores_response, + json=datastores_get_response, ) - datastores = geoserver.get_datastores(workspace_name=WORKSPACE) - assert datastores == ["test_store"] + datastores, status_code = geoserver.get_datastores(workspace_name=WORKSPACE) + assert datastores == datastores_get_response["dataStores"]["dataStore"] + assert status_code == 200 -# Test the get_postgis_datastore method with a valid response -def test_get_postgis_datastore_valid( - geoserver: GeoServerCloud, pg_payload: dict[str, dict[str, Any]] +def test_get_pg_datastore_ok( + geoserver: GeoServerCloud, datastore_get_response: dict[str, Any] ) -> None: + expected_datastore = { + "name": STORE, + "description": DESCRIPTION, + "type": "PostGIS (JNDI)", + "enabled": True, + "workspace": WORKSPACE, + "connectionParameters": { + "entry": { + "schema": "test_schema", + "jndiReferenceName": "java:comp/env/jdbc/data", + "Expose primary keys": "true", + "dbtype": "postgis", + "namespace": "http://{WORKSPACE}", + } + }, + "_default": False, + "disableOnConnFailure": False, + } with responses.RequestsMock() as rsps: rsps.get( - url=f"{geoserver.url}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", - json=pg_payload, + url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=200, + json=datastore_get_response, ) - result = geoserver.get_postgis_datastore(WORKSPACE, STORE) - assert json.loads(str(result)) == pg_payload + content, status_code = geoserver.get_pg_datastore(WORKSPACE, STORE) + assert content == expected_datastore + assert status_code == 200 -# Test the get_postgis_datastore method with a 404 error -def test_get_postgis_datastore_not_found(geoserver: GeoServerCloud) -> None: - datastore_name = "non_existing_datastore" +def test_get_pg_datastore_not_found(geoserver: GeoServerCloud) -> None: with responses.RequestsMock() as rsps: rsps.get( - url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{datastore_name}.json", - json={"error": "Datastore not found"}, + url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=404, + body=b"No such datastore: test_workspace,test_datastore", ) - - not_existing_datastore = geoserver.get_postgis_datastore( - WORKSPACE, datastore_name - ) - assert not_existing_datastore is None + content, status_code = geoserver.get_pg_datastore(WORKSPACE, STORE) + assert content == "No such datastore: test_workspace,test_datastore" + assert status_code == 404 def test_create_pg_datastore( @@ -148,11 +193,11 @@ def test_create_pg_datastore( rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores.json", status=201, - json={"workspace": {"name": WORKSPACE}}, + body=b"test_store", match=[matchers.json_params_matcher(pg_payload)], ) - response = geoserver.create_pg_datastore( + content, code = geoserver.create_pg_datastore( workspace_name=WORKSPACE, datastore_name=STORE, pg_host=HOST, @@ -163,8 +208,8 @@ def test_create_pg_datastore( pg_schema=SCHEMA, ) - assert response - assert response.status_code == 201 + assert content == STORE + assert code == 201 def test_update_pg_datastore( @@ -178,10 +223,11 @@ def test_update_pg_datastore( rsps.put( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=200, + body=b"", match=[matchers.json_params_matcher(pg_payload)], ) - response = geoserver.create_pg_datastore( + content, code = geoserver.create_pg_datastore( workspace_name=WORKSPACE, datastore_name=STORE, pg_host=HOST, @@ -192,8 +238,8 @@ def test_update_pg_datastore( pg_schema=SCHEMA, ) - assert response - assert response.status_code == 200 + assert content == "" + assert code == 200 def test_create_jndi_datastore( @@ -207,10 +253,11 @@ def test_create_jndi_datastore( rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores.json", status=201, + body=b"test_store", match=[matchers.json_params_matcher(jndi_payload)], ) - response = geoserver.create_jndi_datastore( + content, code = geoserver.create_jndi_datastore( workspace_name=WORKSPACE, datastore_name=STORE, jndi_reference=JNDI, @@ -218,8 +265,8 @@ def test_create_jndi_datastore( description=DESCRIPTION, ) - assert response - assert response.status_code == 201 + assert content == STORE + assert code == 201 def test_update_jndi_datastore( @@ -233,10 +280,11 @@ def test_update_jndi_datastore( rsps.put( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=200, + body=b"", match=[matchers.json_params_matcher(jndi_payload)], ) - response = geoserver.create_jndi_datastore( + content, code = geoserver.create_jndi_datastore( workspace_name=WORKSPACE, datastore_name=STORE, jndi_reference=JNDI, @@ -244,5 +292,5 @@ def test_update_jndi_datastore( description=DESCRIPTION, ) - assert response - assert response.status_code == 200 + assert content == "" + assert code == 200 diff --git a/tests/test_feature_type.py b/tests/test_feature_type.py index 74d0ac3..9b9b153 100644 --- a/tests/test_feature_type.py +++ b/tests/test_feature_type.py @@ -60,3 +60,55 @@ def feature_type_payload() -> dict[str, dict[str, Any]]: }, } } + + +def test_create_feature_type( + geoserver: GeoServerCloud, feature_type_payload: dict[str, dict[str, Any]] +) -> None: + with responses.RequestsMock() as rsps: + rsps.get( + f"{geoserver.url}/rest/workspaces/{WORKSPACE}/datastores/{STORE}/featuretypes/{LAYER}.json", + status=404, + ) + rsps.post( + f"{geoserver.url}/rest/workspaces/{WORKSPACE}/datastores/{STORE}/featuretypes.json", + match=[responses.matchers.json_params_matcher(feature_type_payload)], + status=201, + body=b"", + ) + content, code = geoserver.create_feature_type( + workspace_name=WORKSPACE, + datastore=STORE, + layer=LAYER, + title={"en": "English"}, + abstract={"en": "English"}, + ) + + assert content == "" + assert code == 201 + + +def test_update_feature_type( + geoserver: GeoServerCloud, feature_type_payload: dict[str, dict[str, Any]] +) -> None: + with responses.RequestsMock() as rsps: + rsps.get( + f"{geoserver.url}/rest/workspaces/{WORKSPACE}/datastores/{STORE}/featuretypes/{LAYER}.json", + status=200, + ) + rsps.put( + f"{geoserver.url}/rest/workspaces/{WORKSPACE}/datastores/{STORE}/featuretypes/{LAYER}.json", + match=[responses.matchers.json_params_matcher(feature_type_payload)], + status=200, + body=b"", + ) + content, code = geoserver.create_feature_type( + workspace_name=WORKSPACE, + datastore=STORE, + layer=LAYER, + title={"en": "English"}, + abstract={"en": "English"}, + ) + + assert content == "" + assert code == 200 diff --git a/tests/test_gwc.py b/tests/test_gwc.py index 91ea7cb..2db5dc8 100644 --- a/tests/test_gwc.py +++ b/tests/test_gwc.py @@ -42,6 +42,7 @@ def test_publish_gwc_layer(geoserver: GeoServerCloud) -> None: rsps.put( url=f"{geoserver.url}/gwc/rest/layers/{WORKSPACE}:{LAYER}.json", status=200, + body=b"layer saved", match=[ responses.matchers.json_params_matcher( { @@ -57,9 +58,9 @@ def test_publish_gwc_layer(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.publish_gwc_layer(WORKSPACE, LAYER, EPSG) - assert response - assert response.status_code == 200 + content, code = geoserver.publish_gwc_layer(WORKSPACE, LAYER, EPSG) + assert content == "layer saved" + assert code == 200 def test_publish_gwc_layer_already_exists(geoserver: GeoServerCloud) -> None: @@ -79,8 +80,9 @@ def test_publish_gwc_layer_already_exists(geoserver: GeoServerCloud) -> None: json={"GeoServerLayer": {"name": f"{WORKSPACE}:{LAYER}"}}, ) - response = geoserver.publish_gwc_layer(WORKSPACE, LAYER, EPSG) - assert response is None + content, code = geoserver.publish_gwc_layer(WORKSPACE, LAYER, EPSG) + assert content == "" + assert code == 200 def test_create_gridset(geoserver: GeoServerCloud) -> None: @@ -91,12 +93,13 @@ def test_create_gridset(geoserver: GeoServerCloud) -> None: ) rsps.put( url=f"{geoserver.url}/gwc/rest/gridsets/EPSG:{EPSG}.xml", - status=200, + status=201, + body=b"", ) - response = geoserver.create_gridset(EPSG) - assert response - assert response.status_code == 200 + content, code = geoserver.create_gridset(EPSG) + assert content == "" + assert code == 201 def test_get_tile(geoserver: GeoServerCloud) -> None: diff --git a/tests/test_role.py b/tests/test_role.py index 4d3068e..55cb3bd 100644 --- a/tests/test_role.py +++ b/tests/test_role.py @@ -3,19 +3,6 @@ from geoservercloud import GeoServerCloud -def test_create_role(geoserver: GeoServerCloud) -> None: - role = "test_role" - with responses.RequestsMock() as rsps: - rsps.post( - url=f"{geoserver.url}/rest/security/roles/role/{role}.json", - status=201, - ) - - response = geoserver.create_role(role) - - assert response.status_code == 201 - - def test_create_role_if_not_exists_case_true(geoserver: GeoServerCloud) -> None: role = "test_role" with responses.RequestsMock() as rsps: @@ -29,9 +16,9 @@ def test_create_role_if_not_exists_case_true(geoserver: GeoServerCloud) -> None: status=201, ) - response = geoserver.create_role_if_not_exists(role) - assert response - assert response.status_code == 201 + content, code = geoserver.create_role(role) + assert content == "" + assert code == 201 def test_create_role_if_not_exists_case_false(geoserver: GeoServerCloud) -> None: @@ -43,8 +30,9 @@ def test_create_role_if_not_exists_case_false(geoserver: GeoServerCloud) -> None json={"roles": [role]}, ) - response = geoserver.create_role_if_not_exists(role) - assert response is None + content, code = geoserver.create_role(role) + assert content == "" + assert code == 200 def test_delete_role(geoserver: GeoServerCloud) -> None: @@ -53,11 +41,13 @@ def test_delete_role(geoserver: GeoServerCloud) -> None: rsps.delete( url=f"{geoserver.url}/rest/security/roles/role/{role}.json", status=200, + body=b"", ) - response = geoserver.delete_role(role) + content, code = geoserver.delete_role(role) - assert response.status_code == 200 + assert content == "" + assert code == 200 def test_get_user_roles(geoserver: GeoServerCloud) -> None: @@ -70,9 +60,10 @@ def test_get_user_roles(geoserver: GeoServerCloud) -> None: json={"roles": roles}, ) - response = geoserver.get_user_roles(user) + content, code = geoserver.get_user_roles(user) - assert response == roles + assert content == roles + assert code == 200 def test_assign_role_to_user(geoserver: GeoServerCloud) -> None: diff --git a/tests/test_user.py b/tests/test_user.py index 090d8d1..6b5f4ae 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -11,6 +11,7 @@ def test_create_user(geoserver: GeoServerCloud) -> None: rsps.post( url=f"{geoserver.url}/rest/security/usergroup/users.json", status=201, + body=b"", match=[ responses.matchers.json_params_matcher( { @@ -23,8 +24,9 @@ def test_create_user(geoserver: GeoServerCloud) -> None: ) ], ) - response = geoserver.create_user(TEST_USER, "test_password") - assert response.status_code == 201 + content, code = geoserver.create_user(TEST_USER, "test_password") + assert content == "" + assert code == 201 def test_update_user_password(geoserver: GeoServerCloud) -> None: @@ -32,6 +34,7 @@ def test_update_user_password(geoserver: GeoServerCloud) -> None: rsps.post( url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}.json", status=200, + body=b"", match=[ responses.matchers.json_params_matcher( { @@ -42,8 +45,9 @@ def test_update_user_password(geoserver: GeoServerCloud) -> None: ) ], ) - response = geoserver.update_user(TEST_USER, password="new_password") - assert response.status_code == 200 + content, code = geoserver.update_user(TEST_USER, "new_password") + assert content == "" + assert code == 200 def test_update_user_enabled(geoserver: GeoServerCloud) -> None: @@ -51,6 +55,7 @@ def test_update_user_enabled(geoserver: GeoServerCloud) -> None: rsps.post( url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}.json", status=200, + body=b"", match=[ responses.matchers.json_params_matcher( { @@ -61,8 +66,9 @@ def test_update_user_enabled(geoserver: GeoServerCloud) -> None: ) ], ) - response = geoserver.update_user(TEST_USER, enabled=False) - assert response.status_code == 200 + content, code = geoserver.update_user(TEST_USER, enabled=False) + assert content == "" + assert code == 200 def test_delete_user(geoserver: GeoServerCloud) -> None: @@ -70,6 +76,8 @@ def test_delete_user(geoserver: GeoServerCloud) -> None: rsps.delete( url=f"{geoserver.url}/rest/security/usergroup/user/{TEST_USER}.json", status=200, + body=b"", ) - response = geoserver.delete_user(TEST_USER) - assert response.status_code == 200 + content, code = geoserver.delete_user(TEST_USER) + assert content == "" + assert code == 200 diff --git a/tests/test_wms.py b/tests/test_wms.py index f1aadbc..1de7dee 100644 --- a/tests/test_wms.py +++ b/tests/test_wms.py @@ -215,5 +215,4 @@ def test_set_default_locale_for_service(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.set_default_locale_for_service(WORKSPACE, "en") - assert response.status_code == 200 + assert geoserver.set_default_locale_for_service(WORKSPACE, "en") == None diff --git a/tests/test_workspace.py b/tests/test_workspace.py index a2d28bf..87fb7be 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -6,38 +6,61 @@ def test_list_workspaces(geoserver: GeoServerCloud) -> None: + workspaces = [ + { + "name": "test_workspace", + "href": "http://localhost:8080/geoserver/rest/workspaces/test_workspace.json", + } + ] with responses.RequestsMock() as rsps: rsps.get( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=200, - json={ - "workspaces": { - "workspace": [ - { - "name": "test_workspace", - "href": "http://localhost:8080/geoserver/rest/workspaces/test_workspace.json", - } - ] - } - }, + json={"workspaces": {"workspace": workspaces}}, + ) + assert geoserver.get_workspaces() == (workspaces, 200) + + +def test_get_workspace_ok(geoserver: GeoServerCloud) -> None: + workspace_name = "test_workspace" + workspace = {"name": workspace_name, "isolated": False} + with responses.RequestsMock() as rsps: + rsps.get( + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", + status=200, + json={"workspace": workspace}, + ) + assert geoserver.get_workspace(workspace_name) == (workspace, 200) + + +def test_get_workspace_not_found(geoserver: GeoServerCloud) -> None: + workspace_name = "test_workspace" + with responses.RequestsMock() as rsps: + rsps.get( + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", + status=404, + body=b"No such workspace: 'test_workspace' found", + ) + assert geoserver.get_workspace(workspace_name) == ( + "No such workspace: 'test_workspace' found", + 404, ) - workspaces = geoserver.get_workspaces() - assert workspaces.workspaces == ["test_workspace"] def test_create_workspace(geoserver: GeoServerCloud) -> None: - workspace = "test_workspace" + workspace_name = "test_workspace" isolated = True with responses.RequestsMock() as rsps: rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=201, + body=b"test_workspace", match=[ matchers.json_params_matcher( { "workspace": { - "name": workspace, + "name": workspace_name, "isolated": isolated, } } @@ -45,27 +68,50 @@ def test_create_workspace(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.create_workspace(workspace, isolated=isolated) + content, status_code = geoserver.create_workspace(workspace_name, isolated) - assert response.status_code == 201 + assert content == workspace_name + assert status_code == 201 def test_update_workspace(geoserver: GeoServerCloud) -> None: - workspace = "test_workspace" + workspace_name = "test_workspace" with responses.RequestsMock() as rsps: rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=409, + match=[ + matchers.json_params_matcher( + { + "workspace": { + "name": workspace_name, + "isolated": False, + } + } + ) + ], ) rsps.put( - url=f"{GEOSERVER_URL}/rest/workspaces/{workspace}.json", + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", + match=[ + matchers.json_params_matcher( + { + "workspace": { + "name": workspace_name, + "isolated": False, + } + } + ) + ], status=200, + body=b"", ) - response = geoserver.create_workspace(workspace) + content, status_code = geoserver.create_workspace(workspace_name) - assert response.status_code == 200 + assert content == "" + assert status_code == 200 def test_delete_workspace(geoserver: GeoServerCloud) -> None: @@ -75,29 +121,42 @@ def test_delete_workspace(geoserver: GeoServerCloud) -> None: rsps.delete( url=f"{GEOSERVER_URL}/rest/workspaces/{workspace}.json", status=200, + body=b"", ) - response = geoserver.delete_workspace(workspace) - - assert response.status_code == 200 + content, status_code = geoserver.delete_workspace(workspace) + assert content == "" + assert status_code == 200 def test_recreate_workspace(geoserver: GeoServerCloud) -> None: - workspace = "test_workspace" + workspace_name = "test_workspace" with responses.RequestsMock() as rsps: rsps.delete( - url=f"{GEOSERVER_URL}/rest/workspaces/{workspace}.json", + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", status=200, + body=b"", ) rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=201, + body=b"test_workspace", + match=[ + matchers.json_params_matcher( + { + "workspace": { + "name": workspace_name, + "isolated": False, + } + } + ) + ], ) - response = geoserver.recreate_workspace(workspace) - - assert response.status_code == 201 + content, status_code = geoserver.recreate_workspace(workspace_name) + assert content == workspace_name + assert status_code == 201 def test_publish_workspace(geoserver: GeoServerCloud) -> None: @@ -155,6 +214,6 @@ def test_publish_workspace(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.publish_workspace(workspace) - - assert response.status_code == 200 + content, status_code = geoserver.publish_workspace(workspace) + assert content == "" + assert status_code == 200