To stac #617

Merged: 24 commits, Oct 30, 2023
2 changes: 2 additions & 0 deletions docs/changelog.rst
@@ -13,6 +13,8 @@ Unreleased

Added
-----
- Support for exporting data catalogs to STAC catalog formats. (#617)


Changed
-------
12 changes: 12 additions & 0 deletions hydromt/data_adapter/data_adapter.py
@@ -16,8 +16,13 @@
import pandas as pd
import xarray as xr
import yaml

# from pystac import Item as StacItem
from pystac import Catalog as StacCatalog
from upath import UPath

from hydromt.typing import ErrorHandleMethod

logger = logging.getLogger(__name__)


@@ -414,3 +419,10 @@ def _single_var_as_array(ds, single_var_as_array, variable_name=None):
return da
else:
return ds

@abstractmethod
def to_stac_catalog(
self,
on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
) -> Optional[StacCatalog]:
"""Create a stac item from the data adatper to be added to a stac catalog."""
72 changes: 64 additions & 8 deletions hydromt/data_adapter/dataframe.py
@@ -1,11 +1,17 @@
"""Implementation for the Pandas Dataframe adapter."""
import logging
import warnings
from datetime import datetime
from os.path import join
from typing import Optional, Union
from typing import Optional, Union

import numpy as np
import pandas as pd
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem

from hydromt.typing import ErrorHandleMethod

from ..nodata import NoDataStrategy, _exec_nodata_strat
from .data_adapter import DataAdapter
@@ -30,13 +36,13 @@ def __init__(
driver: Optional[str] = None,
filesystem: Optional[str] = None,
nodata: Optional[Union[dict, float, int]] = None,
rename: dict = None,
unit_mult: dict = None,
unit_add: dict = None,
meta: dict = None,
attrs: dict = None,
driver_kwargs: dict = None,
storage_options: dict = None,
rename: Optional[dict] = None,
unit_mult: Optional[dict] = None,
unit_add: Optional[dict] = None,
meta: Optional[dict] = None,
attrs: Optional[dict] = None,
driver_kwargs: Optional[dict] = None,
storage_options: Optional[dict] = None,
name: str = "", # optional for now
catalog_name: str = "", # optional for now
provider: Optional[str] = None,
@@ -323,3 +329,53 @@ def _set_metadata(self, df):
df[col].attrs.update(**self.attrs[col])

return df

def to_stac_catalog(
self,
on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
) -> Optional[StacCatalog]:
"""
Convert a rasterdataset into a STAC Catalog representation.

The collection will contain an asset for each of the associated files.


Parameters
----------
- on_error (str, optional): The error handling strategy.
Options are: "raise" to raise an error on failure, "skip" to skip the
dataset on failure, and "coerce" (default) to set default values on failure.

Returns
-------
- Optional[StacCatalog]: The STAC Catalog representation of the dataset, or None
if the dataset was skipped.
"""
if on_error == ErrorHandleMethod.SKIP:
logger.warning(
f"Skipping {self.name} during stac conversion because"
"because detecting temporal extent failed."
)
return
elif on_error == ErrorHandleMethod.COERCE:
stac_catalog = StacCatalog(
self.name,
description=self.name,
)
stac_item = StacItem(
self.name,
geometry=None,
bbox=[0, 0, 0, 0],
properties=self.meta,
datetime=datetime(1, 1, 1),
)
stac_asset = StacAsset(str(self.path))
stac_item.add_asset("hydromt_path", stac_asset)

stac_catalog.add_item(stac_item)
return stac_catalog
else:
raise NotImplementedError(
"DataframeAdapter does not support full stac conversion as it lacks"
" spatio-temporal dimentions"
)
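A minimal usage sketch for the adapter above: it exercises the COERCE path and writes the result to disk with pystac. The import path, class name, and constructor arguments (`path`, `name`) are assumptions about the surrounding hydromt API rather than something shown in this diff.

from pystac import CatalogType

from hydromt.data_adapter import DataFrameAdapter  # assumed import location
from hydromt.typing import ErrorHandleMethod

# Constructor arguments are illustrative; adapters are normally built by a
# DataCatalog rather than by hand.
adapter = DataFrameAdapter(path="data/observations.csv", name="observations")

catalog = adapter.to_stac_catalog(on_error=ErrorHandleMethod.COERCE)
if catalog is not None:
    # normalize_and_save rewrites the item and asset HREFs relative to the root
    # and writes catalog.json plus the item JSON to the target folder.
    catalog.normalize_and_save("stac_export", catalog_type=CatalogType.SELF_CONTAINED)

With COERCE the resulting item carries a zero bbox and the 0001-01-01 placeholder datetime, so downstream consumers should treat those values as sentinels rather than as real extents.
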
76 changes: 69 additions & 7 deletions hydromt/data_adapter/geodataframe.py
@@ -1,12 +1,17 @@
"""The Geodataframe adapter implementation."""
import logging
import warnings
from os.path import join
from pathlib import Path
from typing import NewType, Optional, Tuple, Union
from datetime import datetime
from os.path import basename, join
from typing import Optional, Union

import numpy as np
import pyproj
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem

from hydromt.typing import ErrorHandleMethod, GeoDataframeSource, TotalBounds

from .. import gis_utils, io
from ..nodata import NoDataStrategy, _exec_nodata_strat
@@ -16,8 +21,6 @@

__all__ = ["GeoDataFrameAdapter", "GeoDataframeSource"]

GeoDataframeSource = NewType("GeoDataframeSource", Union[str, Path])


class GeoDataFrameAdapter(DataAdapter):

@@ -411,7 +414,7 @@ def _set_metadata(self, gdf):

return gdf

def get_bbox(self, detect=True):
def get_bbox(self, detect=True) -> TotalBounds:
"""Return the bounding box and espg code of the dataset.

if the bounding box is not set and detect is True,
@@ -441,7 +444,7 @@ def get_bbox(self, detect=True):
def detect_bbox(
self,
gdf=None,
) -> Tuple[Tuple[float, float, float, float], int]:
) -> TotalBounds:
"""Detect the bounding box and crs of the dataset.

If no dataset is provided, it will be fetched acodring to the settings in the
@@ -471,3 +474,62 @@ def detect_bbox(
crs = gdf.geometry.crs.to_epsg()
bounds = gdf.geometry.total_bounds
return bounds, crs

def to_stac_catalog(
self,
on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
) -> Optional[StacCatalog]:
"""
Convert a geodataframe into a STAC Catalog representation.

Since geodataframes don't support temporal dimension the `datetime`
property will always be set to 0001-01-01. The collection will contain an
asset for each of the associated files.


Parameters
----------
- on_error (str, optional): The error handling strategy.
Options are: "raise" to raise an error on failure, "skip" to skip
the dataset on failure, and "coerce" (default) to set
default values on failure.

Returns
-------
- Optional[StacCatalog]: The STAC Catalog representation of the dataset, or
None if the dataset was skipped.
"""
try:
bbox, crs = self.get_bbox(detect=True)
bbox = list(bbox)
props = {**self.meta, "crs": crs}
except (IndexError, KeyError, pyproj.exceptions.CRSError) as e:
if on_error == ErrorHandleMethod.SKIP:
logger.warning(
"Skipping {name} during stac conversion because"
"because detecting spacial extent failed."
)
return
elif on_error == ErrorHandleMethod.COERCE:
bbox = [0.0, 0.0, 0.0, 0.0]
props = self.meta
else:
raise e
stac_catalog = StacCatalog(
self.name,
description=self.name,
)
stac_item = StacItem(
self.name,
geometry=None,
bbox=list(bbox),
properties=props,
datetime=datetime(1, 1, 1),
)
stac_asset = StacAsset(str(self.path))
base_name = basename(self.path)
stac_item.add_asset(base_name, stac_asset)

stac_catalog.add_item(stac_item)
return stac_catalog
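For readers who do not know pystac, the snippet below rebuilds, with pystac calls only, the kind of single-item catalog the branch above assembles. The id, bbox, CRS and asset path are made-up example values; the placeholder datetime mirrors the 0001-01-01 convention described in the docstring.

from datetime import datetime

from pystac import Asset, Catalog, Item

catalog = Catalog(id="rivers", description="rivers")
item = Item(
    id="rivers",
    geometry=None,                # no item geometry is emitted here
    bbox=[4.0, 51.0, 6.0, 53.0],  # example total_bounds in EPSG:4326
    properties={"crs": 4326},     # source meta plus the detected EPSG code
    datetime=datetime(1, 1, 1),   # placeholder: vector data has no time axis
)
item.add_asset("rivers.gpkg", Asset(href="data/rivers.gpkg"))
catalog.add_item(item)

Serializing this catalog (for example with normalize_and_save, as in the dataframe sketch earlier) produces a catalog.json referencing a single item whose only asset points back at the source file.
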
87 changes: 77 additions & 10 deletions hydromt/data_adapter/geodataset.py
@@ -3,14 +3,18 @@
import os
import warnings
from datetime import datetime
from os.path import join
from pathlib import Path
from typing import NewType, Optional, Tuple, Union
from os.path import basename, join
from typing import Optional, Union

import numpy as np
import pandas as pd
import pyproj
import xarray as xr
from pystac import Asset as StacAsset
from pystac import Catalog as StacCatalog
from pystac import Item as StacItem

from hydromt.typing import ErrorHandleMethod, GeoDatasetSource, TimeRange, TotalBounds

from .. import gis_utils, io
from ..nodata import NoDataStrategy, _exec_nodata_strat
@@ -21,8 +25,6 @@

__all__ = ["GeoDatasetAdapter", "GeoDatasetSource"]

GeoDatasetSource = NewType("GeoDatasetSource", Union[str, Path])


class GeoDatasetAdapter(DataAdapter):

@@ -482,7 +484,7 @@ def _apply_unit_conversion(self, ds, logger=logger):
ds[name].attrs.update(attrs) # set original attributes
return ds

def get_bbox(self, detect=True):
def get_bbox(self, detect=True) -> TotalBounds:
"""Return the bounding box and espg code of the dataset.

if the bounding box is not set and detect is True,
@@ -503,13 +505,14 @@ def get_bbox(self, detect=True):
The EPSG code of the CRS of the coordinates returned in bbox
"""
bbox = self.extent.get("bbox", None)
crs = self.crs
if bbox is None and detect:
bbox, crs = self.detect_bbox()

crs = self.crs

return bbox, crs

def get_time_range(self, detect=True):
def get_time_range(self, detect=True) -> TimeRange:
"""Detect the time range of the dataset.

If the time range is not set and detect is True,
@@ -538,7 +541,7 @@ def detect_bbox(
def detect_bbox(
self,
ds=None,
) -> Tuple[Tuple[float, float, float, float], int]:
) -> TotalBounds:
"""Detect the bounding box and crs of the dataset.

If no dataset is provided, it will be fetched according to the settings in the
@@ -569,7 +572,7 @@ def detect_bbox(
bounds = ds.vector.bounds
return bounds, crs

def detect_time_range(self, ds=None) -> Tuple[datetime, datetime]:
def detect_time_range(self, ds=None) -> TimeRange:
"""Detect the temporal range of the dataset.

If no dataset is provided, it will be fetched according to the settings in the
@@ -594,3 +597,67 @@ def detect_time_range(self, ds=None) -> TimeRange:
ds[ds.vector.time_dim].min().values,
ds[ds.vector.time_dim].max().values,
)

def to_stac_catalog(
self,
on_error: ErrorHandleMethod = ErrorHandleMethod.COERCE,
) -> Optional[StacCatalog]:
"""
Convert a geodataset into a STAC Catalog representation.

The collection will contain an asset for each of the associated files.


Parameters
----------
- on_error (str, optional): The error handling strategy.
Options are: "raise" to raise an error on failure, "skip" to skip
the dataset on failure, and "coerce" (default) to set default
values on failure.

Returns
-------
- Optional[StacCatalog]: The STAC Catalog representation of the dataset, or
None if the dataset was skipped.
"""
try:
bbox, crs = self.get_bbox(detect=True)
bbox = list(bbox)
start_dt, end_dt = self.get_time_range(detect=True)
start_dt = pd.to_datetime(start_dt)
end_dt = pd.to_datetime(end_dt)
props = {**self.meta, "crs": crs}
except (IndexError, KeyError, pyproj.exceptions.CRSError) as e:
if on_error == ErrorHandleMethod.SKIP:
logger.warning(
"Skipping {name} during stac conversion because"
"because detecting spacial extent failed."
)
return
elif on_error == ErrorHandleMethod.COERCE:
bbox = [0.0, 0.0, 0.0, 0.0]
props = self.meta
start_dt = datetime(1, 1, 1)
end_dt = datetime(1, 1, 1)
else:
raise e

stac_catalog = StacCatalog(
self.name,
description=self.name,
)
stac_item = StacItem(
self.name,
geometry=None,
bbox=bbox,
properties=props,
datetime=None,
start_datetime=start_dt,
end_datetime=end_dt,
)
stac_asset = StacAsset(str(self.path))
base_name = basename(self.path)
stac_item.add_asset(base_name, stac_asset)

stac_catalog.add_item(stac_item)
return stac_catalog
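Together with the changelog entry at the top ("Support for exporting data catalogs to STAC catalog formats"), the per-adapter methods in this PR can be combined into one parent catalog. The sketch below assumes a hydromt DataCatalog whose `sources` mapping yields the adapters; that attribute and the DataCatalog constructor argument are assumptions, not shown in this diff, while the pystac add_child and normalize_and_save calls are standard pystac API.

from pystac import Catalog, CatalogType

from hydromt import DataCatalog
from hydromt.typing import ErrorHandleMethod

# "artifact_data" is an example predefined catalog name; adjust to your setup.
data_catalog = DataCatalog("artifact_data")
root = Catalog(id="hydromt-export", description="STAC export of a hydromt data catalog")

# `sources` (name -> adapter) is assumed here; adapt if the DataCatalog API differs.
for adapter in data_catalog.sources.values():
    child = adapter.to_stac_catalog(on_error=ErrorHandleMethod.SKIP)
    if child is not None:  # SKIP returns None for sources without a usable extent
        root.add_child(child)

root.normalize_and_save("stac_export", catalog_type=CatalogType.SELF_CONTAINED)

Using SKIP here simply drops sources, such as plain dataframes, that have no spatio-temporal extent, instead of filling the export with placeholder items.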