diff --git a/geoservercloud/geoservercloud.py b/geoservercloud/geoservercloud.py index 2b81c1c..8cc3604 100644 --- a/geoservercloud/geoservercloud.py +++ b/geoservercloud/geoservercloud.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Any -import xmltodict from owslib.map.wms130 import WebMapService_1_3_0 from owslib.util import ResponseWrapper from owslib.wmts import WebMapTileService @@ -11,27 +10,31 @@ from geoservercloud import utils from geoservercloud.models import ( - DataStores, FeatureType, FeatureTypes, - KeyDollarListDict, PostGisDataStore, Style, Styles, Workspace, - Workspaces, -) -from geoservercloud.services import ( - AclEndpoints, - GwcEndpoints, - OwsEndpoints, - RestEndpoints, - RestService, ) +from geoservercloud.services import OwsService, RestService from geoservercloud.templates import Templates class GeoServerCloud: + """ + Facade class allowing CRUD operations on GeoServer resources + + Attributes + ---------- + url : str + base GeoServer URL + user : str + GeoServer username + password : str + GeoServer password + """ + def __init__( self, url: str = "http://localhost:9090/geoserver/cloud", @@ -44,10 +47,7 @@ def __init__( self.password: str = password self.auth: tuple[str, str] = (user, password) self.rest_service: RestService = RestService(url, self.auth) - self.acl_endpoints: AclEndpoints = AclEndpoints() - self.gwc_endpoints: GwcEndpoints = GwcEndpoints() - self.ows_endpoints: OwsEndpoints = OwsEndpoints() - self.rest_endpoints: RestEndpoints = RestEndpoints() + self.ows_service: OwsService = OwsService(url, self.auth) self.wms: WebMapService_1_3_0 | None = None self.wmts: WebMapTileService | None = None self.default_workspace: str | None = None @@ -65,99 +65,90 @@ def get_wmts_layer_bbox( def create_wms(self) -> None: if self.default_workspace: - path: str = self.ows_endpoints.workspace_wms(self.default_workspace) + self.wms = self.ows_service.create_wms(self.default_workspace) else: - path = self.ows_endpoints.wms() - self.wms = WebMapService_1_3_0( - f"{self.url}{path}", - username=self.user, - password=self.password, - timeout=240, - ) + self.wms = self.ows_service.create_wms() def create_wmts(self) -> None: - path = self.ows_endpoints.wmts() - self.wmts = WebMapTileService( - f"{self.url}{path}", - version="1.0.0", - username=self.user, - password=self.password, - ) + self.wmts = self.ows_service.create_wmts() + + def get_workspaces(self) -> tuple[list[dict[str, str]] | str, int]: + """ + Get all GeoServer workspaces + """ + workspaces, status_code = self.rest_service.get_workspaces() + if isinstance(workspaces, str): + return workspaces, status_code + return workspaces.aslist(), status_code - def get_workspaces(self) -> Workspaces: - response: Response = self.get_request(self.rest_endpoints.workspaces()) - workspaces = Workspaces.from_dict(response.json()) - return workspaces + def get_workspace(self, workspace_name: str) -> tuple[dict[str, str] | str, int]: + """ + Get a workspace by name + """ + workspace, status_code = self.rest_service.get_workspace(workspace_name) + if isinstance(workspace, str): + return workspace, status_code + return workspace.asdict(), status_code def create_workspace( self, workspace_name: str, isolated: bool = False, set_default_workspace: bool = False, - ) -> Response: + ) -> tuple[str, int]: """ Create a workspace in GeoServer, if it does not already exist. It if exists, update it """ - response: Response = self.post_request( - self.rest_endpoints.workspaces(), - json=Workspace(workspace_name, isolated).post_payload(), - ) - if response.status_code == 409: - response = self.put_request( - self.rest_endpoints.workspace(workspace_name), - json=Workspace(workspace_name, isolated).put_payload(), - ) + workspace = Workspace(workspace_name, isolated) + content, status_code = self.rest_service.create_workspace(workspace) if set_default_workspace: self.default_workspace = workspace_name - return response + return content, status_code - def delete_workspace(self, workspace_name: str) -> Response: + def delete_workspace(self, workspace_name: str) -> tuple[str, int]: """ Delete a GeoServer workspace (recursively) """ - response: Response = self.delete_request( - self.rest_endpoints.workspace(workspace_name), params={"recurse": "true"} + content, status_code = self.rest_service.delete_workspace( + Workspace(workspace_name) ) if self.default_workspace == workspace_name: self.default_workspace = None self.wms = None self.wmts = None - return response + return content, status_code def recreate_workspace( - self, workspace_name: str, set_default_workspace: bool = False - ) -> Response: + self, + workspace_name: str, + isolated: bool = False, + set_default_workspace: bool = False, + ) -> tuple[str, int]: """ Create a workspace in GeoServer, and first delete it if it already exists. """ self.delete_workspace(workspace_name) return self.create_workspace( - workspace_name, set_default_workspace=set_default_workspace + workspace_name, + isolated=isolated, + set_default_workspace=set_default_workspace, ) - def publish_workspace(self, workspace_name) -> Response: + def publish_workspace(self, workspace_name) -> tuple[str, int]: """ Publish the WMS service for a given workspace """ - data: dict[str, dict[str, Any]] = Templates.workspace_wms(workspace_name) - return self.put_request( - self.rest_endpoints.workspace_wms_settings(workspace_name), json=data - ) + return self.rest_service.publish_workspace(Workspace(workspace_name)) def set_default_locale_for_service( self, workspace_name: str, locale: str | None - ) -> Response: + ) -> None: """ Set a default language for localized WMS requests """ - data: dict[str, dict[str, Any]] = { - "wms": { - "defaultLocale": locale, - } - } - return self.put_request( - self.rest_endpoints.workspace_wms_settings(workspace_name), json=data + self.rest_service.set_default_locale_for_service( + Workspace(workspace_name), locale ) def unset_default_locale_for_service(self, workspace_name) -> None: @@ -166,26 +157,29 @@ def unset_default_locale_for_service(self, workspace_name) -> None: """ self.set_default_locale_for_service(workspace_name, None) - def get_datastores(self, workspace_name: str) -> dict[str, Any]: + def get_datastores( + self, workspace_name: str + ) -> tuple[list[dict[str, str]] | str, int]: """ Get all datastores for a given workspace """ - response = self.get_request(self.rest_endpoints.datastores(workspace_name)) - return DataStores.from_dict(response.json()).datastores + datastores, status_code = self.rest_service.get_datastores(workspace_name) + if isinstance(datastores, str): + return datastores, status_code + return datastores.aslist(), status_code - def get_postgis_datastore( + def get_pg_datastore( self, workspace_name: str, datastore_name: str - ) -> dict[str, Any] | None: + ) -> tuple[dict[str, Any] | str, int]: """ - Get a specific datastore + Get a datastore by workspace and name """ - response = self.get_request( - self.rest_endpoints.datastore(workspace_name, datastore_name) + datastore, status_code = self.rest_service.get_pg_datastore( + workspace_name, datastore_name ) - if response.status_code == 404: - return None - else: - return PostGisDataStore.from_dict(response.json()) + if isinstance(datastore, str): + return datastore, status_code + return datastore.asdict(), status_code def create_pg_datastore( self, @@ -199,11 +193,10 @@ def create_pg_datastore( pg_schema: str = "public", description: str | None = None, set_default_datastore: bool = False, - ) -> Response | None: + ) -> tuple[str, int]: """ Create a PostGIS datastore from the DB connection parameters, or update it if it already exist. """ - response: None | Response = None datastore = PostGisDataStore( workspace_name, datastore_name, @@ -218,27 +211,17 @@ def create_pg_datastore( "namespace": f"http://{workspace_name}", "Expose primary keys": "true", }, - data_store_type="PostGIS", + type="PostGIS", description=description, ) - payload = datastore.put_payload() - - if not self.resource_exists( - self.rest_endpoints.datastore(workspace_name, datastore_name) - ): - response = self.post_request( - self.rest_endpoints.datastores(workspace_name), json=payload - ) - else: - response = self.put_request( - self.rest_endpoints.datastore(workspace_name, datastore_name), - json=payload, - ) + content, status_code = self.rest_service.create_pg_datastore( + workspace_name, datastore + ) if set_default_datastore: self.default_datastore = datastore_name - return response + return content, status_code def create_jndi_datastore( self, @@ -248,11 +231,10 @@ def create_jndi_datastore( pg_schema: str = "public", description: str | None = None, set_default_datastore: bool = False, - ) -> Response | None: + ) -> tuple[str, int]: """ Create a PostGIS datastore from JNDI resource, or update it if it already exist. """ - response: None | Response = None datastore = PostGisDataStore( workspace_name, datastore_name, @@ -263,26 +245,17 @@ def create_jndi_datastore( "namespace": f"http://{workspace_name}", "Expose primary keys": "true", }, - data_store_type="PostGIS (JNDI)", + type="PostGIS (JNDI)", description=description, ) - payload = datastore.put_payload() - if not self.resource_exists( - self.rest_endpoints.datastore(workspace_name, datastore_name) - ): - response = self.post_request( - self.rest_endpoints.datastores(workspace_name), json=payload - ) - else: - response = self.put_request( - self.rest_endpoints.datastore(workspace_name, datastore_name), - json=payload, - ) + content, code = self.rest_service.create_jndi_datastore( + workspace_name, datastore + ) if set_default_datastore: self.default_datastore = datastore_name - return response + return content, code def create_wmts_store( self, @@ -554,48 +527,13 @@ def set_default_layer_style( self.rest_endpoints.workspace_layer(workspace_name, layer), json=data ) - def get_wms_capabilities( - self, workspace_name: str, accept_languages=None - ) -> dict[str, Any]: - path: str = self.ows_endpoints.workspace_wms(workspace_name) - params: dict[str, str] = { - "service": "WMS", - "version": "1.3.0", - "request": "GetCapabilities", - } - if accept_languages: - params["AcceptLanguages"] = accept_languages - response: Response = self.get_request(path, params=params) - return xmltodict.parse(response.content) - def get_wms_layers( self, workspace_name: str, accept_languages: str | None = None ) -> Any | dict[str, Any]: - capabilities: dict[str, Any] = self.get_wms_capabilities( - workspace_name, accept_languages - ) - try: - return capabilities["WMS_Capabilities"]["Capability"]["Layer"] - except KeyError: - return capabilities - - def get_wfs_capabilities(self, workspace_name: str) -> dict[str, Any]: - params: dict[str, str] = { - "service": "WFS", - "version": "1.1.0", - "request": "GetCapabilities", - } - response: Response = self.get_request( - self.ows_endpoints.workspace_wfs(workspace_name), params=params - ) - return xmltodict.parse(response.content) + return self.ows_service.get_wms_layers(workspace_name, accept_languages) def get_wfs_layers(self, workspace_name: str) -> Any | dict[str, Any]: - capabilities: dict[str, Any] = self.get_wfs_capabilities(workspace_name) - try: - return capabilities["wfs:WFS_Capabilities"]["FeatureTypeList"] - except KeyError: - return capabilities + return self.ows_service.get_wfs_layers(workspace_name) def get_map( self, @@ -674,24 +612,9 @@ def get_legend_graphic( """ WMS GetLegendGraphic request """ - path: str - if not workspace_name: - path = self.ows_endpoints.wms() - else: - path = self.ows_endpoints.workspace_wms(workspace_name) - params: dict[str, Any] = { - "service": "WMS", - "version": "1.3.0", - "request": "GetLegendGraphic", - "format": format, - "layer": layer, - } - if language: - params["language"] = language - if style: - params["style"] = style - response: Response = self.get_request(path, params=params) - return response + return self.ows_service.get_legend_graphic( + layer, format, language, style, workspace_name + ) def get_tile( self, layer, format, tile_matrix_set, tile_matrix, row, column @@ -719,81 +642,38 @@ def get_feature( feature_id: int | None = None, max_feature: int | None = None, format: str = "application/json", - ) -> dict[str, Any] | bytes: + ) -> dict[str, Any] | str: """WFS GetFeature request - Return the feature(s) as dict if found, otherwise return the raw response content as bytes + Return the feature(s) as dict if found, otherwise return the response content as string """ # FIXME: we should consider also the global wfs endpoint - path = self.ows_endpoints.workspace_wfs(workspace_name) - params = { - "service": "WFS", - "version": "1.1.0", - "request": "GetFeature", - "typeName": type_name, - "outputFormat": format, - } - if feature_id: - params["featureID"] = str(feature_id) - if max_feature: - params["maxFeatures"] = str(max_feature) - response = self.get_request(path, params=params) - try: - return response.json() - except JSONDecodeError: - return response.content + return self.ows_service.get_feature( + workspace_name, type_name, feature_id, max_feature, format + ) def describe_feature_type( self, workspace_name: str | None = None, type_name: str | None = None, format: str = "application/json", - ) -> dict[str, Any] | bytes: + ) -> dict[str, Any] | str: """WFS DescribeFeatureType request - Return the feature type(s) as dict if found, otherwise return the raw response content as bytes + Return the feature type(s) as dict if found, otherwise return the response content as string """ - if not workspace_name: - path = self.ows_endpoints.wfs() - else: - path = self.ows_endpoints.workspace_wfs(workspace_name) - params = { - "service": "WFS", - "version": "1.1.0", - "request": "DescribeFeatureType", - "outputFormat": format, - } - if type_name: - params["typeName"] = type_name - response = self.get_request(path, params=params) - try: - return response.json() - except JSONDecodeError: - return response.content + return self.ows_service.describe_feature_type(workspace_name, type_name, format) def get_property_value( self, workspace_name: str, type_name: str, property: str, - ) -> dict | list | bytes: + ) -> dict | list | str: """WFS GetPropertyValue request - Return the properties as dict (if one feature was found), a list (if multiple features were found) - or an empty dict if no feature was found. Otherwise throw a requests.exceptions.HTTPError + Return the properties as dict (if one feature was found), a list (if multiple features were found), + an empty dict if no feature was found or the response content as string """ # FIXME: we should consider also the global wfs endpoint - path = self.ows_endpoints.workspace_wfs(workspace_name) - params = { - "service": "WFS", - "version": "2.0.0", - "request": "GetPropertyValue", - "typeNames": type_name, - "valueReference": property, - } - response = self.get_request(path, params=params) - value_collection = xmltodict.parse(response.content).get("wfs:ValueCollection") - if not value_collection: - return response.content - else: - return value_collection.get("wfs:member", {}) + return self.ows_service.get_property_value(workspace_name, type_name, property) def create_user(self, user: str, password: str, enabled: bool = True) -> Response: """ @@ -1010,15 +890,6 @@ def create_or_update_resource(self, path, resource_path, payload) -> Response: else: return self.put_request(resource_path, json=payload) - def resource_exists(self, path: str) -> bool: - """ - Check if a resource (given its path) exists in GeoServer - """ - # GeoServer raises a 500 when posting to a datastore or feature type that already exists, so first do - # a get request - response = self.get_request(path) - return response.status_code == 200 - def get_request( self, path, diff --git a/geoservercloud/models/__init__.py b/geoservercloud/models/__init__.py index c28cea9..12f9802 100644 --- a/geoservercloud/models/__init__.py +++ b/geoservercloud/models/__init__.py @@ -1,4 +1,4 @@ -from .common import I18N, KeyDollarListDict +from .common import BaseModel, EntityModel, ListModel, I18N, KeyDollarListDict from .datastore import PostGisDataStore from .datastores import DataStores from .featuretype import FeatureType @@ -9,8 +9,11 @@ from .workspaces import Workspaces __all__ = [ + "BaseModel", "DataStores", + "EntityModel", "KeyDollarListDict", + "ListModel", "FeatureType", "FeatureTypes", "I18N", diff --git a/geoservercloud/models/common.py b/geoservercloud/models/common.py index 3028ed7..ebada34 100644 --- a/geoservercloud/models/common.py +++ b/geoservercloud/models/common.py @@ -1,11 +1,34 @@ import json -import logging from typing import Any -log = logging.getLogger() + +class BaseModel: + @classmethod + def from_get_response_payload(cls, content: dict): + raise NotImplementedError + + +class EntityModel(BaseModel): + def asdict(self) -> dict[str, Any]: + raise NotImplementedError + + def post_payload(self) -> dict[str, Any]: + raise NotImplementedError + + def put_payload(self) -> dict[str, Any]: + raise NotImplementedError + + +class ListModel(BaseModel): + def aslist(self) -> list: + raise NotImplementedError class KeyDollarListDict(dict): + + key_prefix: str = "@key" + value_prefix: str = "$" + def __init__( self, input_list: list | None = None, @@ -14,13 +37,10 @@ def __init__( **kwargs ): super().__init__(*args, **kwargs) - self.key_prefix = "@key" - self.value_prefix = "$" if input_list: self.deserialize(input_list) if input_dict: self.update(input_dict) - log.debug(self) def deserialize(self, input_list: list): for item in input_list: diff --git a/geoservercloud/models/datastore.py b/geoservercloud/models/datastore.py index 107721a..0e8c36b 100644 --- a/geoservercloud/models/datastore.py +++ b/geoservercloud/models/datastore.py @@ -1,74 +1,78 @@ import json -import logging +from typing import Any -from requests.models import Response +from geoservercloud.models import EntityModel, KeyDollarListDict -from . import KeyDollarListDict - -log = logging.getLogger() - - -class PostGisDataStore: +class PostGisDataStore(EntityModel): def __init__( self, workspace_name: str, - data_store_name: str, + name: str, connection_parameters: dict, - data_store_type: str = "PostGIS", + type: str = "PostGIS", enabled: bool = True, description: str | None = None, + default: bool | None = None, + disable_on_conn_failure: bool | None = None, ) -> None: - self.workspace_name = workspace_name - self.data_store_name = data_store_name + self.workspace_name: str = workspace_name + self._name: str = name self.connection_parameters = KeyDollarListDict(input_dict=connection_parameters) - self.data_store_type = data_store_type - self.description = description - self.enabled = enabled + self.type: str = type + self.description: str | None = description + self.enabled: bool = enabled + self._default: bool | None = default + self.disable_on_conn_failure: bool | None = disable_on_conn_failure @property - def name(self): - return self.data_store_name + def name(self) -> str: + return self._name - def post_payload(self): - payload = { - "dataStore": { - "name": self.data_store_name, - "type": self.data_store_type, - "connectionParameters": { - "entry": self.connection_parameters.serialize() - }, - } + def asdict(self) -> dict[str, Any]: + content: dict[str, Any] = { + "name": self._name, + "type": self.type, + "connectionParameters": {"entry": dict(self.connection_parameters)}, + "workspace": self.workspace_name, } if self.description: - payload["dataStore"]["description"] = self.description + content["description"] = self.description if self.enabled: - payload["dataStore"]["enabled"] = self.enabled - return payload + content["enabled"] = self.enabled + if self._default is not None: + content["_default"] = self._default + if self.disable_on_conn_failure is not None: + content["disableOnConnFailure"] = self.disable_on_conn_failure + return content + + def post_payload(self) -> dict[str, Any]: + content = self.asdict() + content["connectionParameters"] = { + "entry": self.connection_parameters.serialize() + } + content["workspace"] = {"name": self.workspace_name} + return {"dataStore": content} - def put_payload(self): - payload = self.post_payload() - return payload + def put_payload(self) -> dict[str, Any]: + return self.post_payload() @classmethod - def from_dict(cls, content: dict): - connection_parameters = cls.parse_connection_parameters(content) + def from_get_response_payload(cls, content: dict): + data_store = content["dataStore"] + connection_parameters = KeyDollarListDict( + input_list=data_store["connectionParameters"]["entry"] + ) return cls( - content.get("dataStore", {}).get("workspace", {}).get("name", None), - content.get("dataStore", {}).get("name", None), + data_store["workspace"]["name"], + data_store["name"], connection_parameters, - content.get("dataStore", {}).get("type", "PostGIS"), - content.get("dataStore", {}).get("enabled", True), - content.get("dataStore", {}).get("description", None), - ) - - @classmethod - def parse_connection_parameters(cls, content): - return KeyDollarListDict( - content.get("dataStore", {}) - .get("connectionParameters", {}) - .get("entry", []) + data_store.get("type", "PostGIS"), + data_store.get("enabled", True), + data_store.get("description", None), + data_store.get("_default", None), + data_store.get("disableOnConnFailure", None), ) - def __repr__(self): + def __repr__(self) -> str: return json.dumps(self.put_payload(), indent=4) diff --git a/geoservercloud/models/datastores.py b/geoservercloud/models/datastores.py index fdd8811..b682e17 100644 --- a/geoservercloud/models/datastores.py +++ b/geoservercloud/models/datastores.py @@ -1,32 +1,19 @@ -import logging +from geoservercloud.models import ListModel -from requests.models import Response -log = logging.getLogger() - - -class DataStores: - - def __init__(self, workspace_name: str, datastores: list[str] = []) -> None: - self.workspace_name = workspace_name - self._datastores = datastores - - @property - def datastores(self): - return self._datastores +class DataStores(ListModel): + def __init__(self, datastores: list[dict[str, str]] = []) -> None: + self._datastores: list[dict[str, str]] = datastores @classmethod - def from_dict(cls, content: dict): - datastores = [] - workspace_name = ( - content.get("dataStores", {}).get("workspace", {}).get("name", None) - ) + def from_get_response_payload(cls, content: dict): + datastores: str | dict = content["dataStores"] + if not datastores: + return cls() + return cls(datastores["dataStore"]) # type: ignore - for store in content.get("dataStores", {}).get("dataStore", []): - datastores.append(store["name"]) - for data_store_name in datastores: - log.debug(f"Name: {data_store_name}") - return cls(workspace_name, datastores) + def __repr__(self) -> str: + return str(self._datastores) - def __repr__(self): - return str(self.datastores) + def aslist(self) -> list[dict[str, str]]: + return self._datastores diff --git a/geoservercloud/models/featuretype.py b/geoservercloud/models/featuretype.py index 3ad5712..bd94b29 100644 --- a/geoservercloud/models/featuretype.py +++ b/geoservercloud/models/featuretype.py @@ -1,12 +1,10 @@ import json -from requests.models import Response - -from geoservercloud.models import I18N +from geoservercloud.models import EntityModel, I18N # TODO: import more default values from Templates -class FeatureType: +class FeatureType(EntityModel): def __init__( self, namespace_name: str, diff --git a/geoservercloud/models/featuretypes.py b/geoservercloud/models/featuretypes.py index 463ddad..e27ea2c 100644 --- a/geoservercloud/models/featuretypes.py +++ b/geoservercloud/models/featuretypes.py @@ -1,10 +1,9 @@ import json -from requests.models import Response +from geoservercloud.models import ListModel -class FeatureTypes: - +class FeatureTypes(ListModel): def __init__(self, featuretypes: list = []) -> None: self._featuretypes = featuretypes diff --git a/geoservercloud/models/style.py b/geoservercloud/models/style.py index af67889..fbde64e 100644 --- a/geoservercloud/models/style.py +++ b/geoservercloud/models/style.py @@ -1,11 +1,11 @@ import json import xmltodict -from requests.models import Response +from geoservercloud.models import EntityModel -class Style: +class Style(EntityModel): def __init__( self, name: str, diff --git a/geoservercloud/models/styles.py b/geoservercloud/models/styles.py index 92af7ec..26c1aaa 100644 --- a/geoservercloud/models/styles.py +++ b/geoservercloud/models/styles.py @@ -1,8 +1,7 @@ -from requests.models import Response +from geoservercloud.models import ListModel -class Styles: - +class Styles(ListModel): def __init__(self, styles: list[str], workspace: str | None = None) -> None: self._workspace = workspace self._styles = styles diff --git a/geoservercloud/models/workspace.py b/geoservercloud/models/workspace.py index 29448f0..504b03b 100644 --- a/geoservercloud/models/workspace.py +++ b/geoservercloud/models/workspace.py @@ -1,31 +1,31 @@ import json -import logging +from typing import Any -from requests.models import Response +from geoservercloud.models import EntityModel -log = logging.getLogger() - - -class Workspace: +class Workspace(EntityModel): def __init__(self, name: str, isolated: bool = False) -> None: - self.name = name - self.isolated = isolated + self.name: str = name + self.isolated: bool = isolated + + def asdict(self) -> dict[str, Any]: + return { + "name": self.name, + "isolated": self.isolated, + } - def put_payload(self): - payload = {"workspace": {"name": self.name}} - if self.isolated: - payload["workspace"]["isolated"] = self.isolated - return payload + def put_payload(self) -> dict[str, dict[str, Any]]: + return {"workspace": self.asdict()} - def post_payload(self): + def post_payload(self) -> dict[str, dict[str, str]]: return self.put_payload() @classmethod - def from_dict(cls, content: dict): + def from_get_response_payload(cls, content: dict): return cls( - content.get("workspace", {}).get("name", None), - content.get("workspace", {}).get("isolated", False), + content["workspace"]["name"], + content["workspace"]["isolated"], ) def __repr__(self): diff --git a/geoservercloud/models/workspaces.py b/geoservercloud/models/workspaces.py index 7d33561..c8f4a6b 100644 --- a/geoservercloud/models/workspaces.py +++ b/geoservercloud/models/workspaces.py @@ -1,33 +1,25 @@ -import logging +from geoservercloud.models import ListModel -from requests.models import Response -log = logging.getLogger() - - -class Workspaces: - - def __init__(self, workspaces: list = []) -> None: +class Workspaces(ListModel): + def __init__(self, workspaces: list[dict[str, str]] = []) -> None: self._workspaces = workspaces - def find(self, workspace_name: str): - return self.workspaces.get(workspace_name, None) - - @property - def workspaces(self): - return self._workspaces + def find(self, workspace_name: str) -> dict[str, str] | None: + for ws in self._workspaces: + if ws["name"] == workspace_name: + return ws @classmethod - def from_dict(cls, content: dict): + def from_get_response_payload(cls, content: dict): - workspaces = [] - # Map the response to a list of Workspace instances - for ws in content.get("workspaces", {}).get("workspace", []): - workspaces.append(ws["name"]) + workspaces: str | dict = content["workspaces"] + if not workspaces: + return cls() + return cls(workspaces["workspace"]) # type: ignore - # Now 'workspaces' is a list of Workspace instances - log.debug("Parsed Workspaces:") - for workspace in workspaces: - log.debug(f"Name: {workspace}") + def __repr__(self) -> str: + return str(self._workspaces) - return cls(workspaces) + def aslist(self) -> list[dict[str, str]]: + return self._workspaces diff --git a/geoservercloud/services/__init__.py b/geoservercloud/services/__init__.py index d885ea3..e408a06 100644 --- a/geoservercloud/services/__init__.py +++ b/geoservercloud/services/__init__.py @@ -1,10 +1,7 @@ -from .endpoints import AclEndpoints, GwcEndpoints, OwsEndpoints, RestEndpoints +from .owsservice import OwsService from .restservice import RestService __all__ = [ + "OwsService", "RestService", - "AclEndpoints", - "OwsEndpoints", - "GwcEndpoints", - "RestEndpoints", ] diff --git a/geoservercloud/services/endpoints.py b/geoservercloud/services/endpoints.py deleted file mode 100644 index 6a7f557..0000000 --- a/geoservercloud/services/endpoints.py +++ /dev/null @@ -1,174 +0,0 @@ -class AclEndpoints: - def __init__(self, base_url: str = "/acl") -> None: - self.base_url: str = base_url - - def adminrules(self) -> str: - return f"{self.base_url}/api/adminrules" - - def adminrule(self, id: int) -> str: - return f"{self.base_url}/api/adminrules/id/{id}" - - def rules(self) -> str: - return f"{self.base_url}/api/rules" - - -class GwcEndpoints: - def __init__(self, base_url: str = "/gwc/rest") -> None: - self.base_url: str = base_url - - def reload(self) -> str: - return f"{self.base_url}/reload" - - def layers(self, workspace_name: str) -> str: - return f"{self.base_url}/layers.json" - - def layer(self, workspace_name: str, layer_name: str) -> str: - return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" - - def gridsets(self) -> str: - return f"{self.base_url}/gridsets.json" - - def gridset(self, epsg: int) -> str: - return f"{self.base_url}/gridsets/EPSG:{str(epsg)}.xml" - - -class OwsEndpoints: - def __init__(self, base_url: str = "") -> None: - self.base_url: str = base_url - - def ows(self) -> str: - return f"{self.base_url}/ows" - - def wms(self) -> str: - return f"{self.base_url}/wms" - - def wfs(self) -> str: - return f"{self.base_url}/wfs" - - def wcs(self) -> str: - return f"{self.base_url}/wcs" - - def wmts(self) -> str: - return f"{self.base_url}/gwc/service/wmts" - - def workspace_ows(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/ows" - - def workspace_wms(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/wms" - - def workspace_wfs(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/wfs" - - def workspace_wcs(self, workspace_name: str) -> str: - return f"{self.base_url}/{workspace_name}/wcs" - - -class RestEndpoints: - def __init__(self, base_url: str = "/rest") -> None: - self.base_url: str = base_url - - def styles(self) -> str: - return f"{self.base_url}/styles.json" - - def style(self, style_name: str) -> str: - return f"{self.base_url}/styles/{style_name}.json" - - def workspaces(self) -> str: - return f"{self.base_url}/workspaces.json" - - def workspace(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}.json" - - def workspace_styles(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/styles.json" - - def workspace_style(self, workspace_name: str, style_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/styles/{style_name}.json" - - def workspace_layer(self, workspace_name: str, layer_name: str) -> str: - return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" - - def workspace_wms_settings(self, workspace_name: str) -> str: - return f"{self.base_url}/services/wms/workspaces/{workspace_name}/settings.json" - - def workspace_wfs_settings(self, workspace_name: str) -> str: - return f"{self.base_url}/services/wfs/workspaces/{workspace_name}/settings.json" - - def datastores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores.json" - - def datastore(self, workspace_name: str, datastore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}.json" - - def featuretypes(self, workspace_name: str, datastore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes.json" - - def featuretype( - self, workspace_name: str, datastore_name: str, featuretype_name: str - ) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes/{featuretype_name}.json" - - def layergroup(self, workspace_name: str, layergroup_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/layergroups/{layergroup_name}.json" - - def layergroups(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/layergroups.json" - - def coveragestores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores.json" - - def coveragestore(self, workspace_name: str, coveragestore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}.json" - - def coverages(self, workspace_name: str, coveragestore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages.json" - - def coverage( - self, workspace_name: str, coveragestore_name: str, coverage_name: str - ) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages/{coverage_name}.json" - - def wmsstores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmsstores.json" - - def wmsstore(self, workspace_name: str, wmsstore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmsstores/{wmsstore_name}.json" - - def wmtsstores(self, workspace_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores.json" - - def wmtsstore(self, workspace_name: str, wmtsstore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}.json" - - def wmtslayers(self, workspace_name: str, wmtsstore_name: str) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers.json" - - def wmtslayer( - self, workspace_name: str, wmtsstore_name: str, wmtslayer_name: str - ) -> str: - return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers/{wmtslayer_name}.json" - - def namespaces(self) -> str: - return f"{self.base_url}/namespaces.json" - - def namespace(self, namespace_name: str) -> str: - return f"{self.base_url}/namespaces/{namespace_name}.json" - - def users(self) -> str: - return f"{self.base_url}/security/usergroup/users.json" - - def user(self, username: str) -> str: - return f"{self.base_url}/security/usergroup/user/{username}.json" - - def roles(self) -> str: - return f"{self.base_url}/security/roles.json" - - def user_roles(self, username: str) -> str: - return f"{self.base_url}/security/roles/user/{username}.json" - - def role(self, role_name: str) -> str: - return f"{self.base_url}/security/roles/role/{role_name}.json" - - def role_user(self, role_name: str, username: str) -> str: - return f"{self.base_url}/security/roles/role/{role_name}/user/{username}.json" diff --git a/geoservercloud/services/owsservice.py b/geoservercloud/services/owsservice.py new file mode 100644 index 0000000..6e2596b --- /dev/null +++ b/geoservercloud/services/owsservice.py @@ -0,0 +1,214 @@ +from json import JSONDecodeError +from typing import Any +from owslib.map.wms130 import WebMapService_1_3_0 +from owslib.wmts import WebMapTileService +from requests import Response +import xmltodict + +from geoservercloud.services.restclient import RestClient + + +class OwsService: + def __init__(self, url: str, auth: tuple[str, str]) -> None: + self.url: str = url + self.auth: tuple[str, str] = auth + self.ows_endpoints = self.OwsEndpoints() + self.rest_client = RestClient(url, auth) + + def create_wms(self, workspace_name: str | None = None) -> WebMapService_1_3_0: + print(self.ows_endpoints.wms()) + if workspace_name is None: + return WebMapService_1_3_0( + f"{self.url}{self.ows_endpoints.wms()}", + username=self.auth[0], + password=self.auth[1], + ) + return WebMapService_1_3_0( + f"{self.url}{self.ows_endpoints.workspace_wms(workspace_name)}", + username=self.auth[0], + password=self.auth[1], + ) + + def create_wmts(self) -> WebMapTileService: + return WebMapTileService( + self.ows_endpoints.wmts(), + version="1.0.0", + username=self.auth[0], + password=self.auth[1], + ) + + def get_wms_capabilities( + self, workspace_name: str, accept_languages: str | None = None + ) -> dict[str, Any]: + path: str = self.ows_endpoints.workspace_wms(workspace_name) + params: dict[str, str] = { + "service": "WMS", + "version": "1.3.0", + "request": "GetCapabilities", + } + if accept_languages: + params["AcceptLanguages"] = accept_languages + response: Response = self.rest_client.get(path, params=params) + return xmltodict.parse(response.content) + + def get_wms_layers( + self, workspace_name: str, accept_languages: str | None = None + ) -> Any | dict[str, Any]: + + capabilities: dict[str, Any] = self.get_wms_capabilities( + workspace_name, accept_languages + ) + try: + return capabilities["WMS_Capabilities"]["Capability"]["Layer"] + except KeyError: + return capabilities + + def get_legend_graphic( + self, + layer: list[str], + format: str = "image/png", + language: str | None = None, + style: str | None = None, + workspace_name: str | None = None, + ) -> Response: + """ + WMS GetLegendGraphic request + """ + path: str + if not workspace_name: + path = self.ows_endpoints.wms() + else: + path = self.ows_endpoints.workspace_wms(workspace_name) + params: dict[str, Any] = { + "service": "WMS", + "version": "1.3.0", + "request": "GetLegendGraphic", + "format": format, + "layer": layer, + } + if language: + params["language"] = language + if style: + params["style"] = style + return self.rest_client.get(path, params=params) + + def get_wfs_capabilities(self, workspace_name: str) -> dict[str, Any]: + params: dict[str, str] = { + "service": "WFS", + "version": "1.1.0", + "request": "GetCapabilities", + } + print(self.ows_endpoints.workspace_wfs(workspace_name)) + response: Response = self.rest_client.get( + self.ows_endpoints.workspace_wfs(workspace_name), params=params + ) + return xmltodict.parse(response.content) + + def get_wfs_layers(self, workspace_name: str) -> Any | dict[str, Any]: + capabilities: dict[str, Any] = self.get_wfs_capabilities(workspace_name) + try: + return capabilities["wfs:WFS_Capabilities"]["FeatureTypeList"] + except KeyError: + return capabilities + + def get_feature( + self, + workspace_name: str, + type_name: str, + feature_id: int | None = None, + max_feature: int | None = None, + format: str = "application/json", + ) -> dict[str, Any] | str: + path = self.ows_endpoints.workspace_wfs(workspace_name) + params = { + "service": "WFS", + "version": "1.1.0", + "request": "GetFeature", + "typeName": type_name, + "outputFormat": format, + } + if feature_id: + params["featureID"] = str(feature_id) + if max_feature: + params["maxFeatures"] = str(max_feature) + response = self.rest_client.get(path, params=params) + try: + return response.json() + except JSONDecodeError: + return response.content.decode() + + def describe_feature_type( + self, + workspace_name: str | None = None, + type_name: str | None = None, + format: str = "application/json", + ) -> dict[str, Any] | str: + if not workspace_name: + path = self.ows_endpoints.wfs() + else: + path = self.ows_endpoints.workspace_wfs(workspace_name) + params = { + "service": "WFS", + "version": "1.1.0", + "request": "DescribeFeatureType", + "outputFormat": format, + } + if type_name: + params["typeName"] = type_name + response = self.rest_client.get(path, params=params) + try: + return response.json() + except JSONDecodeError: + return response.content.decode() + + def get_property_value( + self, + workspace_name: str, + type_name: str, + property: str, + ) -> dict | list | str: + path = self.ows_endpoints.workspace_wfs(workspace_name) + params = { + "service": "WFS", + "version": "2.0.0", + "request": "GetPropertyValue", + "typeNames": type_name, + "valueReference": property, + } + response = self.rest_client.get(path, params=params) + value_collection = xmltodict.parse(response.content).get("wfs:ValueCollection") + if not value_collection: + return response.content.decode() + else: + return value_collection.get("wfs:member", {}) + + class OwsEndpoints: + def __init__(self, base_url: str = "") -> None: + self.base_url: str = base_url + + def ows(self) -> str: + return f"{self.base_url}/ows" + + def wms(self) -> str: + return f"{self.base_url}/wms" + + def wfs(self) -> str: + return f"{self.base_url}/wfs" + + def wcs(self) -> str: + return f"{self.base_url}/wcs" + + def wmts(self) -> str: + return f"{self.base_url}/gwc/service/wmts" + + def workspace_ows(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/ows" + + def workspace_wms(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wms" + + def workspace_wfs(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wfs" + + def workspace_wcs(self, workspace_name: str) -> str: + return f"{self.base_url}/{workspace_name}/wcs" diff --git a/geoservercloud/services/restclient.py b/geoservercloud/services/restclient.py new file mode 100644 index 0000000..34c8d92 --- /dev/null +++ b/geoservercloud/services/restclient.py @@ -0,0 +1,98 @@ +from typing import Any + +import requests + +TIMEOUT = 120 + + +class RestClient: + """ + HTTP client responsible for issuing requests + + Attributes + ---------- + url : str + base GeoServer URL + auth : tuple[str, str] + username and password for GeoServer + """ + + def __init__(self, url: str, auth: tuple[str, str]) -> None: + self.url: str = url + self.auth: tuple[str, str] = auth + + def get( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + ) -> requests.Response: + response: requests.Response = requests.get( + f"{self.url}{path}", + params=params, + headers=headers, + auth=self.auth, + timeout=TIMEOUT, + ) + if response.status_code != 404: + response.raise_for_status() + return response + + def post( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + json: dict[str, dict[str, Any]] | None = None, + data: bytes | None = None, + ) -> requests.Response: + + response: requests.Response = requests.post( + f"{self.url}{path}", + params=params, + headers=headers, + json=json, + data=data, + auth=self.auth, + timeout=TIMEOUT, + ) + if response.status_code != 409: + response.raise_for_status() + return response + + def put( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + json: dict[str, dict[str, Any]] | None = None, + data: bytes | None = None, + ) -> requests.Response: + response: requests.Response = requests.put( + f"{self.url}{path}", + params=params, + headers=headers, + json=json, + data=data, + auth=self.auth, + timeout=TIMEOUT, + ) + response.raise_for_status() + return response + + def delete( + self, + path: str, + params: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + ) -> requests.Response: + response: requests.Response = requests.delete( + f"{self.url}{path}", + params=params, + headers=headers, + auth=self.auth, + timeout=TIMEOUT, + ) + if response.status_code != 404: + response.raise_for_status() + return response diff --git a/geoservercloud/services/restservice.py b/geoservercloud/services/restservice.py index 14e67b3..a314a22 100644 --- a/geoservercloud/services/restservice.py +++ b/geoservercloud/services/restservice.py @@ -1,87 +1,282 @@ -from typing import Any +from json import JSONDecodeError +from typing import Any, Type -import requests +from requests import Response -TIMEOUT = 120 +from geoservercloud.models import ( + BaseModel, + DataStores, + PostGisDataStore, + Workspace, + Workspaces, +) +from geoservercloud.services.restclient import RestClient +from geoservercloud.templates import Templates class RestService: + """ + Service responsible for serializing and deserializing payloads and routing requests to GeoServer REST API + + Attributes + ---------- + url : str + base GeoServer URL + auth : tuple[str, str] + username and password for GeoServer + """ + def __init__(self, url: str, auth: tuple[str, str]) -> None: self.url: str = url self.auth: tuple[str, str] = auth + self.rest_client = RestClient(url, auth) + self.acl_endpoints = self.AclEndpoints() + self.gwc_endpoints = self.GwcEndpoints() + self.rest_endpoints = self.RestEndpoints() + + def get_workspaces(self) -> tuple[Workspaces | str, int]: + response: Response = self.rest_client.get(self.rest_endpoints.workspaces()) + return self.deserialize_response(response, Workspaces) + + def get_workspace(self, name: str) -> tuple[Workspace | str, int]: + response: Response = self.rest_client.get(self.rest_endpoints.workspace(name)) + return self.deserialize_response(response, Workspace) - def get( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - ) -> requests.Response: - response: requests.Response = requests.get( - f"{self.url}{path}", - params=params, - headers=headers, - auth=self.auth, - timeout=TIMEOUT, + def create_workspace(self, workspace: Workspace) -> tuple[str, int]: + path: str = self.rest_endpoints.workspaces() + response: Response = self.rest_client.post(path, json=workspace.post_payload()) + if response.status_code == 409: + path = self.rest_endpoints.workspace(workspace.name) + response = self.rest_client.put(path, json=workspace.put_payload()) + return response.content.decode(), response.status_code + + def delete_workspace(self, workspace: Workspace) -> tuple[str, int]: + path: str = self.rest_endpoints.workspace(workspace.name) + params: dict[str, str] = {"recurse": "true"} + response: Response = self.rest_client.delete(path, params=params) + return response.content.decode(), response.status_code + + def publish_workspace(self, workspace: Workspace) -> tuple[str, int]: + data: dict[str, dict[str, Any]] = Templates.workspace_wms(workspace.name) + response: Response = self.rest_client.put( + self.rest_endpoints.workspace_wms_settings(workspace.name), json=data ) - if response.status_code != 404: - response.raise_for_status() - return response - - def post( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - json: dict[str, dict[str, Any]] | None = None, - data: bytes | None = None, - ) -> requests.Response: - - response: requests.Response = requests.post( - f"{self.url}{path}", - params=params, - headers=headers, - json=json, - data=data, - auth=self.auth, - timeout=TIMEOUT, + return response.content.decode(), response.status_code + + def set_default_locale_for_service( + self, workspace: Workspace, locale: str | None + ) -> None: + data: dict[str, dict[str, Any]] = { + "wms": { + "defaultLocale": locale, + } + } + self.rest_client.put( + self.rest_endpoints.workspace_wms_settings(workspace.name), json=data ) - if response.status_code != 409: - response.raise_for_status() - return response - - def put( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - json: dict[str, dict[str, Any]] | None = None, - data: bytes | None = None, - ) -> requests.Response: - response: requests.Response = requests.put( - f"{self.url}{path}", - params=params, - headers=headers, - json=json, - data=data, - auth=self.auth, - timeout=TIMEOUT, + + def get_datastores(self, workspace_name: str) -> tuple[DataStores | str, int]: + response: Response = self.rest_client.get( + self.rest_endpoints.datastores(workspace_name) ) - response.raise_for_status() - return response - - def delete( - self, - path: str, - params: dict[str, str] | None = None, - headers: dict[str, str] | None = None, - ) -> requests.Response: - response: requests.Response = requests.delete( - f"{self.url}{path}", - params=params, - headers=headers, - auth=self.auth, - timeout=TIMEOUT, + return self.deserialize_response(response, DataStores) + + def get_pg_datastore( + self, workspace_name: str, datastore_name: str + ) -> tuple[PostGisDataStore | str, int]: + response: Response = self.rest_client.get( + self.rest_endpoints.datastore(workspace_name, datastore_name) ) - if response.status_code != 404: - response.raise_for_status() - return response + return self.deserialize_response(response, PostGisDataStore) + + def create_pg_datastore( + self, workspace_name: str, datastore: PostGisDataStore + ) -> tuple[str, int]: + if not self.resource_exists( + self.rest_endpoints.datastore(workspace_name, datastore.name) + ): + response: Response = self.rest_client.post( + self.rest_endpoints.datastores(workspace_name), + json=datastore.post_payload(), + ) + else: + response: Response = self.rest_client.put( + self.rest_endpoints.datastore(workspace_name, datastore.name), + json=datastore.put_payload(), + ) + return response.content.decode(), response.status_code + + def create_jndi_datastore( + self, workspace_name: str, datastore: PostGisDataStore + ) -> tuple[str, int]: + if not self.resource_exists( + self.rest_endpoints.datastore(workspace_name, datastore.name) + ): + response: Response = self.rest_client.post( + self.rest_endpoints.datastores(workspace_name), + json=datastore.post_payload(), + ) + else: + response: Response = self.rest_client.put( + self.rest_endpoints.datastore(workspace_name, datastore.name), + json=datastore.put_payload(), + ) + return response.content.decode(), response.status_code + + def resource_exists(self, path: str) -> bool: + response: Response = self.rest_client.get(path) + return response.status_code == 200 + + def deserialize_response( + self, response: Response, data_type: Type[BaseModel] + ) -> tuple[Any, int]: + try: + content = response.json() + except JSONDecodeError: + return response.content.decode(), response.status_code + return data_type.from_get_response_payload(content), response.status_code + + class AclEndpoints: + def __init__(self, base_url: str = "/acl") -> None: + self.base_url: str = base_url + + def adminrules(self) -> str: + return f"{self.base_url}/api/adminrules" + + def adminrule(self, id: int) -> str: + return f"{self.base_url}/api/adminrules/id/{id}" + + def rules(self) -> str: + return f"{self.base_url}/api/rules" + + class GwcEndpoints: + def __init__(self, base_url: str = "/gwc/rest") -> None: + self.base_url: str = base_url + + def reload(self) -> str: + return f"{self.base_url}/reload" + + def layers(self, workspace_name: str) -> str: + return f"{self.base_url}/layers.json" + + def layer(self, workspace_name: str, layer_name: str) -> str: + return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" + + def gridsets(self) -> str: + return f"{self.base_url}/gridsets.json" + + def gridset(self, epsg: int) -> str: + return f"{self.base_url}/gridsets/EPSG:{str(epsg)}.xml" + + class RestEndpoints: + def __init__(self, base_url: str = "/rest") -> None: + self.base_url: str = base_url + + def styles(self) -> str: + return f"{self.base_url}/styles.json" + + def style(self, style_name: str) -> str: + return f"{self.base_url}/styles/{style_name}.json" + + def workspaces(self) -> str: + return f"{self.base_url}/workspaces.json" + + def workspace(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}.json" + + def workspace_styles(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/styles.json" + + def workspace_style(self, workspace_name: str, style_name: str) -> str: + return ( + f"{self.base_url}/workspaces/{workspace_name}/styles/{style_name}.json" + ) + + def workspace_layer(self, workspace_name: str, layer_name: str) -> str: + return f"{self.base_url}/layers/{workspace_name}:{layer_name}.json" + + def workspace_wms_settings(self, workspace_name: str) -> str: + return f"{self.base_url}/services/wms/workspaces/{workspace_name}/settings.json" + + def workspace_wfs_settings(self, workspace_name: str) -> str: + return f"{self.base_url}/services/wfs/workspaces/{workspace_name}/settings.json" + + def datastores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores.json" + + def datastore(self, workspace_name: str, datastore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}.json" + + def featuretypes(self, workspace_name: str, datastore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes.json" + + def featuretype( + self, workspace_name: str, datastore_name: str, featuretype_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/datastores/{datastore_name}/featuretypes/{featuretype_name}.json" + + def layergroup(self, workspace_name: str, layergroup_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/layergroups/{layergroup_name}.json" + + def layergroups(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/layergroups.json" + + def coveragestores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores.json" + + def coveragestore(self, workspace_name: str, coveragestore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}.json" + + def coverages(self, workspace_name: str, coveragestore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages.json" + + def coverage( + self, workspace_name: str, coveragestore_name: str, coverage_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/coveragestores/{coveragestore_name}/coverages/{coverage_name}.json" + + def wmsstores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmsstores.json" + + def wmsstore(self, workspace_name: str, wmsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmsstores/{wmsstore_name}.json" + + def wmtsstores(self, workspace_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores.json" + + def wmtsstore(self, workspace_name: str, wmtsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}.json" + + def wmtslayers(self, workspace_name: str, wmtsstore_name: str) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers.json" + + def wmtslayer( + self, workspace_name: str, wmtsstore_name: str, wmtslayer_name: str + ) -> str: + return f"{self.base_url}/workspaces/{workspace_name}/wmtsstores/{wmtsstore_name}/layers/{wmtslayer_name}.json" + + def namespaces(self) -> str: + return f"{self.base_url}/namespaces.json" + + def namespace(self, namespace_name: str) -> str: + return f"{self.base_url}/namespaces/{namespace_name}.json" + + def users(self) -> str: + return f"{self.base_url}/security/usergroup/users.json" + + def user(self, username: str) -> str: + return f"{self.base_url}/security/usergroup/user/{username}.json" + + def roles(self) -> str: + return f"{self.base_url}/security/roles.json" + + def user_roles(self, username: str) -> str: + return f"{self.base_url}/security/roles/user/{username}.json" + + def role(self, role_name: str) -> str: + return f"{self.base_url}/security/roles/role/{role_name}.json" + + def role_user(self, role_name: str, username: str) -> str: + return ( + f"{self.base_url}/security/roles/role/{role_name}/user/{username}.json" + ) diff --git a/geoservercloud/templates.py b/geoservercloud/templates.py index 992c161..db00b7f 100644 --- a/geoservercloud/templates.py +++ b/geoservercloud/templates.py @@ -37,7 +37,6 @@ class Templates: - @staticmethod def workspace_wms(workspace: str) -> dict[str, dict[str, Any]]: return { diff --git a/tests/models/test_common.py b/tests/models/test_common.py index 4d63de6..ebf6d4b 100644 --- a/tests/models/test_common.py +++ b/tests/models/test_common.py @@ -2,10 +2,7 @@ import pytest -from geoservercloud.models import ( - I18N, - KeyDollarListDict, -) +from geoservercloud.models import I18N, KeyDollarListDict def test_keydollarlistdict_initialization_with_input_list(): diff --git a/tests/models/test_datastore.py b/tests/models/test_datastore.py index 793e374..b50ce5c 100644 --- a/tests/models/test_datastore.py +++ b/tests/models/test_datastore.py @@ -1,7 +1,24 @@ -from geoservercloud.models import ( - KeyDollarListDict, - PostGisDataStore, -) +import pytest + +from geoservercloud.models import KeyDollarListDict, PostGisDataStore + + +@pytest.fixture(scope="module") +def pg_payload(): + yield { + "dataStore": { + "name": "test_datastore", + "type": "PostGIS", + "enabled": True, + "connectionParameters": { + "entry": [ + {"@key": "host", "$": "localhost"}, + {"@key": "port", "$": "5432"}, + ] + }, + "workspace": {"name": "test_workspace"}, + } + } def test_postgisdatastore_initialization(): @@ -14,12 +31,12 @@ def test_postgisdatastore_initialization(): ) assert datastore.workspace_name == "test_workspace" - assert datastore.data_store_name == "test_datastore" + assert datastore.name == "test_datastore" assert datastore.connection_parameters == connection_parameters - assert datastore.data_store_type == "PostGIS" + assert datastore.type == "PostGIS" -def test_postgisdatastore_put_payload(): +def test_postgisdatastore_put_payload(pg_payload): connection_parameters = KeyDollarListDict( [{"@key": "host", "$": "localhost"}, {"@key": "port", "$": "5432"}] ) @@ -28,21 +45,7 @@ def test_postgisdatastore_put_payload(): "test_workspace", "test_datastore", connection_parameters ) - expected_payload = { - "dataStore": { - "name": "test_datastore", - "type": "PostGIS", - "enabled": True, - "connectionParameters": { - "entry": [ - {"@key": "host", "$": "localhost"}, - {"@key": "port", "$": "5432"}, - ] - }, - } - } - - assert datastore.put_payload() == expected_payload + assert datastore.put_payload() == pg_payload def test_postgisdatastore_post_payload(): @@ -57,43 +60,30 @@ def test_postgisdatastore_post_payload(): assert datastore.post_payload() == datastore.put_payload() -def test_postgisdatastore_from_dict(): - mock_response = { - "dataStore": { - "name": "test_datastore", - "type": "PostGIS", - "connectionParameters": { - "entry": [ - {"@key": "host", "$": "localhost"}, - {"@key": "port", "$": "5432"}, - ] - }, - } - } +def test_postgisdatastore_from_get_response_payload(pg_payload): - datastore = PostGisDataStore.from_dict(mock_response) + datastore = PostGisDataStore.from_get_response_payload(pg_payload) - assert datastore.data_store_name == "test_datastore" - assert datastore.data_store_type == "PostGIS" + assert datastore.name == "test_datastore" + assert datastore.type == "PostGIS" + assert isinstance(datastore.connection_parameters, KeyDollarListDict) assert datastore.connection_parameters["host"] == "localhost" assert datastore.connection_parameters["port"] == "5432" -def test_postgisdatastore_parse_connection_parameters(): - content = { - "dataStore": { - "connectionParameters": { - "entry": [ - {"@key": "host", "$": "localhost"}, - {"@key": "port", "$": "5432"}, - ] +def test_postgisdatastore_asdict(pg_payload): + datastore = PostGisDataStore.from_get_response_payload(pg_payload) + + assert datastore.asdict() == { + "name": "test_datastore", + "type": "PostGIS", + "enabled": True, + "connectionParameters": { + "entry": { + "host": "localhost", + "port": "5432", } - } + }, + "workspace": "test_workspace", } - - connection_params = PostGisDataStore.parse_connection_parameters(content) - - assert isinstance(connection_params, KeyDollarListDict) - assert connection_params["host"] == "localhost" - assert connection_params["port"] == "5432" diff --git a/tests/models/test_datastores.py b/tests/models/test_datastores.py index 1dcf5a8..7e9ab53 100644 --- a/tests/models/test_datastores.py +++ b/tests/models/test_datastores.py @@ -1,44 +1,49 @@ -from geoservercloud.models import DataStores - +from pytest import fixture -def test_datastores_initialization(): - workspace_name = "test_workspace" - datastores = ["store1", "store2"] +from geoservercloud.models import DataStores - ds = DataStores(workspace_name, datastores) - assert ds.workspace_name == "test_workspace" - assert ds.datastores == datastores +@fixture(scope="module") +def mock_datastore(): + return { + "name": "DataStore1", + "href": "http://example.com/ds1", + } -def test_datastores_from_dict(): - mock_response = { +@fixture(scope="module") +def mock_response(mock_datastore): + return { "dataStores": { - "workspace": {"name": "test_workspace"}, - "dataStore": [{"name": "store1"}, {"name": "store2"}], + "dataStore": [mock_datastore], } } - ds = DataStores.from_dict(mock_response) - assert ds.workspace_name == "test_workspace" - assert ds.datastores == ["store1", "store2"] +def test_datastores_initialization(mock_datastore): + ds = DataStores([mock_datastore]) + assert ds.aslist() == [mock_datastore] -def test_datastores_from_dict_empty(): - mock_response = { - "dataStores": {"workspace": {"name": "empty_workspace"}, "dataStore": []} - } - ds = DataStores.from_dict(mock_response) +def test_datastores_from_get_response_payload(mock_datastore, mock_response): + + ds = DataStores.from_get_response_payload(mock_response) + + assert ds.aslist() == [mock_datastore] + + +def test_datastores_from_get_response_payload_empty(): + mock_response = {"dataStores": ""} + + ds = DataStores.from_get_response_payload(mock_response) - assert ds.workspace_name == "empty_workspace" - assert ds.datastores == [] + assert ds.aslist() == [] -def test_datastores_repr(): - ds = DataStores("test_workspace", ["store1", "store2"]) +def test_datastores_repr(mock_datastore): + ds = DataStores([mock_datastore]) - expected_repr = "['store1', 'store2']" + expected_repr = "[{'name': 'DataStore1', 'href': 'http://example.com/ds1'}]" assert repr(ds) == expected_repr diff --git a/tests/models/test_workspace.py b/tests/models/test_workspace.py index c60da78..a8900d7 100644 --- a/tests/models/test_workspace.py +++ b/tests/models/test_workspace.py @@ -24,19 +24,10 @@ def test_workspace_post_payload(): assert workspace.post_payload() == expected_payload -def test_workspace_from_dict_isolated(): +def test_workspace_from_get_response_payload(): mock_response = {"workspace": {"name": "test_workspace", "isolated": True}} - workspace = Workspace.from_dict(mock_response) + workspace = Workspace.from_get_response_payload(mock_response) assert workspace.name == "test_workspace" assert workspace.isolated is True - - -def test_workspace_from_dict_not_isolated(): - mock_response = {"workspace": {"name": "test_workspace"}} - - workspace = Workspace.from_dict(mock_response) - - assert workspace.name == "test_workspace" - assert workspace.isolated is False diff --git a/tests/models/test_workspaces.py b/tests/models/test_workspaces.py index c85bbb4..ea53694 100644 --- a/tests/models/test_workspaces.py +++ b/tests/models/test_workspaces.py @@ -1,29 +1,39 @@ +from pytest import fixture + from geoservercloud.models import Workspaces -def test_workspaces_initialization(): - initial_workspaces = {"Workspace1": "http://example.com/ws1"} +@fixture(scope="module") +def initial_workspaces(): + return [ + { + "name": "Workspace1", + "href": "http://example.com/ws1", + } + ] + + +def test_workspaces_initialization(initial_workspaces): workspaces = Workspaces(initial_workspaces) - assert workspaces.workspaces == initial_workspaces + assert workspaces.aslist() == initial_workspaces -def test_workspaces_find_existing(): - initial_workspaces = {"Workspace1": "http://example.com/ws1"} +def test_workspaces_find_existing(initial_workspaces): workspaces = Workspaces(initial_workspaces) - assert workspaces.find("Workspace1") == "http://example.com/ws1" + assert workspaces.find("Workspace1") == initial_workspaces[0] -def test_workspaces_find_non_existing(): - workspaces = Workspaces({"Workspace1": "http://example.com/ws1"}) +def test_workspaces_find_non_existing(initial_workspaces): + workspaces = Workspaces(initial_workspaces) assert workspaces.find("NonExistingWorkspace") is None -def test_workspaces_from_dict_empty(): - mock_response = {"workspaces": {}} +def test_workspaces_from_get_response_payload_empty(): + mock_response = {"workspaces": ""} - workspaces = Workspaces.from_dict(mock_response) + workspaces = Workspaces.from_get_response_payload(mock_response) - assert workspaces.workspaces == [] + assert workspaces.aslist() == [] diff --git a/tests/test_datastore.py b/tests/test_datastore.py index 999130d..45e5d69 100644 --- a/tests/test_datastore.py +++ b/tests/test_datastore.py @@ -45,6 +45,7 @@ def pg_payload() -> Generator[dict[str, dict[str, Any]], Any, None]: {"@key": "Expose primary keys", "$": "true"}, ] }, + "workspace": {"name": WORKSPACE}, } } @@ -75,66 +76,110 @@ def jndi_payload() -> Generator[dict[str, dict[str, Any]], Any, None]: {"@key": "Expose primary keys", "$": "true"}, ] }, + "workspace": {"name": WORKSPACE}, } } @pytest.fixture(scope="module") -def datastores_response() -> Generator[dict[str, Any], Any, None]: +def datastore_get_response() -> Generator[dict[str, Any], Any, None]: + yield { + "dataStore": { + "name": STORE, + "description": DESCRIPTION, + "type": "PostGIS (JNDI)", + "enabled": True, + "workspace": { + "name": WORKSPACE, + "href": f"http://localhost:8080/geoserver/rest/workspaces/{WORKSPACE}.json", + }, + "connectionParameters": { + "entry": [ + {"@key": "schema", "$": SCHEMA}, + {"@key": "jndiReferenceName", "$": JNDI}, + {"@key": "Expose primary keys", "$": "true"}, + {"@key": "dbtype", "$": "postgis"}, + {"@key": "namespace", "$": "http://{WORKSPACE}"}, + ] + }, + "_default": False, + "disableOnConnFailure": False, + "featureTypes": f"http://localhost:8080/geoserver/rest/workspaces/{WORKSPACE}/datastores/{STORE}/featuretypes.json", + } + } + + +@pytest.fixture(scope="module") +def datastores_get_response() -> Generator[dict[str, Any], Any, None]: yield { "dataStores": { "dataStore": [ { "name": STORE, - "href": f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", + "href": f"http://localhost:8080/geoserver/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", } - ], + ] } } def test_get_datastores( - geoserver: GeoServerCloud, datastores_response: dict[str, Any] + geoserver: GeoServerCloud, datastores_get_response: dict[str, Any] ) -> None: with responses.RequestsMock() as rsps: rsps.get( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores.json", status=200, - json=datastores_response, + json=datastores_get_response, ) - datastores = geoserver.get_datastores(workspace_name=WORKSPACE) - assert datastores == ["test_store"] + datastores, status_code = geoserver.get_datastores(workspace_name=WORKSPACE) + assert datastores == datastores_get_response["dataStores"]["dataStore"] + assert status_code == 200 -# Test the get_postgis_datastore method with a valid response -def test_get_postgis_datastore_valid( - geoserver: GeoServerCloud, pg_payload: dict[str, dict[str, Any]] +def test_get_pg_datastore_ok( + geoserver: GeoServerCloud, datastore_get_response: dict[str, Any] ) -> None: + expected_datastore = { + "name": STORE, + "description": DESCRIPTION, + "type": "PostGIS (JNDI)", + "enabled": True, + "workspace": WORKSPACE, + "connectionParameters": { + "entry": { + "schema": "test_schema", + "jndiReferenceName": "java:comp/env/jdbc/data", + "Expose primary keys": "true", + "dbtype": "postgis", + "namespace": "http://{WORKSPACE}", + } + }, + "_default": False, + "disableOnConnFailure": False, + } with responses.RequestsMock() as rsps: rsps.get( - url=f"{geoserver.url}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", - json=pg_payload, + url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=200, + json=datastore_get_response, ) - result = geoserver.get_postgis_datastore(WORKSPACE, STORE) - assert json.loads(str(result)) == pg_payload + content, status_code = geoserver.get_pg_datastore(WORKSPACE, STORE) + assert content == expected_datastore + assert status_code == 200 -# Test the get_postgis_datastore method with a 404 error -def test_get_postgis_datastore_not_found(geoserver: GeoServerCloud) -> None: - datastore_name = "non_existing_datastore" +def test_get_pg_datastore_not_found(geoserver: GeoServerCloud) -> None: with responses.RequestsMock() as rsps: rsps.get( - url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{datastore_name}.json", - json={"error": "Datastore not found"}, + url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=404, + body=b"No such datastore: test_workspace,test_datastore", ) - - not_existing_datastore = geoserver.get_postgis_datastore( - WORKSPACE, datastore_name - ) - assert not_existing_datastore is None + content, status_code = geoserver.get_pg_datastore(WORKSPACE, STORE) + assert content == "No such datastore: test_workspace,test_datastore" + assert status_code == 404 def test_create_pg_datastore( @@ -148,11 +193,11 @@ def test_create_pg_datastore( rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores.json", status=201, - json={"workspace": {"name": WORKSPACE}}, + body=b"test_store", match=[matchers.json_params_matcher(pg_payload)], ) - response = geoserver.create_pg_datastore( + content, code = geoserver.create_pg_datastore( workspace_name=WORKSPACE, datastore_name=STORE, pg_host=HOST, @@ -163,8 +208,8 @@ def test_create_pg_datastore( pg_schema=SCHEMA, ) - assert response - assert response.status_code == 201 + assert content == STORE + assert code == 201 def test_update_pg_datastore( @@ -178,10 +223,11 @@ def test_update_pg_datastore( rsps.put( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=200, + body=b"", match=[matchers.json_params_matcher(pg_payload)], ) - response = geoserver.create_pg_datastore( + content, code = geoserver.create_pg_datastore( workspace_name=WORKSPACE, datastore_name=STORE, pg_host=HOST, @@ -192,8 +238,8 @@ def test_update_pg_datastore( pg_schema=SCHEMA, ) - assert response - assert response.status_code == 200 + assert content == "" + assert code == 200 def test_create_jndi_datastore( @@ -207,10 +253,11 @@ def test_create_jndi_datastore( rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores.json", status=201, + body=b"test_store", match=[matchers.json_params_matcher(jndi_payload)], ) - response = geoserver.create_jndi_datastore( + content, code = geoserver.create_jndi_datastore( workspace_name=WORKSPACE, datastore_name=STORE, jndi_reference=JNDI, @@ -218,8 +265,8 @@ def test_create_jndi_datastore( description=DESCRIPTION, ) - assert response - assert response.status_code == 201 + assert content == STORE + assert code == 201 def test_update_jndi_datastore( @@ -233,10 +280,11 @@ def test_update_jndi_datastore( rsps.put( url=f"{GEOSERVER_URL}/rest/workspaces/{WORKSPACE}/datastores/{STORE}.json", status=200, + body=b"", match=[matchers.json_params_matcher(jndi_payload)], ) - response = geoserver.create_jndi_datastore( + content, code = geoserver.create_jndi_datastore( workspace_name=WORKSPACE, datastore_name=STORE, jndi_reference=JNDI, @@ -244,5 +292,5 @@ def test_update_jndi_datastore( description=DESCRIPTION, ) - assert response - assert response.status_code == 200 + assert content == "" + assert code == 200 diff --git a/tests/test_wms.py b/tests/test_wms.py index f1aadbc..1de7dee 100644 --- a/tests/test_wms.py +++ b/tests/test_wms.py @@ -215,5 +215,4 @@ def test_set_default_locale_for_service(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.set_default_locale_for_service(WORKSPACE, "en") - assert response.status_code == 200 + assert geoserver.set_default_locale_for_service(WORKSPACE, "en") == None diff --git a/tests/test_workspace.py b/tests/test_workspace.py index a2d28bf..87fb7be 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -6,38 +6,61 @@ def test_list_workspaces(geoserver: GeoServerCloud) -> None: + workspaces = [ + { + "name": "test_workspace", + "href": "http://localhost:8080/geoserver/rest/workspaces/test_workspace.json", + } + ] with responses.RequestsMock() as rsps: rsps.get( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=200, - json={ - "workspaces": { - "workspace": [ - { - "name": "test_workspace", - "href": "http://localhost:8080/geoserver/rest/workspaces/test_workspace.json", - } - ] - } - }, + json={"workspaces": {"workspace": workspaces}}, + ) + assert geoserver.get_workspaces() == (workspaces, 200) + + +def test_get_workspace_ok(geoserver: GeoServerCloud) -> None: + workspace_name = "test_workspace" + workspace = {"name": workspace_name, "isolated": False} + with responses.RequestsMock() as rsps: + rsps.get( + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", + status=200, + json={"workspace": workspace}, + ) + assert geoserver.get_workspace(workspace_name) == (workspace, 200) + + +def test_get_workspace_not_found(geoserver: GeoServerCloud) -> None: + workspace_name = "test_workspace" + with responses.RequestsMock() as rsps: + rsps.get( + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", + status=404, + body=b"No such workspace: 'test_workspace' found", + ) + assert geoserver.get_workspace(workspace_name) == ( + "No such workspace: 'test_workspace' found", + 404, ) - workspaces = geoserver.get_workspaces() - assert workspaces.workspaces == ["test_workspace"] def test_create_workspace(geoserver: GeoServerCloud) -> None: - workspace = "test_workspace" + workspace_name = "test_workspace" isolated = True with responses.RequestsMock() as rsps: rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=201, + body=b"test_workspace", match=[ matchers.json_params_matcher( { "workspace": { - "name": workspace, + "name": workspace_name, "isolated": isolated, } } @@ -45,27 +68,50 @@ def test_create_workspace(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.create_workspace(workspace, isolated=isolated) + content, status_code = geoserver.create_workspace(workspace_name, isolated) - assert response.status_code == 201 + assert content == workspace_name + assert status_code == 201 def test_update_workspace(geoserver: GeoServerCloud) -> None: - workspace = "test_workspace" + workspace_name = "test_workspace" with responses.RequestsMock() as rsps: rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=409, + match=[ + matchers.json_params_matcher( + { + "workspace": { + "name": workspace_name, + "isolated": False, + } + } + ) + ], ) rsps.put( - url=f"{GEOSERVER_URL}/rest/workspaces/{workspace}.json", + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", + match=[ + matchers.json_params_matcher( + { + "workspace": { + "name": workspace_name, + "isolated": False, + } + } + ) + ], status=200, + body=b"", ) - response = geoserver.create_workspace(workspace) + content, status_code = geoserver.create_workspace(workspace_name) - assert response.status_code == 200 + assert content == "" + assert status_code == 200 def test_delete_workspace(geoserver: GeoServerCloud) -> None: @@ -75,29 +121,42 @@ def test_delete_workspace(geoserver: GeoServerCloud) -> None: rsps.delete( url=f"{GEOSERVER_URL}/rest/workspaces/{workspace}.json", status=200, + body=b"", ) - response = geoserver.delete_workspace(workspace) - - assert response.status_code == 200 + content, status_code = geoserver.delete_workspace(workspace) + assert content == "" + assert status_code == 200 def test_recreate_workspace(geoserver: GeoServerCloud) -> None: - workspace = "test_workspace" + workspace_name = "test_workspace" with responses.RequestsMock() as rsps: rsps.delete( - url=f"{GEOSERVER_URL}/rest/workspaces/{workspace}.json", + url=f"{GEOSERVER_URL}/rest/workspaces/{workspace_name}.json", status=200, + body=b"", ) rsps.post( url=f"{GEOSERVER_URL}/rest/workspaces.json", status=201, + body=b"test_workspace", + match=[ + matchers.json_params_matcher( + { + "workspace": { + "name": workspace_name, + "isolated": False, + } + } + ) + ], ) - response = geoserver.recreate_workspace(workspace) - - assert response.status_code == 201 + content, status_code = geoserver.recreate_workspace(workspace_name) + assert content == workspace_name + assert status_code == 201 def test_publish_workspace(geoserver: GeoServerCloud) -> None: @@ -155,6 +214,6 @@ def test_publish_workspace(geoserver: GeoServerCloud) -> None: ], ) - response = geoserver.publish_workspace(workspace) - - assert response.status_code == 200 + content, status_code = geoserver.publish_workspace(workspace) + assert content == "" + assert status_code == 200