Skip to content

Commit

Permalink
distance_computation edits: improve documentation, add missing raster…
Browse files Browse the repository at this point in the history
… check, comment out the old implementation
  • Loading branch information
nmaarnio committed Nov 14, 2024
1 parent acf1228 commit dc363b4
Showing 1 changed file with 96 additions and 88 deletions.
184 changes: 96 additions & 88 deletions eis_toolkit/vector_processing/distance_computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,109 +6,30 @@
from beartype.typing import Optional, Union
from numba import njit, prange
from rasterio import profiles, transform
from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry

from eis_toolkit import exceptions
from eis_toolkit.utilities.checks.raster import check_raster_profile
from eis_toolkit.utilities.miscellaneous import row_points


@beartype
def distance_computation(
geodataframe: gpd.GeoDataFrame, raster_profile: Union[profiles.Profile, dict], max_distance: Optional[Number] = None
) -> np.ndarray:
"""Calculate distance from raster cell to nearest geometry.
Args:
geodataframe: The GeoDataFrame with geometries to determine distance to.
raster_profile: The raster profile of the raster in which the distances
to the nearest geometry are determined.
max_distance: The maximum distance in the output array.
Returns:
A 2D numpy array with the distances computed.
Raises:
NonMatchingCrsException: The input raster profile and geodataframe have mismatching CRS.
EmptyDataFrameException: The input geodataframe is empty.
NumericValueSignException: Max distance is defined and is not a positive number.
"""
if raster_profile.get("crs") != geodataframe.crs:
raise exceptions.NonMatchingCrsException(
"Expected coordinate systems to match between raster and GeoDataFrame."
)
if geodataframe.shape[0] == 0:
raise exceptions.EmptyDataFrameException("Expected GeoDataFrame to not be empty.")
if max_distance is not None and max_distance <= 0:
raise exceptions.NumericValueSignException("Expected max distance to be a positive number.")

check_raster_profile(raster_profile=raster_profile)

raster_width = raster_profile.get("width")
raster_height = raster_profile.get("height")
raster_transform = raster_profile.get("transform")

distance_matrix = _distance_computation(
raster_width=raster_width,
raster_height=raster_height,
raster_transform=raster_transform,
geodataframe=geodataframe,
)
if max_distance is not None:
distance_matrix[distance_matrix > max_distance] = max_distance

return distance_matrix


def _calculate_row_distances(
row: int,
cols: np.ndarray,
raster_transform: transform.Affine,
geometries_unary_union: Union[BaseGeometry, BaseMultipartGeometry],
) -> np.ndarray:
row_distances = np.array(
[
point.distance(geometries_unary_union)
for point in row_points(row=row, cols=cols, raster_transform=raster_transform)
]
)
return row_distances


def _distance_computation(
raster_width: int, raster_height: int, raster_transform: transform.Affine, geodataframe: gpd.GeoDataFrame
) -> np.ndarray:

cols = np.arange(raster_width)
rows = np.arange(raster_height)

geometries_unary_union = geodataframe.geometry.unary_union

distance_matrix = np.array(
[
_calculate_row_distances(
row=row, cols=cols, raster_transform=raster_transform, geometries_unary_union=geometries_unary_union
)
for row in rows
]
)

return distance_matrix


def distance_computation_optimized(
geodataframe: gpd.GeoDataFrame, raster_profile: Union[profiles.Profile, dict], max_distance: Optional[float] = None
) -> np.ndarray:
"""
Calculate distance from raster cell to nearest geometry.
Calculate distance from each raster cell (centre) to the nearest input geometry.
Pixels on top of input geometries are assigned distance of 0.
Uses Numba-optimized calculations.
Uses Numba to perform calculations quickly. The computation time increases (roughly)
linearly with the amount of raster pixels defined by given `raster_profile`. Supports
Polygon, MultiPolygon, LineString, MultiLineString, Point and MultiPoint geometries.
Args:
geodataframe: The GeoDataFrame with geometries to determine distance to.
raster_profile: The raster profile of the raster in which the distances
to the nearest geometry are determined.
max_distance: The maximum distance in the output array.
max_distance: The maximum distance in the output array. Pixels beyond this
distance will be assigned `max_distance` value.
Returns:
A 2D numpy array with the distances computed.
Expand All @@ -127,14 +48,16 @@ def distance_computation_optimized(
if max_distance is not None and max_distance <= 0:
raise exceptions.NumericValueSignException("Expected max distance to be a positive number.")

check_raster_profile(raster_profile=raster_profile)

raster_width = raster_profile.get("width")
raster_height = raster_profile.get("height")
raster_transform = raster_profile.get("transform")

# Generate the grid of raster cell center points
raster_points = _generate_raster_points(raster_width, raster_height, raster_transform)

# Flatten geometry segments for Numba-compatible processing
# Initialize lists needed for Numba-compatible calculations
segment_coords = [] # These will also contain points coords, if present
segment_indices = [0] # Start index
polygon_coords = []
Expand Down Expand Up @@ -296,3 +219,88 @@ def _point_in_polygon(px: Number, py: Number, polygon_coords: np.ndarray, polygo
if inside:
return True
return False


# @beartype
# def distance_computation(
# geodataframe: gpd.GeoDataFrame,
# raster_profile: Union[profiles.Profile, dict],
# max_distance: Optional[Number] = None
# ) -> np.ndarray:
# """Calculate distance from raster cell to nearest geometry.

# Args:
# geodataframe: The GeoDataFrame with geometries to determine distance to.
# raster_profile: The raster profile of the raster in which the distances
# to the nearest geometry are determined.
# max_distance: The maximum distance in the output array.

# Returns:
# A 2D numpy array with the distances computed.

# Raises:
# NonMatchingCrsException: The input raster profile and geodataframe have mismatching CRS.
# EmptyDataFrameException: The input geodataframe is empty.
# NumericValueSignException: Max distance is defined and is not a positive number.
# """
# if raster_profile.get("crs") != geodataframe.crs:
# raise exceptions.NonMatchingCrsException(
# "Expected coordinate systems to match between raster and GeoDataFrame."
# )
# if geodataframe.shape[0] == 0:
# raise exceptions.EmptyDataFrameException("Expected GeoDataFrame to not be empty.")
# if max_distance is not None and max_distance <= 0:
# raise exceptions.NumericValueSignException("Expected max distance to be a positive number.")

# check_raster_profile(raster_profile=raster_profile)

# raster_width = raster_profile.get("width")
# raster_height = raster_profile.get("height")
# raster_transform = raster_profile.get("transform")

# distance_matrix = _distance_computation(
# raster_width=raster_width,
# raster_height=raster_height,
# raster_transform=raster_transform,
# geodataframe=geodataframe,
# )
# if max_distance is not None:
# distance_matrix[distance_matrix > max_distance] = max_distance

# return distance_matrix


# def _calculate_row_distances(
# row: int,
# cols: np.ndarray,
# raster_transform: transform.Affine,
# geometries_unary_union: Union[BaseGeometry, BaseMultipartGeometry],
# ) -> np.ndarray:
# row_distances = np.array(
# [
# point.distance(geometries_unary_union)
# for point in row_points(row=row, cols=cols, raster_transform=raster_transform)
# ]
# )
# return row_distances


# def _distance_computation(
# raster_width: int, raster_height: int, raster_transform: transform.Affine, geodataframe: gpd.GeoDataFrame
# ) -> np.ndarray:

# cols = np.arange(raster_width)
# rows = np.arange(raster_height)

# geometries_unary_union = geodataframe.geometry.unary_union

# distance_matrix = np.array(
# [
# _calculate_row_distances(
# row=row, cols=cols, raster_transform=raster_transform, geometries_unary_union=geometries_unary_union
# )
# for row in rows
# ]
# )

# return distance_matrix

0 comments on commit dc363b4

Please sign in to comment.