diff --git a/ckanext/datavic_harvester/harvesters/base.py b/ckanext/datavic_harvester/harvesters/base.py
index 9b15cc4..db01fa3 100644
--- a/ckanext/datavic_harvester/harvesters/base.py
+++ b/ckanext/datavic_harvester/harvesters/base.py
@@ -2,8 +2,10 @@
 import logging
 from typing import Optional, Any
+from urllib.parse import urlparse
 
 import requests
+import time
 
 from ckan import model
 from ckan.plugins import toolkit as tk
 
@@ -15,6 +17,15 @@
 log = logging.getLogger(__name__)
 
 
+MAX_CONTENT_LENGTH = int(
+    tk.config.get("ckanext.datavic_harvester.max_content_length") or 104857600
+)
+CHUNK_SIZE = 16 * 1024
+DOWNLOAD_TIMEOUT = 30
+CONFIG_FSC_EXCLUDED_DOMAINS = tk.aslist(
+    tk.config.get("ckanext.datavic_harvester.filesize_excluded_domains", "")
+)
+
 
 class DataVicBaseHarvester(HarvesterBase):
     def __init__(self, **kwargs):
@@ -128,8 +139,11 @@ def fetch_stage(self, harvest_object: HarvestObject) -> bool:
         return True
 
     def _delete_package(self, package_id: str, guid: str):
-        tk.get_action("package_delete")(self._make_context(), {"id": package_id})
-        log.info(f"Deleted package {package_id} with guid {guid}")
+        try:
+            tk.get_action("package_delete")(self._make_context(), {"id": package_id})
+            log.info(f"Deleted package {package_id} with guid {guid}")
+        except tk.ObjectNotFound:
+            log.error(f"Package {package_id} not found")
 
     def _make_context(self) -> dict[str, Any]:
         return {
@@ -139,3 +153,110 @@ def _make_context(self) -> dict[str, Any]:
             "model": model,
             "session": model.Session,
         }
+
+
+class DataTooBigWarning(Exception):
+    pass
+
+
+def get_resource_size(resource_url: str) -> int:
+    """Return external resource size in bytes
+
+    Args:
+        resource_url (str): a URL for the resource's source
+
+    Returns:
+        int: resource size in bytes
+    """
+
+    length = 0
+    cl = None
+
+    if not resource_url or MAX_CONTENT_LENGTH < 0:
+        return length
+
+    hostname = urlparse(resource_url).hostname
+    if hostname in CONFIG_FSC_EXCLUDED_DOMAINS:
+        return length
+
+    try:
+        headers = {}
+
+        response = _get_response(resource_url, headers)
+        ct = response.headers.get("content-type")
+        cl = response.headers.get("content-length")
+        cl_enabled = tk.asbool(tk.config.get(
+            "ckanext.datavic_harvester.content_length_enabled", False)
+        )
+
+        if ct and "text/html" in ct:
+            message = (
+                f"Resource from url <{resource_url}> is of HTML type. "
+                "Skip its size calculation."
+            )
+            log.warning(message)
+            return length
+
+        if cl:
+            if int(cl) > MAX_CONTENT_LENGTH and MAX_CONTENT_LENGTH > 0:
+                response.close()
+                raise DataTooBigWarning()
+
+            if cl_enabled:
+                response.close()
+                log.info(
+                    f"Resource from url <{resource_url}> content-length is {int(cl)} bytes."
+                )
+                return int(cl)
+
+        for chunk in response.iter_content(CHUNK_SIZE):
+            length += len(chunk)
+            if length > MAX_CONTENT_LENGTH:
+                response.close()
+                raise DataTooBigWarning()
+
+        response.close()
+
+    except DataTooBigWarning:
+        message = (
+            f"Resource from url <{resource_url}> is more than the set limit "
+            f"{MAX_CONTENT_LENGTH} bytes. Skip its size calculation."
+        )
+        log.warning(message)
+        length = -1  # for the purpose of search possibility in the db
+        return length
+
+    except requests.exceptions.HTTPError as error:
+        log.debug(f"HTTP error: {error}")
+
+    except requests.exceptions.Timeout:
+        log.warning(f"URL time out after {DOWNLOAD_TIMEOUT}s")
+
+    except requests.exceptions.RequestException as error:
+        log.warning(f"URL error: {error}")
+
+    log.info(f"Resource from url <{resource_url}> length is {length} bytes.")
+
+    return length
+
+
+def _get_response(url, headers):
+    def get_url():
+        kwargs = {"headers": headers, "timeout": DOWNLOAD_TIMEOUT, "stream": True}
+
+        if "ckan.download_proxy" in tk.config:
+            proxy = tk.config.get("ckan.download_proxy")
+            kwargs["proxies"] = {"http": proxy, "https": proxy}
+
+        return requests.get(url, **kwargs)
+
+    response = get_url()
+    if response.status_code == 202:
+        wait = 1
+        while wait < 120 and response.status_code == 202:
+            time.sleep(wait)
+            response = get_url()
+            wait *= 3
+    response.raise_for_status()
+
+    return response
diff --git a/ckanext/datavic_harvester/harvesters/dcat_json.py b/ckanext/datavic_harvester/harvesters/dcat_json.py
index bb0c83f..c2e1298 100644
--- a/ckanext/datavic_harvester/harvesters/dcat_json.py
+++ b/ckanext/datavic_harvester/harvesters/dcat_json.py
@@ -14,7 +14,7 @@ from ckanext.harvest.model import HarvestObject
 
 from ckanext.datavic_harvester import helpers
-from ckanext.datavic_harvester.harvesters.base import DataVicBaseHarvester
+from ckanext.datavic_harvester.harvesters.base import DataVicBaseHarvester, get_resource_size
 
 
 log = logging.getLogger(__name__)
@@ -34,6 +34,25 @@ def gather_stage(self, harvest_job):
 
     def import_stage(self, harvest_object):
         self._set_config(harvest_object.source.config)
+
+        package_dict, dcat_dict = self._get_package_dict(harvest_object)
+        dcat_modified = dcat_dict.get("modified")
+        existing_dataset = self._get_existing_dataset(harvest_object.guid)
+
+        if dcat_modified and existing_dataset:
+            dcat_modified = helpers.convert_date_to_isoformat(
+                dcat_modified, "modified", dcat_dict["title"]
+            ).lower().split("t")[0]
+
+            pkg_modified = existing_dataset['date_modified_data_asset']
+
+            if pkg_modified and pkg_modified == dcat_modified:
+                log.info(
+                    f"Dataset with id {existing_dataset['id']} wasn't modified "
+                    "from the last harvest. Skipping this dataset..."
+                )
+                return False
+
         return super().import_stage(harvest_object)
 
     def _get_package_dict(
@@ -43,7 +62,7 @@ def _get_package_dict(
         conversions of the data"""
 
         dcat_dict: dict[str, Any] = json.loads(harvest_object.content)
-        pkg_dict = converters.dcat_to_ckan(dcat_dict)
+        pkg_dict = converters.dcat_to_ckan(dcat_dict)
 
         soup: BeautifulSoup = BeautifulSoup(pkg_dict["notes"], "html.parser")
 
@@ -184,11 +203,17 @@ def _set_required_fields_defaults(
         if not self._get_extra(pkg_dict, "protective_marking"):
             pkg_dict["protective_marking"] = "official"
 
-        if not self._get_extra(pkg_dict, "organization_visibility"):
-            pkg_dict["organization_visibility"] = "current"
+        if not self._get_extra(pkg_dict, "organization_visibility") \
+                and "default_visibility" in self.config:
+            pkg_dict["organization_visibility"] = self.config["default_visibility"][
+                "organization_visibility"
+            ]
+        else:
+            pkg_dict["organization_visibility"] = self._get_extra(
+                pkg_dict, "organization_visibility"
+            ) or "current"
 
-        if not self._get_extra(pkg_dict, "workflow_status"):
-            pkg_dict["workflow_status"] = "draft"
+        pkg_dict["workflow_status"] = "published"
 
         issued: Optional[str] = dcat_dict.get("issued")
         if issued and not self._get_extra(pkg_dict, "date_created_data_asset"):
@@ -212,6 +237,8 @@ def _set_required_fields_defaults(
 
         pkg_dict["tag_string"] = dcat_dict.get("keyword", [])
 
+        pkg_dict.setdefault("update_frequency", "unknown")
+
     def _get_existing_dataset(self, guid: str) -> Optional[dict[str, Any]]:
         """Return a package with specific guid extra if exists"""
 
@@ -245,3 +272,14 @@ def _get_mocked_full_metadata(self):
         here: str = path.abspath(path.dirname(__file__))
         with open(path.join(here, "../data/dcat_json_full_metadata.txt")) as f:
             return f.read()
+
+    def modify_package_dict(self, package_dict, dcat_dict, harvest_object):
+        """
+        Allows custom harvesters to modify the package dict before
+        creating or updating the actual package.
+        """
+        resources = package_dict["resources"]
+        for resource in resources:
+            resource["size"] = get_resource_size(resource["url"])
+            resource["filesize"] = resource["size"]
+        return package_dict
diff --git a/ckanext/datavic_harvester/harvesters/delwp.py b/ckanext/datavic_harvester/harvesters/delwp.py
index 5918568..8af4131 100644
--- a/ckanext/datavic_harvester/harvesters/delwp.py
+++ b/ckanext/datavic_harvester/harvesters/delwp.py
@@ -17,7 +17,7 @@ from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
 
 import ckanext.datavic_harvester.helpers as helpers
-from ckanext.datavic_harvester.harvesters.base import DataVicBaseHarvester
+from ckanext.datavic_harvester.harvesters.base import DataVicBaseHarvester, get_resource_size
 
 
 log = logging.getLogger(__name__)
@@ -183,7 +183,8 @@ def gather_stage(self, harvest_job):
     def _get_guids_to_package_ids(self, source_id: str) -> dict[str, str]:
         query = (
             model.Session.query(HarvestObject.guid, HarvestObject.package_id)
-            .filter(HarvestObject.current == True)
+            # .filter(HarvestObject.current == True)  # NOTE: 'current' filter
+            # disabled — with it the query returned duplicate rows per guid.
            .filter(HarvestObject.harvest_source_id == source_id)
         )
 
@@ -274,6 +275,19 @@ def import_stage(self, harvest_object: HarvestObject) -> bool | str:
 
         pkg_dict = self._get_pkg_dict(harvest_object)
 
+        if not pkg_dict.get("notes") or not pkg_dict.get("owner_org"):
+            log.info(
+                f"Description or organization field is missing for object {harvest_object.id}, skipping..."
+            )
+            return False
+
+        # Remove restricted Datasets
+        if pkg_dict["private"]:
+            log.info(
+                f"Dataset is Restricted for object {harvest_object.id}, skipping..."
+            )
+            return False
+
         if status not in ["new", "change"]:
             return True
 
@@ -350,6 +364,14 @@ def _get_pkg_dict(self, harvest_object):
             else ""
         )
 
+        access_notes = """
+        Aerial imagery and elevation datasets\n
+        You can access high-resolution aerial imagery and elevation (LiDAR point cloud) datasets by contacting a business that holds a commercial license.\n
+        We have two types of commercial licensing:\n
+        Data Service Providers (DSPs) provide access to the source imagery or elevation data.\n
+        Value Added Retailers (VARs) use the imagery and elevation data to create new products and services. This includes advisory services and new knowledge products.
+        """
+
         pkg_dict = {}
 
         pkg_dict["personal_information"] = "no"
@@ -357,8 +379,6 @@ def _get_pkg_dict(self, harvest_object):
         pkg_dict["access"] = "yes"
         pkg_dict["organization_visibility"] = "all"
         pkg_dict["workflow_status"] = "published"
-        pkg_dict["license_id"] = self.config.get("license_id", "cc-by")
-        pkg_dict["private"] = self._is_pkg_private(metashare_dict)
         pkg_dict["title"] = metashare_dict.get("title")
         pkg_dict["notes"] = metashare_dict.get("abstract", "")
         pkg_dict["tags"] = helpers.get_tags(remote_topiccat) if remote_topiccat else []
@@ -373,9 +393,6 @@ def _get_pkg_dict(self, harvest_object):
         if not pkg_dict.get("name"):
             pkg_dict["name"] = self._get_package_name(harvest_object, pkg_dict["title"])
 
-        if full_metadata_url:
-            pkg_dict["full_metadata_url"] = full_metadata_url
-
         if uuid:
             pkg_dict["primary_purpose_of_collection"] = uuid
 
@@ -409,6 +426,31 @@ def _get_pkg_dict(self, harvest_object):
 
         pkg_dict["resources"] = self._fetch_resources(metashare_dict)
 
+        pkg_dict["private"] = self._is_pkg_private(
+            metashare_dict,
+            pkg_dict["resources"]
+        )
+
+        pkg_dict["license_id"] = self.config.get("license_id", "cc-by")
+
+        if pkg_dict["private"]:
+            pkg_dict["license_id"] = "other-closed"
+
+        if self._is_delwp_raster_data(pkg_dict["resources"]):
+            pkg_dict["full_metadata_url"] = f"https://metashare.maps.vic.gov.au/geonetwork/srv/api/records/{uuid}/formatters/cip-pdf?root=export&output=pdf"
+            pkg_dict["access_description"] = access_notes
+        elif full_metadata_url:
+            pkg_dict["full_metadata_url"] = full_metadata_url
+
+        for key, value in [
+            ("harvest_source_id", harvest_object.source.id),
+            ("harvest_source_title", harvest_object.source.title),
+            ("harvest_source_type", harvest_object.source.type),
+            ("delwp_restricted", pkg_dict["private"])
+        ]:
+            pkg_dict.setdefault("extras", [])
+            pkg_dict["extras"].append({"key": key, "value": value})
+
         return pkg_dict
 
     def _create_custom_package_create_schema(self) -> dict[str, Any]:
@@ -419,13 +461,50 @@ def _create_custom_package_create_schema(self) -> dict[str, Any]:
 
         return package_schema
 
-    def _is_pkg_private(self, remote_dict: dict[str, Any]) -> bool:
-        """Check if the dataset should be private by `resclassification` field
-        value"""
-        return remote_dict.get("resclassification") in (
-            "limitedDistribution",
-            "restricted",
-        )
+    def _is_delwp_vector_data(self, resources: list[dict[str, Any]]) -> bool:
+        for res in resources:
+            if res.get("format", "").lower() in [
+                "dwg",
+                "dxf",
+                "gdb",
+                "shp",
+                "mif",
+                "tab",
+                "extended tab",
+                "mapinfo",
+            ]:
+                return True
+
+        return False
+
+    def _is_delwp_raster_data(self, resources: list[dict[str, Any]]) -> bool:
+        for res in resources:
+            if res.get("format", "").lower() in [
+                "ecw",
+                "geotiff",
+                "jpeg",
+                "jp2",
+                "jpeg 2000",
+                "tiff",
+                "lass",
+                "xyz",
+            ]:
+                return True
+
+        return False
+
+    def _is_pkg_private(
+        self,
+        remote_dict: dict[str, Any],
+        resources: list[dict[str, Any]]
+    ) -> bool:
+        """Check if the dataset should be private"""
+        if (self._is_delwp_vector_data(resources) and
+                remote_dict.get("mdclassification") == "unclassified" and
+                remote_dict.get("resclassification") == "unclassified"):
+            return False
+
+        return True
 
     def _get_organisation(
         self,
@@ -571,6 +650,9 @@ def _get_resources_by_formats(
 
             res["name"] = f"{res['name']} {res_format}".replace("_", "")
 
+            res["size"] = get_resource_size(res_url)
+            res["filesize"] = res["size"]
+
             if attribution:
                 res["attribution"] = attribution
 
diff --git a/ckanext/datavic_harvester/harvesters/ods.py b/ckanext/datavic_harvester/harvesters/ods.py
index 21c65bc..cb074aa 100644
--- a/ckanext/datavic_harvester/harvesters/ods.py
+++ b/ckanext/datavic_harvester/harvesters/ods.py
@@ -15,6 +15,7 @@
 import ckan.plugins.toolkit as tk
 
 from ckanext.harvest_basket.harvesters import ODSHarvester
+from .base import get_resource_size
 
 
 class DataVicODSHarvester(ODSHarvester):
@@ -38,4 +39,8 @@ def _fetch_resources(self, source_url, resource_urls, pkg_data):
         for res in resources:
             if res["format"] == "CSV":
                 res["url"] = f'{res["url"]}?delimiter=%2C'
+
+            res["size"] = get_resource_size(res["url"])
+            res["filesize"] = res["size"]
+
         return resources