From 962f950884fbf6f6b3a1ba66ed96b7ace675c7ae Mon Sep 17 00:00:00 2001 From: Anton Bakker Date: Wed, 22 Nov 2023 14:59:44 +0100 Subject: [PATCH] update geodense + refactor for code sharing with geodense --- pyproject.toml | 2 +- .../callback.py | 62 -- .../cityjson/models.py | 4 +- .../constants.py | 8 + .../crs_transform.py | 292 +++++++++ src/coordinates_transformation_api/main.py | 43 +- src/coordinates_transformation_api/models.py | 93 +-- src/coordinates_transformation_api/types.py | 18 + src/coordinates_transformation_api/util.py | 595 ++++-------------- tests/test_geojson_bbox.py | 2 +- tests/test_geojson_transformation.py | 2 +- 11 files changed, 504 insertions(+), 617 deletions(-) delete mode 100644 src/coordinates_transformation_api/callback.py create mode 100644 src/coordinates_transformation_api/constants.py create mode 100644 src/coordinates_transformation_api/crs_transform.py create mode 100644 src/coordinates_transformation_api/types.py diff --git a/pyproject.toml b/pyproject.toml index aaefdbb..517a6bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "pyproj == 3.6.0", "pydantic-settings == 2.0.2", "email-validator == 2.0.0", - "geodense@git+https://github.com/GeodetischeInfrastructuur/geodense#egg=804ef492aae1de651825df66c35d06c007ccdb4b", + "geodense@git+https://github.com/GeodetischeInfrastructuur/geodense@925a3ae377be64e784745850d78689702ed2cd69", ] requires-python = ">=3.11.4" dynamic = ["version"] diff --git a/src/coordinates_transformation_api/callback.py b/src/coordinates_transformation_api/callback.py deleted file mode 100644 index b5e70e2..0000000 --- a/src/coordinates_transformation_api/callback.py +++ /dev/null @@ -1,62 +0,0 @@ -from typing import Callable, cast - -from pyproj import CRS, Transformer - -# note get_transform_callback is in it's own file to prevent cyclical import between cityjson.py and util.py -# since cityjson.py requires get_transform_callback and util.py requires CityJsonV113 from cityjson.py -coordinates_type = tuple[float, float] | tuple[float, float, float] | list[float] -TWO_DIMENSIONAL = 2 -THREE_DIMENSIONAL = 3 - - -def get_transformer(source_crs: str, target_crs: str) -> Transformer: - source_crs_crs = CRS.from_authority(*source_crs.split(":")) - target_crs_crs = CRS.from_authority(*target_crs.split(":")) - return Transformer.from_crs(source_crs_crs, target_crs_crs, always_xy=True) - - -def get_transform_callback( - source_crs: str, - target_crs: str, - precision: int | None = None, - epoch: float | None = None, -) -> Callable[[coordinates_type], tuple[float, ...],]: - """TODO: improve type annotation/handling geojson/cityjson transformation, with the current implementation mypy is not complaining""" - - def my_round(val: float, precision: int | None) -> float | int: - if precision is None: - return val - else: - return round(val, precision) - - def callback(val: coordinates_type) -> tuple[float, ...]: - transformer = get_transformer(source_crs, target_crs) - - if transformer.target_crs is None: - raise ValueError("transformer.target_crs is None") - dim = len(transformer.target_crs.axis_info) - if ( - dim is not None - and dim != len(val) - and TWO_DIMENSIONAL > dim > THREE_DIMENSIONAL - ): - # check so we can safely cast to tuple[float, float], tuple[float, float, float] - raise ValueError( - f"number of dimensions of target-crs should be 2 or 3, is {dim}" - ) - val = cast(tuple[float, float] | tuple[float, float, float], val[0:dim]) - input = tuple([*val, float(epoch) if epoch is not None else None]) - - # GeoJSON and CityJSON by definition has coordinates always in lon-lat-height (or x-y-z) order. Transformer has been created with `always_xy=True`, - # to ensure input and output coordinates are in in lon-lat-height (or x-y-z) order. - # Regarding the epoch: this is stripped from the result of the transformer. It's used as a input parameter for the transformation but is not - # 'needed' in the result, because there is no conversion of time, e.i. a epoch value of 2010.0 will stay 2010.0 in the result. Therefor the result - # of the transformer is 'stripped' with [0:dim] - return tuple( - [ - float(my_round(x, precision)) - for x in transformer.transform(*input)[0:dim] - ] - ) - - return callback diff --git a/src/coordinates_transformation_api/cityjson/models.py b/src/coordinates_transformation_api/cityjson/models.py index 9825bb4..f2bc480 100644 --- a/src/coordinates_transformation_api/cityjson/models.py +++ b/src/coordinates_transformation_api/cityjson/models.py @@ -14,7 +14,7 @@ from pydantic import AnyUrl, BaseModel, EmailStr, Field, StringConstraints from pyproj import CRS -from coordinates_transformation_api.callback import get_transform_callback +from coordinates_transformation_api.crs_transform import get_transform_crs_fun CityJSONBoundary = Union[ list[list[list[int]]], @@ -1291,7 +1291,7 @@ def get_x_unit_crs(self: CityjsonV113, crs_str: str) -> str: def crs_transform( self: CityjsonV113, source_crs: str, target_crs: str, epoch: float | None = None ) -> None: - callback = get_transform_callback(source_crs, target_crs, epoch=epoch) + callback = get_transform_crs_fun(source_crs, target_crs, epoch=epoch) imp_digits = math.ceil(abs(math.log(self.transform.scale[0], 10))) self.decompress() self.vertices = [ diff --git a/src/coordinates_transformation_api/constants.py b/src/coordinates_transformation_api/constants.py new file mode 100644 index 0000000..9bc59fa --- /dev/null +++ b/src/coordinates_transformation_api/constants.py @@ -0,0 +1,8 @@ +DEFAULT_PRECISION = 4 +DENSIFY_CRS = "EPSG:4258" +DEVIATION_VALID_BBOX = [ + 3.1201, + 50.2191, + 7.5696, + 54.1238, +] # bbox in epsg:4258 - area valid for doing density check (based on deviation param) diff --git a/src/coordinates_transformation_api/crs_transform.py b/src/coordinates_transformation_api/crs_transform.py new file mode 100644 index 0000000..677c665 --- /dev/null +++ b/src/coordinates_transformation_api/crs_transform.py @@ -0,0 +1,292 @@ +from collections import Counter +from collections.abc import Generator +from itertools import chain +from typing import Any, Callable, cast + +from geodense.geojson import CrsFeatureCollection +from geodense.lib import ( # type: ignore # type: ignore + THREE_DIMENSIONAL, + TWO_DIMENSIONAL, + GeojsonObject, + apply_function_on_geojson_geometries, +) +from geodense.types import GeojsonGeomNoGeomCollection +from geojson_pydantic import Feature, GeometryCollection +from geojson_pydantic.geometries import _GeometryBase +from geojson_pydantic.types import BBox +from pyproj import CRS, Transformer +from shapely import GeometryCollection as ShpGeometryCollection +from shapely.geometry import shape + +from coordinates_transformation_api.constants import DEFAULT_PRECISION +from coordinates_transformation_api.models import Crs as MyCrs +from coordinates_transformation_api.types import CoordinatesType + + +def get_precision(target_crs_crs: MyCrs) -> int: + unit = target_crs_crs.get_x_unit_crs() + if unit == "degree": + return DEFAULT_PRECISION + 5 + return DEFAULT_PRECISION + + +def get_shapely_objects( + body: GeojsonObject, +) -> list[Any]: + def merge_geometry_collections_shapelyfication(input_shp_geoms: list) -> list: + indices = list( + map( + lambda x: x["index"][0] if hasattr(x, "index") else None, + input_shp_geoms, + ) + ) + counter = Counter(indices) + geom_coll_indices = [x for x in counter if counter[x] > 1] + output_shp_geoms = [ + x["result"] + for x in input_shp_geoms + if not hasattr(x, "index") or x["index"][0] not in geom_coll_indices + ] + for i in geom_coll_indices: + geom_collection_geoms = [ + x["result"] for x in input_shp_geoms if x["index"][0] == i + ] + output_shp_geoms.append(ShpGeometryCollection(geom_collection_geoms)) + return output_shp_geoms + + transform_fun = get_shapely_object_fun() + result = apply_function_on_geojson_geometries(body, transform_fun) + return merge_geometry_collections_shapelyfication(result) + + +def get_shapely_object_fun() -> Callable: + def shapely_object( + geometry_dict: dict[str, Any], result: list, indices: list[int] | None = None + ) -> None: + shp_obj = shape(geometry_dict) + result_item = {"result": shp_obj} + if indices is not None: + result_item["index"] = indices + result.append(result_item) + + return shapely_object + + +def get_update_geometry_bbox_fun() -> Callable: + def update_bbox( + geometry: GeojsonGeomNoGeomCollection, + _result: list, + _indices: list[int] | None = None, + ) -> None: + coordinates = get_coordinates_from_geometry(geometry) + geometry.bbox = get_bbox_from_coordinates(coordinates) + + return update_bbox + + +def get_crs_transform_fun( + source_crs: str, target_crs: str, epoch: float | None = None +) -> Callable: + target_crs_crs: MyCrs = MyCrs.from_crs_str(target_crs) + precision = get_precision(target_crs_crs) + + def my_fun( + geom: GeojsonGeomNoGeomCollection, + _result: list, + _indices: list[int] | None = None, + ) -> ( + None + ): # add _result, _indices args since required by transform_geometries_req_body + callback = get_transform_crs_fun(source_crs, target_crs, precision, epoch=epoch) + geom.coordinates = traverse_geojson_coordinates( + cast(list[list[Any]] | list[float] | list[int], geom.coordinates), + callback=callback, + ) + + return my_fun + + +def get_validate_json_coords_fun() -> Callable: + def validate_json_coords( + geometry: GeojsonGeomNoGeomCollection, + result: list, + _indices: list[int] | None = None, + ) -> None: + def coords_has_inf(coordinates: Any) -> bool: # noqa: ANN401 + gen = ( + x + for x in explode(coordinates) + if abs(x[0]) == float("inf") or abs(x[1]) == float("inf") + ) + return next(gen, None) is not None + + coords = get_coordinates_from_geometry(geometry) + result.append(coords_has_inf(coords)) + # TODO: HANDLE result in calling code + # if coords_has_inf(coordinates): + # raise_response_validation_error( + # "Out of range float values are not JSON compliant", ["responseBody"] + # ) + + return validate_json_coords + + +def update_bbox_geojson_object( # noqa: C901 + geojson_obj: GeojsonObject, +) -> None: + def rec_fun( # noqa: C901 + geojson_obj: GeojsonObject, + ) -> list: + if isinstance(geojson_obj, CrsFeatureCollection): + fc_coords: list = [] + for ft in geojson_obj.features: + fc_coords.append(rec_fun(ft)) + if geojson_obj.bbox is not None: + geojson_obj.bbox = get_bbox_from_coordinates(fc_coords) + return fc_coords + elif isinstance(geojson_obj, Feature): + ft_coords: list = [] + if geojson_obj.geometry is None: + return ft_coords + ft_coords = rec_fun(geojson_obj.geometry) + if geojson_obj.bbox is not None: + geojson_obj.bbox = get_bbox_from_coordinates(ft_coords) + return ft_coords + elif isinstance(geojson_obj, GeometryCollection): + gc_coords: list = [] + for geom in geojson_obj.geometries: + gc_coords.append(rec_fun(geom)) + if geojson_obj.bbox is not None: + geojson_obj.bbox = get_bbox_from_coordinates(gc_coords) + return gc_coords + elif isinstance(geojson_obj, _GeometryBase): + geom_coords: list = get_coordinates_from_geometry(geojson_obj) + if geojson_obj.bbox is not None: + geojson_obj.bbox = get_bbox_from_coordinates(geom_coords) + return geom_coords + else: + raise ValueError( + f"received unexpected type in geojson_obj var: {type(geojson_obj)}" + ) + + _ = rec_fun(geojson_obj) + + +def traverse_geojson_coordinates( + geojson_coordinates: list[list] | list[float] | list[int], + callback: Callable[ + [CoordinatesType], + tuple[float, ...], + ], +) -> Any: # noqa: ANN401 + """traverse GeoJSON coordinates object and apply callback function to coordinates-nodes + + Args: + obj: GeoJSON coordinates object + callback (): callback function to transform coordinates-nodes + + Returns: + GeoJSON coordinates object + """ + if all(isinstance(x, (float, int)) for x in geojson_coordinates): + return callback(cast(list[float], geojson_coordinates)) + else: + coords = cast(list[list], geojson_coordinates) + return [ + traverse_geojson_coordinates(elem, callback=callback) for elem in coords + ] + + +def get_coordinates_from_geometry( + item: _GeometryBase, +) -> list: + geom = cast(_GeometryBase, item) + return list( + chain(explode(geom.coordinates)) + ) # TODO: check if chain(list()) is required... + + +def explode(coords: Any) -> Generator[Any, Any, None]: # noqa: ANN401 + """Explode a GeoJSON geometry's coordinates object and yield coordinate tuples. + As long as the input is conforming, the type of the geometry doesn't matter. + Source: https://gis.stackexchange.com/a/90554 + """ + for e in coords: + if isinstance( + e, + ( + float, + int, + ), + ): + yield coords + break + else: + yield from explode(e) + + +def get_bbox_from_coordinates(coordinates: Any) -> BBox: # noqa: ANN401 + coordinate_tuples = list(zip(*list(explode(coordinates)))) + if len(coordinate_tuples) == TWO_DIMENSIONAL: + x, y = coordinate_tuples + return min(x), min(y), max(x), max(y) + elif len(coordinate_tuples) == THREE_DIMENSIONAL: + x, y, z = coordinate_tuples + return min(x), min(y), min(z), max(x), max(y), max(z) + else: + raise ValueError( + f"expected length of coordinate tuple is either 2 or 3, got {len(coordinate_tuples)}" + ) + + +def get_transformer(source_crs: str, target_crs: str) -> Transformer: + source_crs_crs = CRS.from_authority(*source_crs.split(":")) + target_crs_crs = CRS.from_authority(*target_crs.split(":")) + return Transformer.from_crs(source_crs_crs, target_crs_crs, always_xy=True) + + +def get_transform_crs_fun( + source_crs: str, + target_crs: str, + precision: int | None = None, + epoch: float | None = None, +) -> Callable[[CoordinatesType], tuple[float, ...],]: + """TODO: improve type annotation/handling geojson/cityjson transformation, with the current implementation mypy is not complaining""" + + transformer = get_transformer(source_crs, target_crs) + + def my_round(val: float, precision: int | None) -> float | int: + if precision is None: + return val + else: + return round(val, precision) + + def transform_crs(val: CoordinatesType) -> tuple[float, ...]: + if transformer.target_crs is None: + raise ValueError("transformer.target_crs is None") + dim = len(transformer.target_crs.axis_info) + if ( + dim is not None + and dim != len(val) + and TWO_DIMENSIONAL > dim > THREE_DIMENSIONAL + ): + # check so we can safely cast to tuple[float, float], tuple[float, float, float] + raise ValueError( + f"number of dimensions of target-crs should be 2 or 3, is {dim}" + ) + val = cast(tuple[float, float] | tuple[float, float, float], val[0:dim]) + input = tuple([*val, float(epoch) if epoch is not None else None]) + + # GeoJSON and CityJSON by definition has coordinates always in lon-lat-height (or x-y-z) order. Transformer has been created with `always_xy=True`, + # to ensure input and output coordinates are in in lon-lat-height (or x-y-z) order. + # Regarding the epoch: this is stripped from the result of the transformer. It's used as a input parameter for the transformation but is not + # 'needed' in the result, because there is no conversion of time, e.i. a epoch value of 2010.0 will stay 2010.0 in the result. Therefor the result + # of the transformer is 'stripped' with [0:dim] + return tuple( + [ + float(my_round(x, precision)) + for x in transformer.transform(*input)[0:dim] + ] + ) + + return transform_crs diff --git a/src/coordinates_transformation_api/main.py b/src/coordinates_transformation_api/main.py index 067bbac..fdd84a4 100644 --- a/src/coordinates_transformation_api/main.py +++ b/src/coordinates_transformation_api/main.py @@ -11,14 +11,11 @@ from fastapi.responses import JSONResponse, PlainTextResponse from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles +from geodense.geojson import CrsFeatureCollection from geojson_pydantic import Feature from geojson_pydantic.geometries import Geometry, GeometryCollection from coordinates_transformation_api import assets -from coordinates_transformation_api.callback import ( - coordinates_type, - get_transform_callback, -) from coordinates_transformation_api.cityjson.models import CityjsonV113 from coordinates_transformation_api.fastapi_rfc7807 import middleware from coordinates_transformation_api.limit_middleware.middleware import ( @@ -28,7 +25,6 @@ from coordinates_transformation_api.models import ( Conformance, Crs, - CrsFeatureCollection, DensityCheckReport, LandingPage, Link, @@ -36,21 +32,20 @@ ) from coordinates_transformation_api.settings import app_settings from coordinates_transformation_api.util import ( - THREE_DIMENSIONAL, accept_html, - apply_function_on_geometries_of_request_body, + convert_point_coords_to_wkt, crs_transform, densify_request_body, density_check_request_body, extract_authority_code, format_as_uri, - get_precision, get_source_crs_body, - get_validate_json_coords_fun, init_oas, raise_response_validation_error, raise_validation_error, + transform_coordinates, validate_coords_source_crs, + validate_crs_transformed_geojson, validate_crss, validate_input_max_segment_deviation_length, ) @@ -234,7 +229,7 @@ async def transform( # noqa: PLR0913, ANN201 alias="coordinates", regex=r"^(\d+\.?\d*),(\d+\.?\d*)(,\d+\.?\d*)?$" ), source_crs: str = Query(alias="source-crs", default=None), - target_crs: str = Query(alias="target-crs", defaANN201ult=None), + target_crs: str = Query(alias="target-crs", default=None), epoch: float = Query(alias="epoch", default=None), content_crs: str = Header(alias="content-crs", default=None), accept_crs: str = Header(alias="accept-crs", default=None), @@ -273,16 +268,9 @@ async def transform( # noqa: PLR0913, ANN201 raise ValueError( f"could not instantiate CRS object for CRS with id {target_crs}" ) - precision = get_precision(target_crs_crs) - - coordinates_list: coordinates_type = list( - float(x) for x in coordinates.split(",") - ) # convert to list since we do not know dimensionality of coordinates - callback = get_transform_callback( - source_crs, target_crs, precision=precision, epoch=epoch + transformed_coordinates = transform_coordinates( + coordinates, source_crs, target_crs, epoch, target_crs_crs ) - # TODO: fix following type ignore - transformed_coordinates = callback(coordinates_list) if float("inf") in [abs(x) for x in transformed_coordinates]: raise_response_validation_error( @@ -290,14 +278,9 @@ async def transform( # noqa: PLR0913, ANN201 ) if accept == str(TransformGetAcceptHeaders.wkt.value): - if len(transformed_coordinates) == THREE_DIMENSIONAL: - return PlainTextResponse( - f"POINT Z ({' '.join([str(x) for x in transformed_coordinates])})", - headers=set_response_headers(t_crs, epoch), - ) - - return PlainTextResponse( - f"POINT({' '.join([str(x) for x in transformed_coordinates])})", + wkt_string = convert_point_coords_to_wkt(coordinates) + PlainTextResponse( + wkt_string, headers=set_response_headers(t_crs, epoch), ) else: # default case serve json @@ -472,11 +455,7 @@ async def post_transform( # noqa: ANN201, PLR0913 ) else: crs_transform(body, s_crs, t_crs, epoch) - - validate_json_coords_fun = get_validate_json_coords_fun() - _ = apply_function_on_geometries_of_request_body( - body, validate_json_coords_fun - ) # TODO: replace with try except block - saves an JSON serialization ?? + validate_crs_transformed_geojson(body) return JSONResponse( content=body.model_dump(exclude_none=True), diff --git a/src/coordinates_transformation_api/models.py b/src/coordinates_transformation_api/models.py index 69cb4b1..871b8ec 100644 --- a/src/coordinates_transformation_api/models.py +++ b/src/coordinates_transformation_api/models.py @@ -1,9 +1,35 @@ from enum import Enum -from typing import Literal, Optional +from typing import Optional -from geojson_pydantic import FeatureCollection -from pydantic import BaseModel, Field, computed_field -from pyproj import CRS +from pydantic import BaseModel, computed_field +from pyproj import CRS as ProjCrs # noqa: N811 + + +class Link(BaseModel): + title: str + type: str + rel: str + href: str + + +class LandingPage(BaseModel): + title: str + description: str + links: list[Link] + + +class Conformance(BaseModel): + conformsTo: list[str] = [] # noqa: N815 + + +class DensityCheckReport(BaseModel): + passes_check: bool + report: Optional[list[tuple[list[int], float]]] + + +class TransformGetAcceptHeaders(Enum): + json = "application/json" + wkt = "text/plain" class Axis(BaseModel): @@ -27,7 +53,7 @@ class Crs(BaseModel): def from_crs_str(cls, crs_st: str) -> "Crs": # noqa: ANN102 # Do some math here and later set the values auth, identifier = crs_st.split(":") - crs = CRS.from_authority(auth, identifier) + crs = ProjCrs.from_authority(auth, identifier) axes = [ Axis( name=a.name, @@ -75,60 +101,3 @@ def get_x_unit_crs(self: "Crs") -> str: f"Unexpected unit in x axis (x, e, lon) CRS {self.crs_auth_identifier} - expected values: degree, meter, actual value: {unit_name}" ) return unit_name - - -class Link(BaseModel): - title: str - type: str - rel: str - href: str - - -class LandingPage(BaseModel): - title: str - description: str - links: list[Link] - - -class Conformance(BaseModel): - conformsTo: list[str] = [] # noqa: N815 - - -class DensityCheckReport(BaseModel): - passes_check: bool - report: Optional[list[tuple[list[int], float]]] - - -class TransformGetAcceptHeaders(Enum): - json = "application/json" - wkt = "text/plain" - - -class GeoJsonCrsProp(BaseModel): - # OGC URN scheme - 8.2 in OGC 05-103 - # urn:ogc:def:crs:{crs_auth}:{crs_version}:{crs_identifier} - name: str = Field(pattern=r"^urn:ogc:def:crs:.*?:.*?:.*?$") - - -class GeoJsonCrs(BaseModel): - properties: GeoJsonCrsProp - type: Literal["name"] - - -class CrsFeatureCollection(FeatureCollection): - crs: Optional[GeoJsonCrs] = None - - def get_crs_auth_code(self: "CrsFeatureCollection") -> str | None: - if self.crs is None: - return None - source_crs_urn_string = self.crs.properties.name - source_crs_urn_list = source_crs_urn_string.split(":") - crs_authority = source_crs_urn_list[4] - crs_identifier = source_crs_urn_list[6] - return f"{crs_authority}:{crs_identifier}" - - def set_crs_auth_code(self: "CrsFeatureCollection", crs_auth_code: str) -> None: - crs_auth, crs_identifier = crs_auth_code.split(":") - if self.crs is None: - raise ValueError(f"self.crs is none of CrsFeatureCollection: {self}") - self.crs.properties.name = f"urn:ogc:def:crs:{crs_auth}::{crs_identifier}" diff --git a/src/coordinates_transformation_api/types.py b/src/coordinates_transformation_api/types.py new file mode 100644 index 0000000..be169bb --- /dev/null +++ b/src/coordinates_transformation_api/types.py @@ -0,0 +1,18 @@ +from geojson_pydantic.types import ( + LineStringCoords, + MultiLineStringCoords, + MultiPointCoords, + MultiPolygonCoords, + PolygonCoords, + Position, +) + +GeojsonCoordinates = ( + Position + | PolygonCoords + | LineStringCoords + | MultiPointCoords + | MultiLineStringCoords + | MultiPolygonCoords +) +CoordinatesType = tuple[float, float] | tuple[float, float, float] | list[float] diff --git a/src/coordinates_transformation_api/util.py b/src/coordinates_transformation_api/util.py index eac3c5d..2f4e5b3 100644 --- a/src/coordinates_transformation_api/util.py +++ b/src/coordinates_transformation_api/util.py @@ -3,85 +3,46 @@ import logging import math import re -from collections import Counter -from collections.abc import Iterable from importlib import resources as impresources -from itertools import chain -from typing import Any, Callable, TypedDict, cast +from typing import Any import yaml from fastapi import Request from fastapi.exceptions import RequestValidationError, ResponseValidationError -from geodense.lib import ( # type: ignore - check_density_geometry_coordinates, + +from geodense.geojson import CrsFeatureCollection +from geodense.lib import ( # type: ignore # type: ignore + THREE_DIMENSIONAL, + GeojsonObject, + apply_function_on_geojson_geometries, densify_geojson_object, + get_density_check_fun, ) from geodense.models import ( DenseConfig, ) -from geojson_pydantic import ( - Feature, - LineString, - MultiLineString, - MultiPoint, - MultiPolygon, - Point, - Polygon, -) -from geojson_pydantic.geometries import Geometry, GeometryCollection, _GeometryBase -from geojson_pydantic.types import ( - BBox, - LineStringCoords, - MultiLineStringCoords, - MultiPointCoords, - MultiPolygonCoords, - PolygonCoords, - Position, -) from pydantic import ValidationError from pydantic_core import InitErrorDetails, PydanticCustomError from pyproj import CRS -from shapely import GeometryCollection as ShpGeometryCollection from shapely import STRtree, box -from shapely.geometry import shape from coordinates_transformation_api import assets -from coordinates_transformation_api.callback import ( - get_transform_callback, -) from coordinates_transformation_api.cityjson.models import CityjsonV113 +from coordinates_transformation_api.constants import DENSIFY_CRS, DEVIATION_VALID_BBOX +from coordinates_transformation_api.crs_transform import ( + get_crs_transform_fun, + get_precision, + get_shapely_objects, + get_transform_crs_fun, + get_validate_json_coords_fun, + update_bbox_geojson_object, +) from coordinates_transformation_api.models import Crs as MyCrs -from coordinates_transformation_api.models import CrsFeatureCollection from coordinates_transformation_api.settings import app_settings - -GeojsonGeomNoGeomCollection = ( - Point | MultiPoint | LineString | MultiLineString | Polygon | MultiPolygon -) - -GeojsonCoordinates = ( - Position - | PolygonCoords - | LineStringCoords - | MultiPointCoords - | MultiLineStringCoords - | MultiPolygonCoords -) - +from coordinates_transformation_api.types import CoordinatesType logger = logging.getLogger(__name__) -coordinates_type = tuple[float, float] | tuple[float, float, float] | list[float] -TWO_DIMENSIONAL = 2 -THREE_DIMENSIONAL = 3 - -DENSIFY_CRS = "EPSG:4258" -DEVIATION_VALID_BBOX = [ - 3.1201, - 50.2191, - 7.5696, - 54.1238, -] # bbox in epsg:4258 - area valid for doing density check (based on deviation param) - def validate_crs_transformation( source_crs, target_crs, projections_axis_info: list[MyCrs] @@ -98,22 +59,10 @@ def validate_crs_transformation( ) if source_crs_dims < target_crs_dims: - raise RequestValidationError( - errors=( - ValidationError.from_exception_data( - "ValueError", - [ - InitErrorDetails( - type=PydanticCustomError( - "value_error", - f"number of dimensions of target-crs should be equal or less then that of the source-crs\n * source-crs: {source_crs}, dimensions: {source_crs_dims}\n * target-crs {target_crs}, dimensions: {target_crs_dims}", - ), - loc=("query", "target-crs"), - input=(source_crs, target_crs), - ) - ], - ) - ).errors() + raise_req_validation_error( + f"number of dimensions of target-crs should be equal or less then that of the source-crs\n * source-crs: {source_crs}, dimensions: {source_crs_dims}\n * target-crs {target_crs}, dimensions: {target_crs_dims}", + loc=("query", "target-crs"), + input=(source_crs, target_crs), ) @@ -126,43 +75,23 @@ def validate_coords_source_crs( if crs.crs_auth_identifier == source_crs ) if source_crs_dims != len(coordinates.split(",")): - raise RequestValidationError( - errors=( - ValidationError.from_exception_data( - "ValueError", - [ - InitErrorDetails( - type=PydanticCustomError( - "value_error", - "number of coordinates must match number of dimensions of source-crs", - ), - loc=("query", "coordinates"), - input=source_crs, - ) - ], - ) - ).errors() + raise_req_validation_error( + "number of coordinates must match number of dimensions of source-crs", + loc=("query", "coordinates"), + input=source_crs, ) +def camel_to_snake(s): + return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_") + + def validate_input_max_segment_deviation_length(deviation, length): if length is None and deviation is None: - raise RequestValidationError( - errors=( - ValidationError.from_exception_data( - "ValueError", - [ - InitErrorDetails( - type=PydanticCustomError( - "value_error", - "max_segment_length or max_segment_deviation should be set", - ), - loc=("query", "max_segment_length|max_segment_deviation"), - input=[None, None], - ) - ], - ) - ).errors() + raise_req_validation_error( + "max_segment_length or max_segment_deviation should be set", + loc=("query", "max_segment_length|max_segment_deviation"), + input=[None, None], ) @@ -170,63 +99,13 @@ def validate_input_crs(value, name, projections_axis_info: list[MyCrs]): if not any( crs for crs in projections_axis_info if crs.crs_auth_identifier == value ): - raise RequestValidationError( - errors=( - ValidationError.from_exception_data( - "ValueError", - [ - InitErrorDetails( - type=PydanticCustomError( - "value_error", - f"{name} should be one of {', '.join([str(x.crs_auth_identifier) for x in projections_axis_info])}", - ), - loc=("query", name), - input=value, - ) - ], - ) - ).errors() + raise_req_validation_error( + f"{name} should be one of {', '.join([str(x.crs_auth_identifier) for x in projections_axis_info])}", + loc=("query", name), + input=value, ) -class AxisInfo(TypedDict): - axis_labels: list[str] - dimensions: int - - -def get_precision(target_crs_crs: MyCrs): - precision = app_settings.precision - unit = target_crs_crs.get_x_unit_crs() - if unit == "degree": - precision += 5 - return precision - - -def traverse_geojson_coordinates( - geojson_coordinates: list[list] | list[float] | list[int], - callback: Callable[ - [coordinates_type], - tuple[float, ...], - ], -): - """traverse GeoJSON coordinates object and apply callback function to coordinates-nodes - - Args: - obj: GeoJSON coordinates object - callback (): callback function to transform coordinates-nodes - - Returns: - GeoJSON coordinates object - """ - if all(isinstance(x, (float, int)) for x in geojson_coordinates): - return callback(cast(list[float], geojson_coordinates)) - else: - coords = cast(list[list], geojson_coordinates) - return [ - traverse_geojson_coordinates(elem, callback=callback) for elem in coords - ] - - def extract_authority_code(crs: str) -> str: r = re.search("^(http://www.opengis.net/def/crs/)?(.[^/|:]*)(/.*/|:)(.*)", crs) if r is not None: @@ -247,84 +126,11 @@ def validate_crss(source_crs: str, target_crs: str, projections_axis_info): validate_crs_transformation(source_crs, target_crs, projections_axis_info) -def explode(coords): - """Explode a GeoJSON geometry's coordinates object and yield coordinate tuples. - As long as the input is conforming, the type of the geometry doesn't matter. - Source: https://gis.stackexchange.com/a/90554 - """ - for e in coords: - if isinstance( - e, - ( - float, - int, - ), - ): - yield coords - break - else: - yield from explode(e) - - -def get_bbox_from_coordinates(coordinates) -> BBox: - coordinate_tuples = list(zip(*list(explode(coordinates)))) - if len(coordinate_tuples) == TWO_DIMENSIONAL: - x, y = coordinate_tuples - return min(x), min(y), max(x), max(y) - elif len(coordinate_tuples) == THREE_DIMENSIONAL: - x, y, z = coordinate_tuples - return min(x), min(y), min(z), max(x), max(y), max(z) - else: - raise ValueError( - f"expected length of coordinate tuple is either 2 or 3, got {len(coordinate_tuples)}" - ) - - -def raise_response_validation_error(message: str, location): - raise ResponseValidationError( - errors=( - ValidationError.from_exception_data( - "ValueError", - [ - InitErrorDetails( - type=PydanticCustomError( - "value-error", - message, - ), - loc=location, - input="", - ), - ], - ) - ).errors() - ) - - -def raise_validation_error(message: str, location): - raise RequestValidationError( - errors=( - ValidationError.from_exception_data( - "ValueError", - [ - InitErrorDetails( - type=PydanticCustomError( - "missing", - message, - ), - loc=location, - input="", - ), - ], - ) - ).errors() - ) - - def get_source_crs_body( - body: Feature | CrsFeatureCollection | Geometry | GeometryCollection | CityjsonV113, + body: GeojsonObject | CityjsonV113, ) -> str | None: if isinstance(body, CrsFeatureCollection) and body.crs is not None: - source_crs = body.get_crs_auth_code() + source_crs: str | None = body.get_crs_auth_code() if source_crs is None: return None elif isinstance(body, CrsFeatureCollection) and body.crs is None: @@ -347,15 +153,6 @@ def get_source_crs_body( return source_crs -def get_coordinates_from_geometry( - item: _GeometryBase, -) -> list: - geom = cast(_GeometryBase, item) - return list( - chain(explode(geom.coordinates)) - ) # TODO: check if chain(list()) is required... - - def accept_html(request: Request) -> bool: if "accept" in request.headers: accept_header = request.headers["accept"] @@ -364,9 +161,9 @@ def accept_html(request: Request) -> bool: return False -def request_body_within_valid_bbox(body, source_crs): +def request_body_within_valid_bbox(body: GeojsonObject, source_crs: str) -> bool: if source_crs != DENSIFY_CRS: - transform_f = get_transform_callback(DENSIFY_CRS, source_crs) + transform_f = get_transform_crs_fun(DENSIFY_CRS, source_crs) bbox = [ *transform_f(DEVIATION_VALID_BBOX[:2]), *transform_f(DEVIATION_VALID_BBOX[2:]), @@ -379,203 +176,21 @@ def request_body_within_valid_bbox(body, source_crs): return True -def apply_function_on_geometries_of_request_body( # noqa: C901 - body: Feature - | CrsFeatureCollection - | GeojsonGeomNoGeomCollection - | GeometryCollection, - callback: Callable[ - [GeojsonGeomNoGeomCollection, list[Any], list[int] | None], None - ], - indices: list[int] | None = None, -) -> Any: - result: list[Any] = [] - if isinstance(body, Feature): - feature = cast(Feature, body) - if isinstance(feature.geometry, GeometryCollection): - return apply_function_on_geometries_of_request_body( - feature.geometry, callback - ) - - geom = cast(GeojsonGeomNoGeomCollection, feature.geometry) - return callback(geom, result, None) - elif isinstance(body, GeojsonGeomNoGeomCollection): # type: ignore - geom = cast(GeojsonGeomNoGeomCollection, body) - return callback(geom, result, indices) - elif isinstance(body, CrsFeatureCollection): - fc_body: CrsFeatureCollection = body - features: Iterable[Feature] = fc_body.features - for i, ft in enumerate(features): - if ft.geometry is None: - raise ValueError(f"feature does not have a geometry, feature: {ft}") - if isinstance(ft.geometry, GeometryCollection): - ft_result = apply_function_on_geometries_of_request_body( - ft.geometry, callback, [i] - ) - result.extend(ft_result) - else: - callback(ft.geometry, result, [i]) - elif isinstance(body, GeometryCollection): - gc = cast(GeometryCollection, body) - geometries: list[Geometry] = gc.geometries - for i, g in enumerate(geometries): - n_indices = None - if indices is not None: - n_indices = indices[:] - n_indices.append(i) - g_no_gc = cast( - GeojsonGeomNoGeomCollection, g - ) # geojson prohibits nested geometrycollections - maybe throw exception if this occurs - callback(g_no_gc, result, n_indices) - return result - - -def get_shapely_objects( - body: Feature | CrsFeatureCollection | Geometry | GeometryCollection, -) -> list[Any]: - def merge_geometry_collections_shapelyfication(input_shp_geoms: list) -> list: - indices = list( - map( - lambda x: x["index"][0] if hasattr(x, "index") else None, - input_shp_geoms, - ) - ) - counter = Counter(indices) - geom_coll_indices = [x for x in counter if counter[x] > 1] - output_shp_geoms = [ - x["result"] - for x in input_shp_geoms - if (hasattr(x, "index") and x["index"][0] not in geom_coll_indices) - or not hasattr(x, "index") - ] - for i in geom_coll_indices: - geom_collection_geoms = [ - x["result"] for x in input_shp_geoms if x["index"][0] == i - ] - output_shp_geoms.append(ShpGeometryCollection(geom_collection_geoms)) - return output_shp_geoms - - transform_fun = get_shapely_object_fun() - result = apply_function_on_geometries_of_request_body(body, transform_fun) - return merge_geometry_collections_shapelyfication(result) - - -def get_shapely_object_fun() -> Callable: - def shapely_object( - geometry_dict: dict[str, Any], result: list, indices: list[int] | None = None - ) -> list: - shp_obj = shape(geometry_dict) - result_item = {"result": shp_obj} - if indices is not None: - result_item["index"] = indices - result.append(result_item) - return result - - return shapely_object - - -def get_density_check_fun( - densify_config: DenseConfig, -) -> Callable: - def density_check( - geometry: GeojsonGeomNoGeomCollection, - result: list, - indices: list[int] | None = None, - ) -> None: - check_density_geometry_coordinates( - geometry.coordinates, densify_config, result, indices - ) - - return density_check - - -def get_update_geometry_bbox_fun() -> Callable: - def update_bbox( - geometry: GeojsonGeomNoGeomCollection, - _result: list, - _indices: list[int] | None = None, - ) -> None: - coordinates = get_coordinates_from_geometry(geometry) - geometry.bbox = get_bbox_from_coordinates(coordinates) - - return update_bbox - - -def update_bbox_geojson_object( # noqa: C901 - geojson_obj: Feature | CrsFeatureCollection | Geometry | GeometryCollection, -) -> None: - def rec_fun( # noqa: C901 - geojson_obj: Feature | CrsFeatureCollection | Geometry | GeometryCollection, - ) -> list: - if isinstance(geojson_obj, CrsFeatureCollection): - fc_coords: list = [] - for ft in geojson_obj.features: - fc_coords.append(rec_fun(ft)) - if geojson_obj.bbox is not None: - geojson_obj.bbox = get_bbox_from_coordinates(fc_coords) - return fc_coords - elif isinstance(geojson_obj, Feature): - ft_coords: list = [] - if geojson_obj.geometry is None: - return ft_coords - ft_coords = rec_fun(geojson_obj.geometry) - if geojson_obj.bbox is not None: - geojson_obj.bbox = get_bbox_from_coordinates(ft_coords) - return ft_coords - elif isinstance(geojson_obj, GeometryCollection): - gc_coords: list = [] - for geom in geojson_obj.geometries: - gc_coords.append(rec_fun(geom)) - if geojson_obj.bbox is not None: - geojson_obj.bbox = get_bbox_from_coordinates(gc_coords) - return gc_coords - elif isinstance(geojson_obj, _GeometryBase): - geom_coords: list = get_coordinates_from_geometry(geojson_obj) - if geojson_obj.bbox is not None: - geojson_obj.bbox = get_bbox_from_coordinates(geom_coords) - return geom_coords - - _ = rec_fun(geojson_obj) - - def crs_transform( - body: Feature | CrsFeatureCollection | Geometry | GeometryCollection, + body: GeojsonObject, s_crs: str, t_crs: str, epoch: float | None = None, ) -> None: crs_transform_fun = get_crs_transform_fun(s_crs, t_crs, epoch) - _ = apply_function_on_geometries_of_request_body(body, crs_transform_fun) + _ = apply_function_on_geojson_geometries(body, crs_transform_fun) if isinstance(body, CrsFeatureCollection): body.set_crs_auth_code(t_crs) update_bbox_geojson_object(body) -def get_validate_json_coords_fun() -> Callable: - def validate_json_coords( - geometry: GeojsonGeomNoGeomCollection, - _result: list, - _indices: list[int] | None = None, - ) -> None: - def coords_has_inf(coordinates): - gen = ( - x - for x in explode(coordinates) - if abs(x[0]) == float("inf") or abs(x[1]) == float("inf") - ) - return next(gen, None) is not None - - coordinates = get_coordinates_from_geometry(geometry) - if coords_has_inf(coordinates): - raise_response_validation_error( - "Out of range float values are not JSON compliant", ["responseBody"] - ) - - return validate_json_coords - - def density_check_request_body( - body: Feature | CrsFeatureCollection | Geometry | GeometryCollection, + body: GeojsonObject, source_crs: str, max_segment_deviation: float, max_segment_length: float, @@ -595,11 +210,13 @@ def density_check_request_body( # density check c = DenseConfig(CRS.from_authority(*DENSIFY_CRS.split(":")), max_segment_length) my_fun = get_density_check_fun(c) - report = apply_function_on_geometries_of_request_body(body, my_fun) + report = apply_function_on_geojson_geometries(body, my_fun) return report -def bbox_check_deviation_set(body, source_crs, max_segment_deviation): +def bbox_check_deviation_set( + body: GeojsonObject, source_crs, max_segment_deviation +) -> None: if max_segment_deviation is None and not request_body_within_valid_bbox( body, source_crs ): @@ -609,7 +226,7 @@ def bbox_check_deviation_set(body, source_crs, max_segment_deviation): def densify_request_body( - body: Feature | CrsFeatureCollection | Geometry, + body: GeojsonObject, source_crs: str, max_segment_deviation: float, max_segment_length: float, @@ -633,32 +250,9 @@ def densify_request_body( # densify request body c = DenseConfig(CRS.from_authority(*DENSIFY_CRS.split(":")), max_segment_length) densify_geojson_object(body, c) - crs_transform(body, DENSIFY_CRS, source_crs) # transform back -def get_crs_transform_fun(source_crs, target_crs, epoch) -> Callable: - target_crs_crs: MyCrs = MyCrs.from_crs_str(target_crs) - precision = get_precision(target_crs_crs) - - def my_fun( - geom: GeojsonGeomNoGeomCollection, - _result: list, - _indices: list[int] | None = None, - ) -> ( - None - ): # add _result, _indices args since required by transform_geometries_req_body - callback = get_transform_callback( - source_crs, target_crs, precision, epoch=epoch - ) - geom.coordinates = traverse_geojson_coordinates( - cast(list[list[Any]] | list[float] | list[int], geom.coordinates), - callback=callback, - ) - - return my_fun - - def init_oas() -> tuple[dict, str, str]: """initialize open api spec: - extract api version string from oas @@ -693,3 +287,92 @@ def convert_distance_to_deviation(d): def convert_deviation_to_distance(a): d = math.sqrt(a / (24.15 * 10**-9)) return d + + +def raise_response_validation_error(message: str, location): + raise ResponseValidationError( + errors=( + ValidationError.from_exception_data( + "ValueError", + [ + InitErrorDetails( + type=PydanticCustomError( + "value-error", + message, + ), + loc=location, + input="", + ), + ], + ) + ).errors() + ) + + +def raise_validation_error(message: str, location): + raise RequestValidationError( + errors=( + ValidationError.from_exception_data( + "ValueError", + [ + InitErrorDetails( + type=PydanticCustomError( + "missing", + message, + ), + loc=location, + input="", + ), + ], + ) + ).errors() + ) + + +def raise_req_validation_error( + error_message, error_type="ValueError", input=any, loc: tuple[int | str, ...] = () +): + error_type_snake = camel_to_snake(error_type) + raise RequestValidationError( + errors=( + ValidationError.from_exception_data( + error_type, + [ + InitErrorDetails( + type=PydanticCustomError( + error_type_snake, + error_message, + ), + loc=loc, + input=input, + ) + ], + ) + ).errors() + ) + + +def convert_point_coords_to_wkt(coords): + geom_type = "POINT" + if len(coords) == THREE_DIMENSIONAL: + geom_type = "POINT Z" + return f"{geom_type}({' '.join([str(x) for x in coords])})" + + +def transform_coordinates( + coordinates: Any, source_crs: str, target_crs: str, epoch, target_crs_crs +) -> Any: + precision = get_precision(target_crs_crs) + coordinates_list: CoordinatesType = list( + float(x) for x in coordinates.split(",") + ) # convert to list since we do not know dimensionality of coordinates + transform_crs_fun = get_transform_crs_fun( + source_crs, target_crs, precision=precision, epoch=epoch + ) + transformed_coordinates = transform_crs_fun(coordinates_list) + return transformed_coordinates + + +def validate_crs_transformed_geojson(body: GeojsonObject) -> None: + validate_json_coords_fun = get_validate_json_coords_fun() + _ = apply_function_on_geojson_geometries(body, validate_json_coords_fun) diff --git a/tests/test_geojson_bbox.py b/tests/test_geojson_bbox.py index 81dd49b..76a1420 100644 --- a/tests/test_geojson_bbox.py +++ b/tests/test_geojson_bbox.py @@ -1,10 +1,10 @@ import json -from coordinates_transformation_api.models import CrsFeatureCollection from coordinates_transformation_api.util import ( crs_transform, update_bbox_geojson_object, ) +from geodense.geojson import CrsFeatureCollection from geojson_pydantic import Feature from pydantic import ValidationError diff --git a/tests/test_geojson_transformation.py b/tests/test_geojson_transformation.py index 44e50aa..baf7bde 100644 --- a/tests/test_geojson_transformation.py +++ b/tests/test_geojson_transformation.py @@ -1,7 +1,7 @@ import json -from coordinates_transformation_api.models import CrsFeatureCollection from coordinates_transformation_api.util import crs_transform +from geodense.geojson import CrsFeatureCollection from geojson_pydantic import Feature from geojson_pydantic.geometries import Geometry, GeometryCollection, parse_geometry_obj from pydantic import ValidationError