exhaustive schema inference
kylebarron committed May 9, 2024
1 parent 2e7cef7 commit 5406f96
Showing 4 changed files with 208 additions and 68 deletions.
62 changes: 62 additions & 0 deletions stac_geoparquet/arrow/_schema/models.py
@@ -0,0 +1,62 @@
import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union

import pyarrow as pa

from stac_geoparquet.arrow._util import stac_items_to_arrow


class InferredSchema:
"""
A schema representing the original STAC JSON with absolutely minimal modifications.
The only modification from the data is converting any geometry fields from GeoJSON
to WKB.
"""

inner: pa.Schema
"""The underlying Arrow schema."""

count: int
"""The total number of items scanned."""

def __init__(self) -> None:
self.inner = pa.schema([])
self.count = 0

def update_from_ndjson(
self,
path: Union[Union[str, Path], Iterable[Union[str, Path]]],
*,
chunk_size: int = 10000,
):
# Handle multi-path input
if not isinstance(path, (str, Path)):
for p in path:
self.update_from_ndjson(p, chunk_size=chunk_size)

return

# Handle single-path input
with open(path) as f:
items = []
for line in f:
item = json.loads(line)
items.append(item)

if len(items) >= chunk_size:
self.update_from_items(items)
items = []

# Handle remainder
if len(items) > 0:
self.update_from_items(items)

def update_from_items(self, items: List[Dict[str, Any]]):
self.count += len(items)
current_schema = stac_items_to_arrow(items, schema=None).schema
new_schema = pa.unify_schemas(
[self.inner, current_schema], promote_options="permissive"
)
self.inner = new_schema
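
The class accumulates one unified Arrow schema across every chunk it scans. A minimal usage sketch, assuming two hypothetical NDJSON files of STAC Items:

from stac_geoparquet.arrow._schema.models import InferredSchema

# Hypothetical newline-delimited JSON files of STAC Items
paths = ["items-2023.ndjson", "items-2024.ndjson"]

inferred = InferredSchema()
inferred.update_from_ndjson(paths, chunk_size=10000)

print(inferred.count)  # total number of items scanned
print(inferred.inner)  # the unified pyarrow.Schema across all chunks
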
100 changes: 39 additions & 61 deletions stac_geoparquet/arrow/_to_arrow.py
@@ -1,10 +1,9 @@
"""Convert STAC data into Arrow tables"""

import json
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union, Generator
from typing import Any, Dict, Generator, Iterable, List, Optional, Sequence, Union

import ciso8601
import numpy as np
@@ -13,6 +12,9 @@
import shapely
import shapely.geometry

from stac_geoparquet.arrow._schema.models import InferredSchema
from stac_geoparquet.arrow._util import stac_items_to_arrow


def _chunks(
lst: Sequence[Dict[str, Any]], n: int
@@ -26,8 +28,8 @@ def parse_stac_items_to_arrow(
items: Sequence[Dict[str, Any]],
*,
chunk_size: int = 8192,
schema: Optional[pa.Schema] = None,
downcast: bool = True,
schema: Optional[Union[pa.Schema, InferredSchema]] = None,
downcast: bool = False,
) -> pa.Table:
"""Parse a collection of STAC Items to a :class:`pyarrow.Table`.
@@ -49,60 +51,77 @@ def parse_stac_items_to_arrow(
"""

if schema is not None:
if isinstance(schema, InferredSchema):
schema = schema.inner

# If schema is provided, then for better memory usage we parse input STAC items
# to Arrow batches in chunks.
batches = []
for chunk in _chunks(items, chunk_size):
batches.append(_stac_items_to_arrow(chunk, schema=schema))
batches.append(stac_items_to_arrow(chunk, schema=schema))

table = pa.Table.from_batches(batches, schema=schema)
else:
# If schema is _not_ provided, then we must convert to Arrow all at once, or
# else it would be possible for a STAC item late in the collection (after the
# first chunk) to have a different schema and not match the schema inferred for
# the first chunk.
table = pa.Table.from_batches([_stac_items_to_arrow(items)])
table = pa.Table.from_batches([stac_items_to_arrow(items)])

return _process_arrow_table(table, downcast=downcast)
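
A rough illustration of the two code paths above, assuming a hypothetical items.ndjson file: with no schema the items are converted in a single pass so the inferred schema cannot drift mid-stream, while an explicit or inferred schema enables chunked conversion.

import json

from stac_geoparquet.arrow._schema.models import InferredSchema
from stac_geoparquet.arrow._to_arrow import parse_stac_items_to_arrow

# Hypothetical newline-delimited JSON file of STAC Items
with open("items.ndjson") as f:
    items = [json.loads(line) for line in f]

# No schema: everything is converted at once
table = parse_stac_items_to_arrow(items)

# With a pre-computed schema: items are converted in chunks of `chunk_size`
inferred = InferredSchema()
inferred.update_from_items(items)
table = parse_stac_items_to_arrow(items, schema=inferred, chunk_size=8192)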


def parse_stac_ndjson_to_arrow(
path: Union[str, Path],
path: Union[Union[str, Path], Iterable[Union[str, Path]]],
*,
chunk_size: int = 8192,
schema: Optional[pa.Schema] = None,
downcast: bool = True,
) -> pa.Table:
schema: Optional[Union[pa.Schema, InferredSchema]] = None,
):
# Define outside of if/else to make mypy happy
items: List[dict] = []

# If the schema was not provided, then we need to load all data into memory at once
# to perform schema resolution.
if schema is None:
with open(path) as f:
for line in f:
items.append(json.loads(line))
inferred_schema = InferredSchema()
inferred_schema.update_from_ndjson(path, chunk_size=chunk_size)
# Re-enter this generator with the inferred schema so the streaming path below
# is used; a plain `return <value>` inside a generator would discard the batches.
yield from parse_stac_ndjson_to_arrow(
path, chunk_size=chunk_size, schema=inferred_schema
)
return

# Check if path is an iterable
# If so, recursively call this function on each item in the iterable
if not isinstance(path, (str, Path)):
for p in path:
yield from parse_stac_ndjson_to_arrow(
p, chunk_size=chunk_size, schema=schema
)

return parse_stac_items_to_arrow(items, chunk_size=chunk_size, schema=schema)
return

if isinstance(schema, InferredSchema):
schema = schema.inner

# Otherwise, we can stream over the input, converting each batch of `chunk_size`
# into an Arrow RecordBatch at a time. This is much more memory efficient.
with open(path) as f:
batches: List[pa.RecordBatch] = []
for line in f:
items.append(json.loads(line))

if len(items) >= chunk_size:
batches.append(_stac_items_to_arrow(items, schema=schema))
batch = stac_items_to_arrow(items, schema=schema)
yield from _process_arrow_table(
pa.Table.from_batches([batch]), downcast=False
).to_batches()
items = []

# Don't forget the last chunk in case the total number of items is not a multiple of
# chunk_size.
if len(items) > 0:
batches.append(_stac_items_to_arrow(items, schema=schema))

table = pa.Table.from_batches(batches, schema=schema)
return _process_arrow_table(table, downcast=downcast)
batch = stac_items_to_arrow(items, schema=schema)
yield from _process_arrow_table(
pa.Table.from_batches([batch]), downcast=False
).to_batches()
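
Because the function is now a generator of RecordBatches, callers can stream the output or collect it into a table. A small sketch, assuming a hypothetical items.ndjson file; with schema=None this makes two passes over the input, one to infer the unified schema and one to convert the data.

import pyarrow as pa

from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow

batches = parse_stac_ndjson_to_arrow("items.ndjson", chunk_size=8192)
table = pa.Table.from_batches(batches)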


def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table:
@@ -112,47 +131,6 @@ def _process_arrow_table(table: pa.Table, *, downcast: bool = True) -> pa.Table:
return table


def _stac_items_to_arrow(
items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None
) -> pa.RecordBatch:
"""Convert dicts representing STAC Items to Arrow
This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple
geometry types.
All items will be parsed into a single RecordBatch, meaning that each internal array
is fully contiguous in memory for the length of `items`.
Args:
items: STAC Items to convert to Arrow
Kwargs:
schema: An optional schema that describes the format of the data. Note that this
must represent the geometry column as binary type.
Returns:
Arrow RecordBatch with items in Arrow
"""
# Preprocess GeoJSON to WKB in each STAC item
# Otherwise, pyarrow will try to parse coordinates into a native geometry type and
# if you have multiple geometry types pyarrow will error with
# `ArrowInvalid: cannot mix list and non-list, non-null values`
wkb_items = []
for item in items:
wkb_item = deepcopy(item)
# Note: this mutates the existing items. Should we
wkb_item["geometry"] = shapely.to_wkb(
shapely.geometry.shape(wkb_item["geometry"]), flavor="iso"
)
wkb_items.append(wkb_item)

if schema is not None:
array = pa.array(wkb_items, type=pa.struct(schema))
else:
array = pa.array(wkb_items)
return pa.RecordBatch.from_struct_array(array)


def _bring_properties_to_top_level(table: pa.Table) -> pa.Table:
"""Bring all the fields inside of the nested "properties" struct to the top level"""
properties_field = table.schema.field("properties")
52 changes: 45 additions & 7 deletions stac_geoparquet/arrow/_to_parquet.py
@@ -1,13 +1,48 @@
import json
from typing import Any
from pathlib import Path
from typing import Any, Iterable, Optional, Union

import pyarrow as pa
import pyarrow.parquet as pq
from pyproj import CRS

from stac_geoparquet.arrow._schema.models import InferredSchema
from stac_geoparquet.arrow._to_arrow import parse_stac_ndjson_to_arrow

WGS84_CRS_JSON = CRS.from_epsg(4326).to_json_dict()


def parse_stac_ndjson_to_parquet(
input_path: Union[Union[str, Path], Iterable[Union[str, Path]]],
output_path: Union[str, Path],
*,
chunk_size: int = 65536,
schema: Optional[Union[pa.Schema, InferredSchema]] = None,
**kwargs,
):
"""Convert one or more newline-delimited JSON STAC files to GeoParquet
Args:
input_path: One or more paths to files with STAC items.
output_path: A path to the output Parquet file.
chunk_size: The chunk size. Defaults to 65536.
schema: The schema to represent the input STAC data. Defaults to None, in which
case the schema will be inferred via a full pass over the input data. In
this case, there will be two full passes over the input data: one to infer a
common schema across all data and another to read the data and iteratively
convert to GeoParquet.
"""
batches_iter = parse_stac_ndjson_to_arrow(
input_path, chunk_size=chunk_size, schema=schema
)
first_batch = next(batches_iter)
schema = first_batch.schema.with_metadata(_create_geoparquet_metadata())
with pq.ParquetWriter(output_path, schema, **kwargs) as writer:
writer.write_batch(first_batch)
for batch in batches_iter:
writer.write_batch(batch)
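
A minimal call sketch with hypothetical input and output paths; omitting the schema triggers the two-pass behaviour described in the docstring.

from stac_geoparquet.arrow._to_parquet import parse_stac_ndjson_to_parquet

# Hypothetical paths; schema inference runs first, then batches stream into Parquet
parse_stac_ndjson_to_parquet("items.ndjson", "items.parquet")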


def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
"""Write an Arrow table with STAC data to GeoParquet
@@ -17,6 +52,14 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
table: The table to write to Parquet
where: The destination for saving.
"""
metadata = table.schema.metadata or {}
metadata.update(_create_geoparquet_metadata())
table = table.replace_schema_metadata(metadata)

pq.write_table(table, where, **kwargs)
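
For tables that are already in memory, to_parquet attaches the same GeoParquet metadata before writing. A sketch under the assumption that a hypothetical items.ndjson file has already been parsed:

import json

from stac_geoparquet.arrow._to_arrow import parse_stac_items_to_arrow
from stac_geoparquet.arrow._to_parquet import to_parquet

with open("items.ndjson") as f:  # hypothetical input file
    items = [json.loads(line) for line in f]

table = parse_stac_items_to_arrow(items)
to_parquet(table, "items.parquet")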


def _create_geoparquet_metadata():
# TODO: include bbox of geometries
column_meta = {
"encoding": "WKB",
@@ -38,9 +81,4 @@ def to_parquet(table: pa.Table, where: Any, **kwargs: Any) -> None:
"columns": {"geometry": column_meta},
"primary_column": "geometry",
}

metadata = table.schema.metadata or {}
metadata.update({b"geo": json.dumps(geo_meta).encode("utf-8")})
table = table.replace_schema_metadata(metadata)

pq.write_table(table, where, **kwargs)
return {b"geo": json.dumps(geo_meta).encode("utf-8")}
62 changes: 62 additions & 0 deletions stac_geoparquet/arrow/_util.py
@@ -0,0 +1,62 @@
from copy import deepcopy
from typing import Any, Dict, Optional, Sequence

import pyarrow as pa
import shapely
import shapely.geometry


def stac_items_to_arrow(
items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None
) -> pa.RecordBatch:
"""Convert dicts representing STAC Items to Arrow
This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple
geometry types.
All items will be parsed into a single RecordBatch, meaning that each internal array
is fully contiguous in memory for the length of `items`.
Args:
items: STAC Items to convert to Arrow
Kwargs:
schema: An optional schema that describes the format of the data. Note that this
must represent the geometry column as binary type.
Returns:
Arrow RecordBatch with items in Arrow
"""
# Preprocess GeoJSON to WKB in each STAC item
# Otherwise, pyarrow will try to parse coordinates into a native geometry type and
# if you have multiple geometry types pyarrow will error with
# `ArrowInvalid: cannot mix list and non-list, non-null values`
wkb_items = []
for item in items:
wkb_item = deepcopy(item)
wkb_item["geometry"] = shapely.to_wkb(
shapely.geometry.shape(wkb_item["geometry"]), flavor="iso"
)

# If a proj:geometry key exists in top-level properties, convert that to WKB
if "proj:geometry" in wkb_item["properties"]:
wkb_item["proj:geometry"] = shapely.to_wkb(
shapely.geometry.shape(wkb_item["proj:geometry"]), flavor="iso"
)

# If a proj:geometry key exists in any asset properties, convert that to WKB
for asset_value in wkb_item["assets"].values():
if "proj:geometry" in asset_value:
asset_value["proj:geometry"] = shapely.to_wkb(
shapely.geometry.shape(asset_value["proj:geometry"]),
flavor="iso",
)

wkb_items.append(wkb_item)

if schema is not None:
array = pa.array(wkb_items, type=pa.struct(schema))
else:
array = pa.array(wkb_items)

return pa.RecordBatch.from_struct_array(array)
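
A short sketch of why the WKB pre-conversion matters, using two hypothetical, pared-down STAC Items with different geometry types; feeding the raw GeoJSON coordinates to pyarrow would trip the mixed list/non-list error noted in the comment above.

from stac_geoparquet.arrow._util import stac_items_to_arrow

items = [
    {
        "id": "item-point",
        "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
        "properties": {"datetime": "2024-05-09T00:00:00Z"},
        "assets": {"data": {"href": "https://example.com/a.tif"}},
    },
    {
        "id": "item-polygon",
        "geometry": {
            "type": "Polygon",
            "coordinates": [[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 0.0]]],
        },
        "properties": {"datetime": "2024-05-09T00:00:00Z"},
        "assets": {"data": {"href": "https://example.com/b.tif"}},
    },
]

batch = stac_items_to_arrow(items)
print(batch.schema.field("geometry").type)  # binary (WKB)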
