[#44] data processing objects to work only with downloaded aggregations
pkdash committed Mar 3, 2023
1 parent 61d9ba3 commit 7315cad
Showing 1 changed file with 38 additions and 73 deletions.
111 changes: 38 additions & 73 deletions hsclient/hydroshare.py
@@ -2,7 +2,6 @@
 import os
 import pathlib
 import pickle
-import shutil
 import sqlite3
 import tempfile
 import time
@@ -232,63 +231,37 @@ def _download(self, save_path: str = "", unzip_to: str = None) -> str:
             return unzip_to
         return downloaded_zip

+    def _validate_aggregation_path(self, agg_path: str):
+        main_file_ext = pathlib.Path(self.main_file_path).suffix
+        file_name = self.file(extension=main_file_ext).name
+        file_path = urljoin(agg_path, file_name)
+        if not os.path.exists(file_path) or not os.path.isfile(file_path):
+            file_path = urljoin(file_path, file_name)
+            if not os.path.exists(file_path):
+                raise Exception(f"Aggregation was not found at: {agg_path}")
+        return file_path

     def _get_data_object(self, agg_path, func):
         if self._data_object is not None and self.metadata.type != AggregationType.TimeSeriesAggregation:
             return self._data_object

-        main_file_ext = pathlib.Path(self.main_file_path).suffix
-        if agg_path is None:
-            td = tempfile.mkdtemp()
-            try:
-                self._download(unzip_to=td)
-                # zip extracted to folder with main file name
-                file_name = self.file(extension=main_file_ext).name
-                file_path = urljoin(td, file_name, file_name)
-                data_object = func(file_path)
-                if self.metadata.type == AggregationType.MultidimensionalAggregation:
-                    data_object.load()
-                    data_object.close()
-            finally:
-                # we can delete the temporary directory for the data object created
-                # for these 2 aggregation types only. For other aggregation types, the generated data object
-                # needs to have access to the aggregation files in the temporary directory - so it's the caller's
-                # responsibility to delete the temporary directory
-                if self.metadata.type in (AggregationType.TimeSeriesAggregation,
-                                          AggregationType.MultidimensionalAggregation):
-                    shutil.rmtree(td)
-        else:
-            file_path = urljoin(agg_path, self.file(extension=main_file_ext).name)
-            data_object = func(file_path)
-            if self.metadata.type == AggregationType.MultidimensionalAggregation:
-                data_object.close()
+        file_path = self._validate_aggregation_path(agg_path)
+        data_object = func(file_path)
+        if self.metadata.type == AggregationType.MultidimensionalAggregation:
+            data_object.load()
+            data_object.close()

         # cache the object for the aggregation
         self._data_object = data_object

         return data_object

-    def _save_data_object(self, resource, agg_path: str = "", as_new_aggr=False, destination_path=""):
+    def _save_data_object(self, resource, agg_path: str, as_new_aggr=False, destination_path=""):
         if self._data_object is None:
             raise Exception("No data object exists for this aggregation.")

-        main_file_ext = pathlib.Path(self.main_file_path).suffix
-        temp_dir = None
-        if not agg_path:
-            temp_dir = tempfile.mkdtemp()
-            try:
-                self._download(unzip_to=temp_dir)
-                # zip extracted to folder with main file name
-                file_name = self.file(extension=main_file_ext).name
-                file_path = urljoin(temp_dir, file_name, file_name)
-                if self.metadata.type == AggregationType.MultidimensionalAggregation:
-                    self._data_object.to_netcdf(file_path, format="NETCDF4")
-            except Exception:
-                shutil.rmtree(temp_dir)
-                raise
-        else:
-            file_path = urljoin(agg_path, self.file(extension=main_file_ext).name)
-            if self.metadata.type == AggregationType.MultidimensionalAggregation:
-                self._data_object.to_netcdf(file_path, format="NETCDF4")
+        file_path = self._validate_aggregation_path(agg_path)
+        if self.metadata.type == AggregationType.MultidimensionalAggregation:
+            self._data_object.to_netcdf(file_path, format="NETCDF4")

         if self.metadata.type == AggregationType.TimeSeriesAggregation:
             with closing(sqlite3.connect(file_path)) as conn:
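The rewritten _get_data_object and _save_data_object above assume the aggregation content is already on disk and delegate path checks to the new _validate_aggregation_path helper. A minimal sketch of the resulting caller workflow, assuming hsclient's HydroShare client and Resource.aggregation_download API (the credentials, resource id, and local paths below are hypothetical):

from hsclient import HydroShare

hs = HydroShare(username="user", password="password")  # hypothetical credentials
res = hs.resource("abcdef0123456789abcdef0123456789")  # hypothetical resource id
agg = res.aggregation(file__path="data/air_temp.nc")   # hypothetical aggregation

# Download and unzip the aggregation first; the data object methods no longer
# fall back to downloading into a temporary directory on the caller's behalf.
res.aggregation_download(aggregation=agg, save_path="downloads", unzip_to="downloads/agg")
data_object = agg.as_data_object(agg_path="downloads/agg")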
@@ -301,7 +274,7 @@ def _save_data_object(self, resource, agg_path: str = "", as_new_aggr=False, des
                 conn.execute("DROP TABLE temp")
                 conn.commit()

-        aggr_path = self.main_file_path
+        aggr_file_path = self.main_file_path
         data_object = self._data_object
         aggr_type = self.metadata.type
         if not as_new_aggr:
@@ -325,7 +298,7 @@ def _save_data_object(self, resource, agg_path: str = "", as_new_aggr=False, des
             resource.file_upload(file_path)

             # retrieve the updated aggregation
-            aggr = resource.aggregation(file__path=aggr_path)
+            aggr = resource.aggregation(file__path=aggr_file_path)

             # update metadata
             for kw in keywords:
@@ -342,12 +315,10 @@ def _save_data_object(self, resource, agg_path: str = "", as_new_aggr=False, des
             resource.file_upload(file_path, destination_path=destination_path)

             # retrieve the new aggregation
-            aggr_path = urljoin(destination_path, os.path.basename(aggr_path))
+            aggr_path = urljoin(destination_path, os.path.basename(aggr_file_path))
             aggr = resource.aggregation(file__path=aggr_path)

         aggr._data_object = data_object
-        if temp_dir is not None:
-            shutil.rmtree(temp_dir)
         return aggr

     @property
@@ -478,12 +449,11 @@ def delete(self) -> None:
         self._hs_session.delete(path, status_code=200)
         self.refresh()

-    def as_series(self, series_id: str, agg_path: str = None) -> 'pandas.DataFrame':
+    def as_series(self, series_id: str, agg_path: str) -> 'pandas.DataFrame':
         """
         Creates a pandas DataFrame object out of an aggregation of type TimeSeries.
         :param series_id: The series_id of the timeseries result to be converted to a DataFrame object.
-        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
-        it downloaded locally.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
         :return: A pandas.DataFrame object
         """
         # TODO: if we decide that the user will prefer to use `as_data_object` method rather than this method, then
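Continuing the sketch above, a TimeSeries aggregation now has to be downloaded before it can be read into a DataFrame (the series id and paths are hypothetical):

ts_agg = res.aggregation(file__path="data/ODM2.sqlite")  # hypothetical aggregation
res.aggregation_download(aggregation=ts_agg, save_path="downloads", unzip_to="downloads/ts")
df = ts_agg.as_series(series_id="hypothetical-series-id", agg_path="downloads/ts")
print(df.head())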
@@ -502,11 +472,10 @@ def to_series(timeseries_file: str):

         return self._get_data_object(agg_path=agg_path, func=to_series)

-    def as_multi_dimensional_dataset(self, agg_path: str = None) -> 'xarray.Dataset':
+    def as_multi_dimensional_dataset(self, agg_path: str) -> 'xarray.Dataset':
         """
         Creates an xarray Dataset object out of an aggregation of type NetCDF.
-        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
-        it downloaded locally.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
         :return: An xarray.Dataset object
         """
         # TODO: if we decide that the user will prefer to use `as_data_object` method rather than this method, then
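Likewise for a NetCDF aggregation (paths are hypothetical):

nc_agg = res.aggregation(file__path="data/air_temp.nc")  # hypothetical aggregation
res.aggregation_download(aggregation=nc_agg, save_path="downloads", unzip_to="downloads/nc")
ds = nc_agg.as_multi_dimensional_dataset(agg_path="downloads/nc")
print(ds.dims, list(ds.data_vars))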
@@ -519,11 +488,10 @@ def as_multi_dimensional_dataset(self, agg_path: str = None) -> 'xarray.Dataset'

         return self._get_data_object(agg_path=agg_path, func=xarray.open_dataset)

-    def as_feature_collection(self, agg_path: str = None) -> 'fiona.Collection':
+    def as_feature_collection(self, agg_path: str) -> 'fiona.Collection':
         """
         Creates a fiona Collection object out of an aggregation of type GeoFeature.
-        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
-        it downloaded locally at aggr_path.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
         :return: A fiona.Collection object
         Note: The caller is responsible for closing the fiona.Collection object to free up aggregation files used to
         create this object.
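A hedged sketch for a GeoFeature aggregation, including the close that the docstring asks for (paths are hypothetical):

gf_agg = res.aggregation(file__path="data/watersheds.shp")  # hypothetical aggregation
res.aggregation_download(aggregation=gf_agg, save_path="downloads", unzip_to="downloads/gf")
features = gf_agg.as_feature_collection(agg_path="downloads/gf")
for feature in features:
    print(feature["geometry"]["type"])
features.close()  # caller is responsible for closing the collection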
@@ -537,11 +505,10 @@ def as_feature_collection(self, agg_path: str = None) -> 'fiona.Collection':
             raise Exception("fiona package not found")
         return self._get_data_object(agg_path=agg_path, func=fiona.open)

-    def as_raster_dataset(self, agg_path: str = None) -> 'rasterio.DatasetReader':
+    def as_raster_dataset(self, agg_path: str) -> 'rasterio.DatasetReader':
         """
         Creates a rasterio DatasetReader object out of an aggregation of type GeoRaster.
-        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
-        it downloaded locally at aggr_path.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
         :return: A rasterio.DatasetReader object
         Note: The caller is responsible for closing the rasterio.DatasetReader object to free up aggregation files
         used to create this object.
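And for a GeoRaster aggregation (paths are hypothetical):

rast_agg = res.aggregation(file__path="data/elevation.tif")  # hypothetical aggregation
res.aggregation_download(aggregation=rast_agg, save_path="downloads", unzip_to="downloads/raster")
dataset = rast_agg.as_raster_dataset(agg_path="downloads/raster")
print(dataset.crs, dataset.width, dataset.height)
band_1 = dataset.read(1)  # first band as a numpy array
dataset.close()  # caller is responsible for closing the reader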
@@ -556,7 +523,7 @@ def as_raster_dataset(self, agg_path: str = None) -> 'rasterio.DatasetReader':

         return self._get_data_object(agg_path=agg_path, func=rasterio.open)

-    def as_data_object(self, series_id: str = None, agg_path: str = None) -> \
+    def as_data_object(self, agg_path: str, series_id: str = "") -> \
             Union['pandas.DataFrame', 'fiona.Collection', 'rasterio.DatasetReader', 'xarray.Dataset', None]:
         """Load aggregation data to a relevant data object type"""

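With the reordered signature, agg_path is the required first argument, while series_id (meaningful only for TimeSeries aggregations) defaults to an empty string. A short sketch reusing the hypothetical aggregations above:

df = ts_agg.as_data_object(agg_path="downloads/ts", series_id="hypothetical-series-id")
ds = nc_agg.as_data_object(agg_path="downloads/nc")  # series_id not needed here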
@@ -573,13 +540,12 @@ def as_data_object(self, series_id: str = None, agg_path: str = None) -> \

         raise Exception(f"Data object is not supported for '{self.metadata.type}' aggregation type")

-    def update_netcdf_data(self, resource, agg_path: str = "", as_new_aggr=False, destination_path="") -> 'Aggregation':
+    def update_netcdf_data(self, resource, agg_path: str, as_new_aggr=False, destination_path="") -> 'Aggregation':
         """
         Updates the netcdf file associated with this aggregation. Then uploads the updated netcdf file
         to create a new aggregation that replaces the original aggregation.
         :param resource: The resource object to which this aggregation belongs.
-        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
-        it downloaded locally at aggr_path.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
         :param as_new_aggr: If True a new aggregation will be created, otherwise this aggregation will be
         updated/replaced.
         :param destination_path: The destination folder path where the new aggregation will be created. This folder
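A hedged end-to-end sketch of the NetCDF update flow under the new signature; editing the cached data object in place keeps it consistent with what _save_data_object writes back (paths are hypothetical):

ds = nc_agg.as_multi_dimensional_dataset(agg_path="downloads/nc")
ds.attrs["title"] = "Updated title"  # in-place edit of the cached xarray.Dataset
updated_agg = nc_agg.update_netcdf_data(res, agg_path="downloads/nc")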
@@ -595,14 +561,13 @@ def update_netcdf_data(self, resource, agg_path: str = "", as_new_aggr=False, de

         return self._save_data_object(resource, agg_path, as_new_aggr, destination_path)

-    def update_timeseries_data(self, resource, agg_path: str = "", as_new_aggr=False,
+    def update_timeseries_data(self, resource, agg_path: str, as_new_aggr=False,
                                destination_path="") -> 'Aggregation':
         """
         Updates the sqlite file associated with this aggregation. Then uploads the updated sqlite file
         to create a new aggregation that replaces the original aggregation.
         :param resource: The resource object to which this aggregation belongs.
-        :param agg_path: Not required. Include this parameter to avoid downloading the aggregation if you already have
-        it downloaded locally at aggr_path.
+        :param agg_path: The local path where this aggregation has been downloaded previously.
         :param as_new_aggr: If True a new aggregation will be created, otherwise this aggregation will be
         updated/replaced.
         :param destination_path: The destination folder path where the new aggregation will be created. This folder
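The TimeSeries flow is analogous; the DataValue column name below is an assumption based on the ODM2 schema that TimeSeries aggregations use (paths and series id are hypothetical):

df = ts_agg.as_series(series_id="hypothetical-series-id", agg_path="downloads/ts")
df["DataValue"] = df["DataValue"] * 2  # in-place edit of the cached DataFrame
updated_agg = ts_agg.update_timeseries_data(res, agg_path="downloads/ts")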
@@ -618,15 +583,15 @@ def update_timeseries_data(self, resource, agg_path: str = "", as_new_aggr=False

         return self._save_data_object(resource, agg_path, as_new_aggr, destination_path)

-    def save_data_object(self, resource, agg_path: str = "", as_new_aggr=False, destination_path="") -> 'Aggregation':
+    def save_data_object(self, resource, agg_path: str, as_new_aggr=False, destination_path="") -> 'Aggregation':
         """
         Updates the data file(s) of this aggregation using the associated data processing object
         and either updates this aggregation or creates a new aggregation using the updated data files.
         """
-        if self.metadata.type != AggregationType.MultidimensionalAggregation:
+        if self.metadata.type == AggregationType.MultidimensionalAggregation:
             return self.update_netcdf_data(resource, agg_path, as_new_aggr, destination_path)

-        if self.metadata.type != AggregationType.TimeSeriesAggregation:
+        if self.metadata.type == AggregationType.TimeSeriesAggregation:
             return self.update_timeseries_data(resource, agg_path, as_new_aggr, destination_path)

         # TODO: Implement this functionality for Raster and GeoFeature aggregations
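save_data_object now dispatches to the two update methods using the corrected equality checks. A hedged sketch of saving edits as a brand-new aggregation rather than replacing the original (the destination folder name is hypothetical):

new_agg = nc_agg.save_data_object(res, agg_path="downloads/nc",
                                  as_new_aggr=True, destination_path="processed")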
