From 469c4e2945bc9870d63a2ed5b9e0c886028d063f Mon Sep 17 00:00:00 2001
From: pkdash
Date: Wed, 29 Mar 2023 18:34:59 -0400
Subject: [PATCH] [#44] preventing aggregation delete as part of aggregation update via data object

---
 hsclient/hydroshare.py | 99 ++++++++++++++++++------------------
 1 file changed, 43 insertions(+), 56 deletions(-)

diff --git a/hsclient/hydroshare.py b/hsclient/hydroshare.py
index 40b4849..edfe9ad 100644
--- a/hsclient/hydroshare.py
+++ b/hsclient/hydroshare.py
@@ -15,6 +15,7 @@
 from pprint import pformat
 from typing import Callable, Dict, List, TYPE_CHECKING, Union
 from urllib.parse import quote, unquote, urlparse
+from uuid import uuid4
 from zipfile import ZipFile
 
 if TYPE_CHECKING:
@@ -179,9 +180,6 @@ def _files(self):
 
     @property
     def _aggregations(self):
-        def populate_files(_aggr):
-            _aggr._files
-
         def populate_metadata(_aggr):
             _aggr._metadata
 
@@ -191,9 +189,8 @@ def populate_metadata(_aggr):
             if is_aggregation(str(file)):
                 self._parsed_aggregations.append(Aggregation(unquote(file.path), self._hs_session, self._checksums))
 
-        # load files (instances of File) and metadata for all aggregations
+        # load metadata for all aggregations (metadata is needed to create a typed aggregation)
         with ThreadPoolExecutor() as executor:
-            executor.map(populate_files, self._parsed_aggregations)
             executor.map(populate_metadata, self._parsed_aggregations)
 
         # convert aggregations to aggregation type supporting data object
@@ -204,11 +201,10 @@ def populate_metadata(_aggr):
             AggregationType.GeographicFeatureAggregation: GeoFeatureAggregation,
         }
         for aggr in aggregations_copy:
-            typed_aggr = None
             typed_aggr_cls = typed_aggregation_classes.get(aggr.metadata.type, None)
             if typed_aggr_cls:
                 typed_aggr = typed_aggr_cls.create(base_aggr=aggr)
-            if typed_aggr:
+                # swapping the generic aggregation with the typed aggregation in the aggregation list
                 self._parsed_aggregations.remove(aggr)
                 self._parsed_aggregations.append(typed_aggr)
 
@@ -476,6 +472,24 @@ def _validate_aggregation_for_update(self, resource: 'Resource', agg_type: Aggre
         if aggr is None:
             raise Exception("This aggregation is not part of the specified resource.")
 
+    def _update_aggregation(self, resource, *files):
+        temp_folder = uuid4().hex
+        resource.folder_create(temp_folder)
+        resource.file_upload(*files, destination_path=temp_folder)
+        # check aggregation got created in the temp folder
+        file_path = os.path.join(temp_folder, os.path.basename(self.main_file_path))
+        original_aggr_dir_path = dirname(self.main_file_path)
+        aggr = resource.aggregation(file__path=file_path)
+        if aggr is not None:
+            # delete this aggregation which will be replaced with the updated aggregation
+            self.delete()
+            # move the aggregation from the temp folder to the location of the deleted aggregation
+            resource.aggregation_move(aggr, dst_path=original_aggr_dir_path)
+
+        resource.folder_delete(temp_folder)
+        if aggr is None:
+            raise Exception("Failed to update aggregation")
+
 
 class NetCDFAggregation(DataObjectSupportingAggregation):
 
@@ -484,11 +498,8 @@ def create(cls, base_aggr):
         return super().create(aggr_cls=cls, base_aggr=base_aggr)
 
     def as_data_object(self, agg_path: str) -> 'xarray.Dataset':
-        if self.metadata.type != AggregationType.MultidimensionalAggregation:
-            raise Exception("Aggregation is not of type NetCDF")
         if xarray is None:
             raise Exception("xarray package was not found")
-
         return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)
 
     def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
@@ -498,18 +509,14 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
         file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
         self._data_object.to_netcdf(file_path, format="NETCDF4")
         aggr_main_file_path = self.main_file_path
-        data_object = self._data_object
         if not as_new_aggr:
-            destination_path = dirname(self.main_file_path)
-
             # cache some of the metadata fields of the original aggregation to update the metadata of the
             # updated aggregation
            keywords = self.metadata.subjects
             additional_meta = self.metadata.additional_metadata
-            # TODO: keep a local backup copy of the aggregation before deleting it
-            self.delete()
-            resource.file_upload(file_path, destination_path=destination_path)
+            # upload the updated aggregation files
+            self._update_aggregation(resource, file_path)
 
             # retrieve the updated aggregation
             aggr = resource.aggregation(file__path=aggr_main_file_path)
 
@@ -521,15 +528,14 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
             aggr.metadata.additional_metadata = additional_meta
             aggr.save()
         else:
-            # creating a new aggregation by uploading the updated data files
+            # creating a new aggregation
             resource.file_upload(file_path, destination_path=destination_path)
 
             # retrieve the new aggregation
             agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
             aggr = resource.aggregation(file__path=agg_path)
 
-        data_object = None
-        aggr._data_object = data_object
+        aggr._data_object = None
         return aggr
 
 
@@ -571,8 +577,6 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
         aggr_main_file_path = self.main_file_path
         data_object = self._data_object
         if not as_new_aggr:
-            destination_path = dirname(self.main_file_path)
-
             # cache some of the metadata fields of the original aggregation to update the metadata of the
             # updated aggregation
             keywords = self.metadata.subjects
@@ -580,11 +584,8 @@ def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: boo
             title = self.metadata.title
             abstract = self.metadata.abstract
-            # TODO: If the creation of the replacement aggregation fails for some reason, then with the following
-            #  delete action we will lose this aggregation from HydroShare. Need to keep a copy of the
-            #  original aggregation locally so that we can upload that to HydroShare if needed.
-            self.delete()
-            resource.file_upload(file_path, destination_path=destination_path)
+            # upload the updated aggregation files to the temp folder - to create the updated aggregation
+            self._update_aggregation(resource, file_path)
 
             # retrieve the updated aggregation
             aggr = resource.aggregation(file__path=aggr_main_file_path)
 
@@ -643,32 +644,30 @@ def upload_shape_files(main_file_path, dst_path=""):
                 if item.startswith(filename_starts_with):
                     file_full_path = os.path.join(shp_file_dir_path, item)
                     shape_files.append(file_full_path)
-            resource.file_upload(*shape_files, destination_path=dst_path)
+
+            if not dst_path:
+                self._update_aggregation(resource, *shape_files)
+            else:
+                resource.file_upload(*shape_files, destination_path=dst_path)
 
         self._validate_aggregation_for_update(resource, AggregationType.GeographicFeatureAggregation)
         file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
         aggr_main_file_path = self.main_file_path
         data_object = self._data_object
+        # need to close the fiona.Collection object to free up access to all the original shape files
+        data_object.close()
         if not as_new_aggr:
-            destination_path = dirname(self.main_file_path)
-
             # cache some of the metadata fields of the original aggregation to update the metadata of the
             # updated aggregation
             keywords = self.metadata.subjects
             additional_meta = self.metadata.additional_metadata
-            # TODO: keep a local backup copy of the aggregation before deleting it
-            self.delete()
 
             # copy the updated shape files to the original shape file location where the user downloaded the
             # aggregation previously
             src_shp_file_dir_path = os.path.dirname(file_path)
             tgt_shp_file_dir_path = os.path.dirname(data_object.path)
-            agg_path = tgt_shp_file_dir_path
             filename_starts_with = f"{pathlib.Path(file_path).stem}."
 
-            # need to close the fiona.Collection object to free up access to all the original shape files
-            data_object.close()
-
             for item in os.listdir(src_shp_file_dir_path):
                 if item.startswith(filename_starts_with):
                     src_file_full_path = os.path.join(src_shp_file_dir_path, item)
@@ -676,7 +675,7 @@ def upload_shape_files(main_file_path, dst_path=""):
                     shutil.copyfile(src_file_full_path, tgt_file_full_path)
 
             # upload the updated shape files to replace this aggregation
-            upload_shape_files(main_file_path=data_object.path, dst_path=destination_path)
+            upload_shape_files(main_file_path=data_object.path)
 
             # retrieve the updated aggregation
             aggr = resource.aggregation(file__path=aggr_main_file_path)
@@ -687,23 +686,15 @@ def upload_shape_files(main_file_path, dst_path=""):
                     aggr.metadata.subjects.append(kw)
             aggr.metadata.additional_metadata = additional_meta
             aggr.save()
-
-            # load aggregation data to fiona Collection object
-            data_object = aggr.as_data_object(agg_path=agg_path)
         else:
-            # creating a new aggregation
-            # close the original fiona Collection object
-            data_object.close()
-
             # upload the updated shape files to create a new geo feature aggregation
             upload_shape_files(main_file_path=file_path, dst_path=destination_path)
 
             # retrieve the new aggregation
             agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
             aggr = resource.aggregation(file__path=agg_path)
 
-        data_object = None
-        aggr._data_object = data_object
+        aggr._data_object = None
         return aggr
 
 
@@ -753,7 +744,6 @@ def _validate_aggregation_path(self, agg_path: str, for_save_data: bool = False)
     def as_data_object(self, agg_path: str) -> 'rasterio.DatasetReader':
         if rasterio is None:
             raise Exception("rasterio package was not found")
-
         return self._get_data_object(agg_path=agg_path, func=rasterio.open)
 
     def save_data_object(self, resource: 'Resource', agg_path: str, as_new_aggr: bool = False,
@@ -764,7 +754,11 @@ def upload_raster_files(dst_path=""):
                 item_full_path = os.path.join(agg_path, item)
                 if os.path.isfile(item_full_path):
                     raster_files.append(item_full_path)
-            resource.file_upload(*raster_files, destination_path=dst_path)
+
+            if not dst_path:
+                self._update_aggregation(resource, *raster_files)
+            else:
+                resource.file_upload(*raster_files, destination_path=dst_path)
 
         def get_main_file_path():
             main_file_name = os.path.basename(file_path)
@@ -778,8 +772,6 @@ def get_main_file_path():
 
         self._validate_aggregation_for_update(resource, AggregationType.GeographicRasterAggregation)
         file_path = self._validate_aggregation_path(agg_path, for_save_data=True)
-        # aggr_main_file_path = self.main_file_path
-        # data_object = self._data_object
         if not as_new_aggr:
             destination_path = dirname(self.main_file_path)
 
@@ -787,14 +779,10 @@ def get_main_file_path():
             # updated aggregation
             keywords = self.metadata.subjects
             additional_meta = self.metadata.additional_metadata
-
-            # TODO: keep a local backup copy of the aggregation before deleting it
-            self.delete()
             upload_raster_files(dst_path=destination_path)
 
-            # retrieve the updated aggregation
-            # compute the main file name
             aggr_main_file_path = get_main_file_path()
+            # retrieve the updated aggregation
             aggr = resource.aggregation(file__path=aggr_main_file_path)
 
             # update metadata
@@ -812,8 +800,7 @@ def get_main_file_path():
             agg_path = urljoin(destination_path, os.path.basename(aggr_main_file_path))
             aggr = resource.aggregation(file__path=agg_path)
 
-        data_object = None
-        aggr._data_object = data_object
+        aggr._data_object = None
         return aggr
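A minimal usage sketch of the in-place update flow this patch hardens, assuming a resource that already contains a NetCDF (multidimensional) aggregation which has been downloaded and unzipped locally; the credentials, resource id, file name, local path, and edit below are illustrative only and are not part of the patch.

    from hsclient import HydroShare

    # illustrative credentials and resource id
    hs = HydroShare(username="my_user", password="my_password")
    res = hs.resource("84805fd615a04e63ac00d2cdd51cbafc")

    # locate the NetCDF aggregation by its main file (illustrative file name)
    aggr = res.aggregation(file__path="sample.nc")

    # illustrative local folder where the aggregation was previously downloaded and unzipped
    agg_path = "Downloads/sample"

    # load the aggregation as an xarray.Dataset and edit it locally
    ds = aggr.as_data_object(agg_path=agg_path)
    ds.attrs["title"] = "updated title"

    # write the edits back; with this patch the original aggregation is deleted only
    # after the updated files have been uploaded to a temporary folder and the
    # replacement aggregation has been found there (see _update_aggregation above)
    aggr = aggr.save_data_object(resource=res, agg_path=agg_path, as_new_aggr=False)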
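For comparison, the as_new_aggr branch is left on the direct-upload path by this patch: the edited files go straight to destination_path and a new aggregation is created there, while the original aggregation is left untouched. Replacing the final call in the sketch above with something like the following (the folder name is illustrative) would exercise that branch:

    # create a folder to receive the copy, then save the edited data object there
    # as a brand new aggregation instead of replacing the original one
    res.folder_create("updated_netcdf")
    new_aggr = aggr.save_data_object(
        resource=res, agg_path=agg_path, as_new_aggr=True, destination_path="updated_netcdf"
    )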