diff --git a/eis_toolkit/raster_processing/unifying.py b/eis_toolkit/raster_processing/unifying.py index d8b2b04f..4585e207 100644 --- a/eis_toolkit/raster_processing/unifying.py +++ b/eis_toolkit/raster_processing/unifying.py @@ -4,17 +4,51 @@ from beartype.typing import List, Literal, Sequence, Tuple from rasterio import warp from rasterio.enums import Resampling +from rasterio.profiles import Profile from eis_toolkit.exceptions import InvalidParameterValueException from eis_toolkit.raster_processing.resampling import RESAMPLE_METHOD_MAP +def _calculate_snapped_grid( + raster: rasterio.io.DatasetReader, dst_crs: rasterio.crs.CRS, dst_resolution: Tuple[float, float] +) -> Tuple[warp.Affine, int, int]: + dst_transform, dst_width, dst_height = warp.calculate_default_transform( + raster.crs, dst_crs, raster.width, raster.height, *raster.bounds, resolution=dst_resolution + ) + # The created transform might not be aligned with the base raster grid, so + # we still need to snap/align the transformation to closest grid corner + x_distance_to_grid = dst_transform.c % dst_resolution[0] + y_distance_to_grid = dst_transform.f % dst_resolution[1] + + if x_distance_to_grid > dst_resolution[0] / 2: # Snap towards right + c = dst_transform.c - x_distance_to_grid + dst_resolution[0] + else: # Snap towards left + c = dst_transform.c - x_distance_to_grid + + if y_distance_to_grid > dst_resolution[1] / 2: # Snap towards up + f = dst_transform.f - y_distance_to_grid + dst_resolution[1] + else: # Snap towards bottom + f = dst_transform.f - y_distance_to_grid + + # Create new transform with updated corner coordinates + dst_transform = warp.Affine( + dst_transform.a, # Pixel size x + dst_transform.b, # Shear parameter + c, # Up-left corner x-coordinate + dst_transform.d, # Shear parameter + dst_transform.e, # Pixel size y + f, # Up-left corner y-coordinate + ) + return dst_transform, dst_width, dst_height + + def _unify_raster_grids( base_raster: rasterio.io.DatasetReader, rasters_to_unify: Sequence[rasterio.io.DatasetReader], resampling_method: Resampling, same_extent: bool, -) -> List[Tuple[np.ndarray, dict]]: +) -> List[Tuple[np.ndarray, Profile]]: dst_crs = base_raster.crs dst_width = base_raster.width @@ -22,49 +56,20 @@ def _unify_raster_grids( dst_transform = base_raster.transform dst_resolution = (base_raster.transform.a, abs(base_raster.transform.e)) - out_rasters = [(base_raster.read(), base_raster.meta.copy())] - out_meta = base_raster.meta.copy() + out_rasters = [(base_raster.read(), base_raster.profile.copy())] for raster in rasters_to_unify: + out_profile = raster.profile.copy() # If we unify without clipping, things are more complicated and we need to # calculate corner coordinates, width and height, and snap the grid to nearest corner if not same_extent: - dst_transform, dst_width, dst_height = warp.calculate_default_transform( - raster.crs, dst_crs, raster.width, raster.height, *raster.bounds, resolution=dst_resolution - ) - # The created transform might not be aligned with the base raster grid, so - # we still need to snap/align the transformation to closest grid corner - x_distance_to_grid = dst_transform.c % dst_resolution[0] - y_distance_to_grid = dst_transform.f % dst_resolution[1] - - if x_distance_to_grid > dst_resolution[0] / 2: # Snap towards right - c = dst_transform.c - x_distance_to_grid + dst_resolution[0] - else: # Snap towards left - c = dst_transform.c - x_distance_to_grid - - if y_distance_to_grid > dst_resolution[1] / 2: # Snap towards up - f = dst_transform.f - y_distance_to_grid + dst_resolution[1] - else: # Snap towards bottom - f = dst_transform.f - y_distance_to_grid - - # Create new transform with updated corner coordinates - dst_transform = warp.Affine( - dst_transform.a, # Pixel size x - dst_transform.b, # Shear parameter - c, # Up-left corner x-coordinate - dst_transform.d, # Shear parameter - dst_transform.e, # Pixel size y - f, # Up-left corner y-coordinate - ) - - out_meta["transform"] = dst_transform - out_meta["width"] = dst_width - out_meta["height"] = dst_height + dst_transform, dst_width, dst_height = _calculate_snapped_grid(raster, dst_crs, dst_resolution) # Initialize output raster arrary dst_array = np.empty((base_raster.count, dst_height, dst_width)) - dst_array.fill(base_raster.meta["nodata"]) + nodata = out_profile["nodata"] + dst_array.fill(nodata) src_array = raster.read() @@ -72,15 +77,17 @@ def _unify_raster_grids( source=src_array, src_crs=raster.crs, src_transform=raster.transform, - src_nodata=raster.meta["nodata"], + src_nodata=nodata, destination=dst_array, dst_crs=dst_crs, dst_transform=dst_transform, - dst_nodata=base_raster.meta["nodata"], + dst_nodata=nodata, resampling=resampling_method, )[0] - out_rasters.append((out_image, out_meta)) + out_profile.update({"transform": dst_transform, "width": dst_width, "height": dst_height, "crs": dst_crs}) + + out_rasters.append((out_image, out_profile)) return out_rasters @@ -91,7 +98,7 @@ def unify_raster_grids( rasters_to_unify: Sequence[rasterio.io.DatasetReader], resampling_method: Literal["nearest", "bilinear", "cubic", "average", "gauss", "max", "min"] = "nearest", same_extent: bool = False, -) -> List[Tuple[np.ndarray, dict]]: +) -> List[Tuple[np.ndarray, Profile]]: """Unifies (reprojects, resamples, aligns and optionally clips) given rasters relative to base raster. Args: @@ -104,7 +111,7 @@ def unify_raster_grids( as the base raster. Expands smaller rasters with nodata cells. Defaults to False. Returns: - List of unified rasters' data and metadata. First element is the base raster. + List of unified rasters' data and profiles. First element is the base raster. Raises: InvalidParameterValueException: Rasters to unify is empty.