diff --git a/README.md b/README.md index 06c031c..2761d58 100644 --- a/README.md +++ b/README.md @@ -12,35 +12,6 @@ For analytic questions like "find the items in the Sentinel-2 collection in June See the [STAC-GeoParquet specification](./spec/stac-geoparquet-spec.md) for details on the exact schema of the written Parquet files. -## Usage +## Documentation -Use `stac_geoparquet.to_arrow.stac_items_to_arrow` and -`stac_geoparquet.from_arrow.stac_table_to_items` to convert between STAC items -and Arrow tables. Arrow Tables of STAC items can be written to parquet with -`stac_geoparquet.to_parquet.to_parquet`. - -Note that `stac_geoparquet` lifts the keys in the item `properties` up to the top level of the DataFrame, similar to `geopandas.GeoDataFrame.from_features`. - -```python ->>> import requests ->>> import stac_geoparquet.arrow ->>> import pyarrow.parquet ->>> import pyarrow as pa - ->>> items = requests.get( -... "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items" -... ).json()["features"] ->>> table = pa.Table.from_batches(stac_geoparquet.arrow.parse_stac_items_to_arrow(items)) ->>> stac_geoparquet.arrow.to_parquet(table, "items.parquet") ->>> table2 = pyarrow.parquet.read_table("items.parquet") ->>> items2 = list(stac_geoparquet.arrow.stac_table_to_items(table2)) -``` - - -## pgstac integration - -`stac_geoparquet.pgstac_reader` has some helpers for working with items coming from a `pgstac.items` table. It takes care of - -- Rehydrating the dehydrated items -- Partitioning by time -- Injecting dynamic links and assets from a STAC API +[Documentation website](https://stac-utils.github.io/stac-geoparquet/) diff --git a/docs/api/legacy.md b/docs/api/legacy.md index 2084cf6..69c877c 100644 --- a/docs/api/legacy.md +++ b/docs/api/legacy.md @@ -2,6 +2,23 @@ The API listed here was the initial non-Arrow-based STAC-GeoParquet implementation, converting between JSON and GeoPandas directly. For large collections of STAC items, using the new Arrow-based functionality (under the `stac_geoparquet.arrow` namespace) will be more performant. +Note that `stac_geoparquet` lifts the keys in the item `properties` up to the top level of the DataFrame, similar to `geopandas.GeoDataFrame.from_features`. + +```python +>>> import requests +>>> import stac_geoparquet.arrow +>>> import pyarrow.parquet +>>> import pyarrow as pa + +>>> items = requests.get( +... "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items" +... ).json()["features"] +>>> table = pa.Table.from_batches(stac_geoparquet.arrow.parse_stac_items_to_arrow(items)) +>>> stac_geoparquet.arrow.to_parquet(table, "items.parquet") +>>> table2 = pyarrow.parquet.read_table("items.parquet") +>>> items2 = list(stac_geoparquet.arrow.stac_table_to_items(table2)) +``` + ::: stac_geoparquet.to_geodataframe ::: stac_geoparquet.to_item_collection ::: stac_geoparquet.to_dict diff --git a/docs/api/pgstac.md b/docs/api/pgstac.md new file mode 100644 index 0000000..640d9f6 --- /dev/null +++ b/docs/api/pgstac.md @@ -0,0 +1,11 @@ +# pgstac integration + +`stac_geoparquet.pgstac_reader` has some helpers for working with items coming from a `pgstac.items` table. It takes care of + +- Rehydrating the dehydrated items +- Partitioning by time +- Injecting dynamic links and assets from a STAC API + +::: stac_geoparquet.pgstac_reader.CollectionConfig + options: + show_if_no_docstring: true diff --git a/docs/drawbacks.md b/docs/drawbacks.md new file mode 100644 index 0000000..46c77f1 --- /dev/null +++ b/docs/drawbacks.md @@ -0,0 +1,58 @@ +# Drawbacks + +Trying to represent STAC data in GeoParquet has some drawbacks. + +## Unable to represent undefined values + +Parquet is unable to represent the difference between _undefined_ and _null_, and so is unable to perfectly round-trip STAC data with _undefined_ values. + +In JSON a value can have one of three states: defined, undefined, or null. The `"b"` key in the next three examples illustrates this: + +Defined: + +```json +{ + "a": 1, + "b": "foo" +} +``` + +Undefined: + +```json +{ + "a": 2 +} +``` + +Null: + +```json +{ + "a": 3, + "b": null +} +``` + +Because Parquet is a columnar format, it is only able to represent undefined at the _column_ level. So if those three JSON items above were converted to Parquet, the column `"b"` would exist because it exists in the first and third item, and the second item would have `"b"` inferred as `null`: + +| a | b | +| --- | ----- | +| 1 | "foo" | +| 2 | null | +| 3 | null | + +Then when the second item is converted back to JSON, it will be returned as + +```json +{ + "a": 2 + "b": null +} +``` + +which is not strictly equal to the input. + +## Schema difficulties + +JSON is schemaless while Parquet requires a strict schema, and it can be very difficult to unite these two systems. This is such an important consideration that we have a [documentation page](./schema.md) just to discuss this point. diff --git a/docs/schema.md b/docs/schema.md new file mode 100644 index 0000000..adbf794 --- /dev/null +++ b/docs/schema.md @@ -0,0 +1,42 @@ +# Schema considerations + +A STAC Item is a JSON object to describe an external geospatial dataset. The STAC specification defines a common core, plus a variety of extensions. Additionally, STAC Items may include custom extensions outside the common ones. Crucially, the majority of the specified fields in the core spec and extensions define optional keys. Those keys often differ across STAC collections and may even differ within a single collection across items. + +STAC's flexibility is a blessing and a curse. The flexibility of schemaless JSON allows for very easy writing as each object can be dumped separately to JSON. Every item is allowed to have a different schema. And newer items are free to have a different schema than older items in the same collection. But this write-time flexibility makes it harder to read as there are no guarantees (outside STAC's few required fields) about what fields exist. + +Parquet is the complete opposite of JSON. Parquet has a strict schema that must be known before writing can start. This puts the burden of work onto the writer instead of the reader. Reading Parquet is very efficient because the file's metadata defines the exact schema of every record. This also enables use cases like reading specific columns that would not be possible without a strict schema. + +This conversion from schemaless to strict-schema is the difficult part of converting STAC from JSON to GeoParquet, especially for large input datasets like STAC that are often larger than memory. + +## Full scan over input data + +The most foolproof way to convert STAC JSON to GeoParquet is to perform a full scan over input data. This is done automatically by [`parse_stac_ndjson_to_arrow`][stac_geoparquet.arrow.parse_stac_ndjson_to_arrow] when a schema is not provided. + +This is time consuming as it requires two full passes over the input data: once to infer a common schema and again to actually write to Parquet (though items are never fully held in memory, allowing this process to scale). + +## User-provided schema + +Alternatively, the user can pass in an Arrow schema themselves using the `schema` parameter of [`parse_stac_ndjson_to_arrow`][stac_geoparquet.arrow.parse_stac_ndjson_to_arrow]. This `schema` must match the on-disk schema of the the STAC JSON data. + +## Multiple schemas per collection + +It is also possible to write multiple Parquet files with STAC data where each Parquet file may have a different schema. This simplifies the conversion and writing process but makes reading and using the Parquet data harder. + +### Merging data with schema mismatch + +If you've created STAC GeoParquet data where the schema has updated, you can use [`pyarrow.concat_tables`][pyarrow.concat_tables] with `promote_options="permissive"` to combine multiple STAC GeoParquet files. + +```py +import pyarrow as pa +import pyarrow.parquet as pq + +table_1 = pq.read_table("stac1.parquet") +table_2 = pq.read_table("stac2.parquet") +combined_table = pa.concat_tables([table1, table2], promote_options="permissive") +``` + +## Future work + +Schema operations is an area where future work can improve reliability and ease of use of STAC GeoParquet. + +It's possible that in the future we could automatically infer an Arrow schema from the STAC specification's published JSON Schema files. If you're interested in this, open an issue and discuss. diff --git a/docs/usage.md b/docs/usage.md index 8f04b05..3ec677c 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -1 +1,67 @@ # Usage + +[Apache Arrow](https://arrow.apache.org/) is used as the in-memory interchange format between all formats. While some end-to-end helper functions are provided, the user can go through Arrow objects for maximal flexibility in the conversion process. + +All functionality that goes through Arrow is currently exported via the `stac_geoparquet.arrow` namespace. + +## `dict`/JSON - Arrow conversion + +### Convert `dict`s to Arrow + +Use [`parse_stac_items_to_arrow`][stac_geoparquet.arrow.parse_stac_items_to_arrow] to convert STAC items either in memory or on disk to a stream of Arrow record batches. This accepts either an iterable of Python `dict`s or an iterable of [`pystac.Item`][pystac.Item] objects. + +For example: + +```py +import pyarrow as pa +import pystac + +import stac_geoparquet + +item = pystac.read_file( + "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items/S2A_MSIL2A_20230112T104411_R008_T29NPE_20230113T053333" +) +assert isinstance(item, pystac.Item) + +record_batch_reader = stac_geoparquet.arrow.parse_stac_items_to_arrow([item]) +table = record_batch_reader.read_all() +``` + +### Convert JSON to Arrow + +[`parse_stac_ndjson_to_arrow`][stac_geoparquet.arrow.parse_stac_ndjson_to_arrow] is a helper function to take one or more JSON or newline-delimited JSON files on disk, infer the schema from all of them, and convert the data to a stream of Arrow record batches. + +### Convert Arrow to `dict`s + +Use [`stac_table_to_items`][stac_geoparquet.arrow.stac_table_to_items] to convert a table or stream of Arrow record batches of STAC data to a generator of Python `dict`s. This accepts either a `pyarrow.Table` or a `pyarrow.RecordBatchReader`, which allows conversions of larger-than-memory files in a streaming manner. + +### Convert Arrow to JSON + +Use [`stac_table_to_ndjson`][stac_geoparquet.arrow.stac_table_to_ndjson] to convert a table or stream of Arrow record batches of STAC data to a newline-delimited JSON file. This accepts either a `pyarrow.Table` or a `pyarrow.RecordBatchReader`, which allows conversions of larger-than-memory files in a streaming manner. + +## Parquet + +Use [`to_parquet`][stac_geoparquet.arrow.to_parquet] to write STAC Arrow data from memory to a path or file-like object. This is a special function to ensure that [GeoParquet](https://geoparquet.org/) 1.0 or 1.1 metadata is written to the Parquet file. + +[`parse_stac_ndjson_to_parquet`][stac_geoparquet.arrow.parse_stac_ndjson_to_parquet] is a helper that connects reading (newline-delimited) JSON on disk to writing out to a Parquet file. + +No special API is required for reading a STAC GeoParquet file back into Arrow. You can use [`pyarrow.parquet.read_table`][pyarrow.parquet.read_table] or [`pyarrow.parquet.ParquetFile`][pyarrow.parquet.ParquetFile] directly to read the STAC GeoParquet data back into Arrow. + +## Delta Lake + + +Use [`parse_stac_ndjson_to_delta_lake`][stac_geoparquet.arrow.parse_stac_ndjson_to_delta_lake] to read (newline-delimited) JSON on disk and write out to a Delta Lake table. + +No special API is required for reading a STAC Delta Lake table back into Arrow. You can use the [`DeltaTable`][deltalake.DeltaTable] class directly to read the data back into Arrow. + +!!! important + Arrow has a null data type, where every value in the column is always null, but Delta Lake does not. This means that for any column inferred to have a `null` data type, writing to Delta Lake will error with + ``` + _internal.SchemaMismatchError: Invalid data type for Delta Lake: Null + ``` + + This is a problem because if all items in a STAC Collection have a `null` JSON key, it gets inferred as an Arrow `null` type. For example, in the `3dep-lidar-copc` collection in the tests, it has `start_datetime` and `end_datetime` fields, and so according to the spec, `datetime` is always `null`. This column would need to be casted to a timestamp type before being written to Delta Lake. + + This means we cannot write this collection to Delta Lake **solely with automatic schema inference**. + + In such cases, users may need to manually update the inferred schema to cast any `null` type to another Delta Lake-compatible type. diff --git a/mkdocs.yml b/mkdocs.yml index c8bd18a..0d253da 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -17,11 +17,13 @@ extra: nav: - index.md - usage.md + - schema.md - Specification: spec/stac-geoparquet-spec.md - API Reference: - api/arrow.md - Legacy: api/legacy.md - # - api/pgstac.md + - api/pgstac.md + - drawbacks.md watch: - stac_geoparquet diff --git a/stac_geoparquet/arrow/_api.py b/stac_geoparquet/arrow/_api.py index 6e093f6..7a3d07c 100644 --- a/stac_geoparquet/arrow/_api.py +++ b/stac_geoparquet/arrow/_api.py @@ -6,6 +6,7 @@ from typing import Any, Iterable import pyarrow as pa +import pystac from stac_geoparquet.arrow._batch import StacArrowBatch, StacJsonBatch from stac_geoparquet.arrow._constants import DEFAULT_JSON_CHUNK_SIZE @@ -16,7 +17,7 @@ def parse_stac_items_to_arrow( - items: Iterable[dict[str, Any]], + items: Iterable[pystac.Item | dict[str, Any]], *, chunk_size: int = 8192, schema: pa.Schema | InferredSchema | None = None, @@ -163,7 +164,7 @@ def stac_table_to_ndjson( def stac_items_to_arrow( - items: Iterable[dict[str, Any]], *, schema: pa.Schema | None = None + items: Iterable[pystac.Item | dict[str, Any]], *, schema: pa.Schema | None = None ) -> pa.RecordBatch: """Convert dicts representing STAC Items to Arrow diff --git a/stac_geoparquet/arrow/_batch.py b/stac_geoparquet/arrow/_batch.py index e4fb59f..42d5b67 100644 --- a/stac_geoparquet/arrow/_batch.py +++ b/stac_geoparquet/arrow/_batch.py @@ -10,6 +10,7 @@ import orjson import pyarrow as pa import pyarrow.compute as pc +import pystac import shapely import shapely.geometry from numpy.typing import NDArray @@ -59,7 +60,10 @@ def __init__(self, batch: pa.RecordBatch) -> None: @classmethod def from_dicts( - cls, items: Iterable[dict[str, Any]], *, schema: pa.Schema | None = None + cls, + items: Iterable[pystac.Item | dict[str, Any]], + *, + schema: pa.Schema | None = None, ) -> Self: """Construct a StacJsonBatch from an iterable of dicts representing STAC items. @@ -83,6 +87,9 @@ def from_dicts( # `ArrowInvalid: cannot mix list and non-list, non-null values` wkb_items = [] for item in items: + if isinstance(item, pystac.Item): + item = item.to_dict(transform_hrefs=False) + wkb_item = deepcopy(item) wkb_item["geometry"] = shapely.to_wkb( shapely.geometry.shape(wkb_item["geometry"]), flavor="iso" diff --git a/stac_geoparquet/arrow/_util.py b/stac_geoparquet/arrow/_util.py index 34e353d..57adef8 100644 --- a/stac_geoparquet/arrow/_util.py +++ b/stac_geoparquet/arrow/_util.py @@ -8,11 +8,14 @@ List, Optional, Sequence, + TypeVar, Union, ) import pyarrow as pa +T = TypeVar("T") + def update_batch_schema( batch: pa.RecordBatch, @@ -23,8 +26,8 @@ def update_batch_schema( def batched_iter( - lst: Iterable[Dict[str, Any]], n: int, *, limit: Optional[int] = None -) -> Iterable[Sequence[Dict[str, Any]]]: + lst: Iterable[T], n: int, *, limit: Optional[int] = None +) -> Iterable[Sequence[T]]: """Yield successive n-sized chunks from iterable.""" if n < 1: raise ValueError("n must be at least one") diff --git a/stac_geoparquet/pgstac_reader.py b/stac_geoparquet/pgstac_reader.py index c178f37..665ac16 100644 --- a/stac_geoparquet/pgstac_reader.py +++ b/stac_geoparquet/pgstac_reader.py @@ -284,13 +284,12 @@ def make_pgstac_items( """ Make STAC items out of pgstac records. - Parameters - ---------- - records: list[tuple] - The dehydrated records from pgstac.items table. - base_item: dict[str, Any] - The base item from the ``collection_base_item`` pgstac function for this - collection. Used for rehydration + Args: + records: list[tuple] + The dehydrated records from pgstac.items table. + base_item: dict[str, Any] + The base item from the ``collection_base_item`` pgstac function for this + collection. Used for rehydration """ columns = [ "id",