diff --git a/stac_geoparquet/arrow/_schema/models.py b/stac_geoparquet/arrow/_schema/models.py
new file mode 100644
index 0000000..579f04c
--- /dev/null
+++ b/stac_geoparquet/arrow/_schema/models.py
@@ -0,0 +1,62 @@
+import json
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Union
+
+import pyarrow as pa
+
+from stac_geoparquet.arrow._util import stac_items_to_arrow
+
+
+class InferredSchema:
+    """
+    A schema representing the original STAC JSON with absolutely minimal modifications.
+
+    The only modification from the data is converting any geometry fields from GeoJSON
+    to WKB.
+    """
+
+    inner: pa.Schema
+    """The underlying Arrow schema."""
+
+    count: int
+    """The total number of items scanned."""
+
+    def __init__(self) -> None:
+        self.inner = pa.schema([])
+        self.count = 0
+
+    def update_from_ndjson(
+        self,
+        path: Union[Union[str, Path], Iterable[Union[str, Path]]],
+        *,
+        chunk_size: int = 10000,
+    ):
+        # Handle multi-path input
+        if not isinstance(path, (str, Path)):
+            for p in path:
+                self.update_from_ndjson(p)
+
+            return
+
+        # Handle single-path input
+        with open(path) as f:
+            items = []
+            for line in f:
+                item = json.loads(line)
+                items.append(item)
+
+                if len(items) >= chunk_size:
+                    self.update_from_items(items)
+                    items = []
+
+            # Handle remainder
+            if len(items) > 0:
+                self.update_from_items(items)
+
+    def update_from_items(self, items: List[Dict[str, Any]]):
+        self.count += len(items)
+        current_schema = stac_items_to_arrow(items, schema=None).schema
+        new_schema = pa.unify_schemas(
+            [self.inner, current_schema], promote_options="permissive"
+        )
+        self.inner = new_schema
diff --git a/stac_geoparquet/arrow/_to_arrow.py b/stac_geoparquet/arrow/_to_arrow.py
index b5b1f06..7075452 100644
--- a/stac_geoparquet/arrow/_to_arrow.py
+++ b/stac_geoparquet/arrow/_to_arrow.py
@@ -1,10 +1,9 @@
 """Convert STAC data into Arrow tables"""
 
 import json
-from copy import deepcopy
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Sequence, Union, Generator
+from typing import Any, Dict, Generator, Iterable, List, Optional, Sequence, Union
 
 import ciso8601
 import numpy as np
@@ -13,6 +12,9 @@
 import shapely
 import shapely.geometry
 
+from stac_geoparquet.arrow._schema.models import InferredSchema
+from stac_geoparquet.arrow._util import stac_items_to_arrow
+
 
 def _chunks(
     lst: Sequence[Dict[str, Any]], n: int
@@ -26,8 +28,8 @@
 def parse_stac_items_to_arrow(
     items: Sequence[Dict[str, Any]],
     *,
     chunk_size: int = 8192,
-    schema: Optional[pa.Schema] = None,
-    downcast: bool = True,
+    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
+    downcast: bool = False,
 ) -> pa.Table:
     """Parse a collection of STAC Items to a :class:`pyarrow.Table`.
@@ -49,11 +51,14 @@
     """
 
     if schema is not None:
+        if isinstance(schema, InferredSchema):
+            schema = schema.inner
+
         # If schema is provided, then for better memory usage we parse input STAC items
         # to Arrow batches in chunks.
         batches = []
         for chunk in _chunks(items, chunk_size):
-            batches.append(_stac_items_to_arrow(chunk, schema=schema))
+            batches.append(stac_items_to_arrow(chunk, schema=schema))
 
         table = pa.Table.from_batches(batches, schema=schema)
     else:
@@ -61,48 +66,62 @@
         # If schema is _not_ provided, then we must convert to Arrow all at once,
        # else it would be possible for a STAC item late in the collection (after the
        # first chunk) to have a different schema and not match the schema inferred for
        # the first chunk.
-        table = pa.Table.from_batches([_stac_items_to_arrow(items)])
+        table = pa.Table.from_batches([stac_items_to_arrow(items)])
 
     return _process_arrow_table(table, downcast=downcast)
 
 
 def parse_stac_ndjson_to_arrow(
-    path: Union[str, Path],
+    path: Union[Union[str, Path], Iterable[Union[str, Path]]],
     *,
     chunk_size: int = 8192,
-    schema: Optional[pa.Schema] = None,
-    downcast: bool = True,
-) -> pa.Table:
+    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
+):
     # Define outside of if/else to make mypy happy
     items: List[dict] = []
 
     # If the schema was not provided, then we need to load all data into memory at once
     # to perform schema resolution.
     if schema is None:
-        with open(path) as f:
-            for line in f:
-                items.append(json.loads(line))
+        inferred_schema = InferredSchema()
+        inferred_schema.update_from_ndjson(path, chunk_size=chunk_size)
+        # This function is a generator, so delegate to the recursive call with
+        # `yield from` (a bare `return <call>` here would yield nothing).
+        yield from parse_stac_ndjson_to_arrow(
+            path, chunk_size=chunk_size, schema=inferred_schema
+        )
+        return
+
+    # Check if path is an iterable
+    # If so, recursively call this function on each item in the iterable
+    if not isinstance(path, (str, Path)):
+        for p in path:
+            yield from parse_stac_ndjson_to_arrow(
+                p, chunk_size=chunk_size, schema=schema
+            )
 
-        return parse_stac_items_to_arrow(items, chunk_size=chunk_size, schema=schema)
+        return
+
+    if isinstance(schema, InferredSchema):
+        schema = schema.inner
 
     # Otherwise, we can stream over the input, converting each batch of `chunk_size`
     # into an Arrow RecordBatch at a time. This is much more memory efficient.
     with open(path) as f:
-        batches: List[pa.RecordBatch] = []
         for line in f:
            items.append(json.loads(line))
 
            if len(items) >= chunk_size:
-                batches.append(_stac_items_to_arrow(items, schema=schema))
+                batch = stac_items_to_arrow(items, schema=schema)
+                yield from _process_arrow_table(
+                    pa.Table.from_batches([batch]), downcast=False
+                ).to_batches()
                items = []
 
     # Don't forget the last chunk in case the total number of items is not a multiple of
     # chunk_size.
     if len(items) > 0:
-        batches.append(_stac_items_to_arrow(items, schema=schema))
-
-    table = pa.Table.from_batches(batches, schema=schema)
-    return _process_arrow_table(table, downcast=downcast)
+        batch = stac_items_to_arrow(items, schema=schema)
+        yield from _process_arrow_table(
+            pa.Table.from_batches([batch]), downcast=False
+        ).to_batches()
 
 
 def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table:
@@ -112,47 +131,6 @@ def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table:
     return table
 
 
-def _stac_items_to_arrow(
-    items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None
-) -> pa.RecordBatch:
-    """Convert dicts representing STAC Items to Arrow
-
-    This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple
-    geometry types.
-
-    All items will be parsed into a single RecordBatch, meaning that each internal array
-    is fully contiguous in memory for the length of `items`.
-
-    Args:
-        items: STAC Items to convert to Arrow
-
-    Kwargs:
-        schema: An optional schema that describes the format of the data. Note that this
-            must represent the geometry column as binary type.
-
-    Returns:
-        Arrow RecordBatch with items in Arrow
-    """
-    # Preprocess GeoJSON to WKB in each STAC item
-    # Otherwise, pyarrow will try to parse coordinates into a native geometry type and
-    # if you have multiple geometry types pyarrow will error with
-    # `ArrowInvalid: cannot mix list and non-list, non-null values`
-    wkb_items = []
-    for item in items:
-        wkb_item = deepcopy(item)
-        # Note: this mutates the existing items. Should we
-        wkb_item["geometry"] = shapely.to_wkb(
-            shapely.geometry.shape(wkb_item["geometry"]), flavor="iso"
-        )
-        wkb_items.append(wkb_item)
-
-    if schema is not None:
-        array = pa.array(wkb_items, type=pa.struct(schema))
-    else:
-        array = pa.array(wkb_items)
-    return pa.RecordBatch.from_struct_array(array)
-
-
 def _bring_properties_to_top_level(table: pa.Table) -> pa.Table:
     """Bring all the fields inside of the nested "properties" struct to the top level"""
     properties_field = table.schema.field("properties")
diff --git a/stac_geoparquet/arrow/_to_parquet.py b/stac_geoparquet/arrow/_to_parquet.py
index 3641001..16a6d99 100644
--- a/stac_geoparquet/arrow/_to_parquet.py
+++ b/stac_geoparquet/arrow/_to_parquet.py
@@ -1,13 +1,48 @@
 import json
-from typing import Any
+from pathlib import Path
+from typing import Any, Iterable, Optional, Union
 
 import pyarrow as pa
 import pyarrow.parquet as pq
 from pyproj import CRS
 
+from stac_geoparquet.arrow._schema.models import InferredSchema
+from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow
+
 WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict()
 
 
+def parse_stac_ndjson_to_parquet(
+    input_path: Union[Union[str, Path], Iterable[Union[str, Path]]],
+    output_path: Union[str, Path],
+    *,
+    chunk_size: int = 65536,
+    schema: Optional[Union[pa.Schema, InferredSchema]] = None,
+    **kwargs,
+):
+    """Convert one or more newline-delimited JSON STAC files to GeoParquet
+
+    Args:
+        input_path: One or more paths to files with STAC items.
+        output_path: A path to the output Parquet file.
+        chunk_size: The chunk size. Defaults to 65536.
+        schema: The schema to represent the input STAC data. Defaults to None, in which
+            case the schema will be inferred via a full pass over the input data. In
+            this case, there will be two full passes over the input data: one to infer a
+            common schema across all data and another to read the data and iteratively
+            convert to GeoParquet.
+    """
+    batches_iter = parse_stac_ndjson_to_arrow(
+        input_path, chunk_size=chunk_size, schema=schema
+    )
+    first_batch = next(batches_iter)
+    schema = first_batch.schema.with_metadata(_create_geoparquet_metadata())
+    with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
+        writer.write_batch(first_batch)
+        for batch in batches_iter:
+            writer.write_batch(batch)
+
+
 def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
     """Write an Arrow table with STAC data to GeoParquet
 
@@ -17,6 +52,14 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
         table: The table to write to Parquet
        where: The destination for saving.
""" + metadata = table.schema.metadata or {} + metadata.update(_create_geoparquet_metadata()) + table = table.replace_schema_metadata(metadata) + + pq.write_table(table, where, **kwargs) + + +def _create_geoparquet_metadata(): # TODO: include bbox of geometries column_meta = { "encoding": "WKB", @@ -38,9 +81,4 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None: "columns": {"geometry": column_meta}, "primary_column": "geometry", } - - metadata = table.schema.metadata or {} - metadata.update({b"geo": json.dumps(geo_meta).encode("utf-8")}) - table = table.replace_schema_metadata(metadata) - - pq.write_table(table, where, **kwargs) + return {b"geo": json.dumps(geo_meta).encode("utf-8")} diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py new file mode 100644 index 0000000..25d948d --- /dev/null +++ b/stac_geoparquet/arrow/_util.py @@ -0,0 +1,62 @@ +from copy import deepcopy +from typing import Any, Dict, Optional, Sequence + +import pyarrow as pa +import shapely +import shapely.geometry + + +def stac_items_to_arrow( + items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None +) -> pa.RecordBatch: + """Convert dicts representing STAC Items to Arrow + + This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple + geometry types. + + All items will be parsed into a single RecordBatch, meaning that each internal array + is fully contiguous in memory for the length of `items`. + + Args: + items: STAC Items to convert to Arrow + + Kwargs: + schema: An optional schema that describes the format of the data. Note that this + must represent the geometry column as binary type. + + Returns: + Arrow RecordBatch with items in Arrow + """ + # Preprocess GeoJSON to WKB in each STAC item + # Otherwise, pyarrow will try to parse coordinates into a native geometry type and + # if you have multiple geometry types pyarrow will error with + # `ArrowInvalid: cannot mix list and non-list, non-null values` + wkb_items = [] + for item in items: + wkb_item = deepcopy(item) + wkb_item["geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["geometry"]), flavor="iso" + ) + + # If a proj:geometry key exists in top-level properties, convert that to WKB + if "proj:geometry" in wkb_item["properties"]: + wkb_item["proj:geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["proj:geometry"]), flavor="iso" + ) + + # If a proj:geometry key exists in any asset properties, convert that to WKB + for asset_value in wkb_item["assets"].values(): + if "proj:geometry" in asset_value: + asset_value["proj:geometry"] = shapely.to_wkb( + shapely.geometry.shape(asset_value["proj:geometry"]), + flavor="iso", + ) + + wkb_items.append(wkb_item) + + if schema is not None: + array = pa.array(wkb_items, type=pa.struct(schema)) + else: + array = pa.array(wkb_items) + + return pa.RecordBatch.from_struct_array(array)