stac-sprint updates #27

Merged · 9 commits · Apr 17, 2024
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -11,12 +11,13 @@ classifiers = ["License :: OSI Approved :: MIT License"]
 dynamic = ["version", "description"]
 requires-python = ">=3.8"
 dependencies = [
-    "pystac",
+    "ciso8601",
     "geopandas",
+    "packaging",
     "pandas",
     "pyarrow",
+    "pystac",
     "shapely",
-    "packaging",
 ]

[tool.hatch.version]
109 changes: 109 additions & 0 deletions stac_geoparquet/from_arrow.py
@@ -0,0 +1,109 @@
"""Convert STAC Items in Arrow Table format to JSON Lines or """

import json
from typing import Iterable, List

import pyarrow as pa
import pyarrow.compute as pc
import shapely


def stac_table_to_ndjson(table: pa.Table, dest: str):
"""Write a STAC Table to a newline-delimited JSON file."""
with open(dest, "w") as f:
for item_dict in stac_table_to_items(table):
json.dump(item_dict, f, separators=(",", ":"))
f.write("\n")


def stac_table_to_items(table: pa.Table) -> Iterable[dict]:
"""Convert a STAC Table to a generator of STAC Item `dict`s"""
table = _undo_stac_table_transformations(table)

# Convert WKB geometry column to GeoJSON, and then assign the geojson geometry when
# converting each row to a dictionary.
for batch in table.to_batches():
geoms = shapely.from_wkb(batch["geometry"])
geojson_strings = shapely.to_geojson(geoms)

# RecordBatch is missing a `drop()` method, so we keep all columns other than
# geometry instead
keep_column_names = [name for name in batch.column_names if name != "geometry"]
struct_batch = batch.select(keep_column_names).to_struct_array()

for row_idx in range(len(struct_batch)):
row_dict = struct_batch[row_idx].as_py()
row_dict["geometry"] = json.loads(geojson_strings[row_idx])
yield row_dict


def _undo_stac_table_transformations(table: pa.Table) -> pa.Table:
"""Undo the transformations done to convert STAC Json into an Arrow Table

Note that this function does _not_ undo the GeoJSON -> WKB geometry transformation,
as that is easier to do when converting each item in the table to a dict.
"""
table = _convert_timestamp_columns_to_string(table)
table = _lower_properties_from_top_level(table)
return table


def _convert_timestamp_columns_to_string(table: pa.Table) -> pa.Table:
"""Convert any datetime columns in the table to a string representation"""
allowed_column_names = {
"datetime", # common metadata
"start_datetime",
"end_datetime",
"created",
"updated",
"expires", # timestamps extension
"published",
"unpublished",
}
for column_name in allowed_column_names:
try:
column = table[column_name]
except KeyError:
continue

table = table.drop(column_name).append_column(
column_name, pc.strftime(column, format="%Y-%m-%dT%H:%M:%SZ")
)

return table


def _lower_properties_from_top_level(table: pa.Table) -> pa.Table:
"""Take properties columns from the top level and wrap them in a struct column"""
stac_top_level_keys = {
"stac_version",
"stac_extensions",
"type",
"id",
"bbox",
"geometry",
"collection",
"links",
"assets",
}

properties_column_names: List[str] = []
properties_column_fields: List[pa.Field] = []
for column_idx in range(table.num_columns):
column_name = table.column_names[column_idx]
if column_name in stac_top_level_keys:
continue

properties_column_names.append(column_name)
properties_column_fields.append(table.schema.field(column_idx))

properties_array_chunks = []
for batch in table.select(properties_column_names).to_batches():
struct_arr = pa.StructArray.from_arrays(
batch.columns, fields=properties_column_fields
)
Comment on lines +102 to +104
Collaborator Author

Note to self, this is slightly clearer

Suggested change
-        struct_arr = pa.StructArray.from_arrays(
-            batch.columns, fields=properties_column_fields
-        )
+        struct_arr = batch.to_struct_array()

        properties_array_chunks.append(struct_arr)

    return table.drop_columns(properties_column_names).append_column(
        "properties", pa.chunked_array(properties_array_chunks)
    )
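For orientation, here is a minimal usage sketch of the two public functions above. The file names and the use of pyarrow.parquet.read_table to load a previously written GeoParquet file are illustrative assumptions, not part of this PR:

import pyarrow.parquet as pq

from stac_geoparquet.from_arrow import stac_table_to_items, stac_table_to_ndjson

# Hypothetical input: a GeoParquet file previously written by this library
table = pq.read_table("items.parquet")

# Stream rows back out as STAC Item dicts...
for item in stac_table_to_items(table):
    print(item["id"])

# ...or dump the whole table to newline-delimited JSON
stac_table_to_ndjson(table, "items.ndjson")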
97 changes: 97 additions & 0 deletions stac_geoparquet/streaming.py
@@ -0,0 +1,97 @@
from contextlib import ExitStack
from dataclasses import dataclass
from io import SEEK_END, BytesIO
from pathlib import Path
from typing import IO, List, Optional

import pyarrow as pa
import pyarrow.parquet as pq

from stac_geoparquet.to_arrow import parse_stac_ndjson_to_arrow

DEFAULT_JSON_CHUNK_SIZE = 300 * 1024 * 1024
DELIMITER_SEEK_SIZE = 64 * 1024
JSON_DELIMITER = b"\n"


@dataclass
class JsonChunkRange:
    offset: int
    """The byte offset of the file where this chunk starts."""

    length: int
    """The number of bytes in this chunk."""


def jsonl_to_geoparquet(
    input_file: IO[bytes],
    output_path: Path,
    *,
    chunk_size: int = DEFAULT_JSON_CHUNK_SIZE,
):
    # TODO: chunks may be inferred with different schemas; unify them with
    # pa.unify_schemas and cast each table before writing.
    json_chunks = find_json_chunks(input_file, chunk_size=chunk_size)

    with ExitStack() as ctx:
        writer: Optional[pq.ParquetWriter] = None
        for json_chunk in json_chunks:
            input_file.seek(json_chunk.offset)
            buf = input_file.read(json_chunk.length)
            table = parse_stac_ndjson_to_arrow(BytesIO(buf))

            # Open the writer lazily so it uses the schema of the first chunk
            if writer is None:
                writer = ctx.enter_context(
                    pq.ParquetWriter(output_path, schema=table.schema)
                )

            writer.write_table(table)


def find_json_chunks(
    input_file: IO[bytes], *, chunk_size: int = DEFAULT_JSON_CHUNK_SIZE
) -> List[JsonChunkRange]:
    total_byte_length = input_file.seek(0, SEEK_END)
    input_file.seek(0)

    chunk_ranges = []
    previous_chunk_offset = 0
    while previous_chunk_offset < total_byte_length:
        if previous_chunk_offset + chunk_size >= total_byte_length:
            chunk_range = JsonChunkRange(
                offset=previous_chunk_offset,
                length=total_byte_length - previous_chunk_offset,
            )
            chunk_ranges.append(chunk_range)
            break

        # Advance by chunk_size bytes, then scan forward to the next delimiter
        # so that every chunk ends on a complete JSON document
        byte_offset = input_file.seek(previous_chunk_offset + chunk_size)
        delim_index = -1
        while delim_index == -1:
            delim_search_buf = input_file.read(DELIMITER_SEEK_SIZE)
            if not delim_search_buf:
                # Reached EOF without a trailing newline; end the chunk at EOF
                break
            delim_index = delim_search_buf.find(JSON_DELIMITER)
            if delim_index == -1:
                # No delimiter in this block; account for the scanned bytes
                # before reading the next block
                byte_offset += len(delim_search_buf)

        if delim_index != -1:
            byte_offset += delim_index

        chunk_range = JsonChunkRange(
            offset=previous_chunk_offset, length=byte_offset - previous_chunk_offset
        )
        chunk_ranges.append(chunk_range)
        # + 1 to skip the newline character
        previous_chunk_offset = byte_offset + 1

    assert not chunk_ranges or (
        chunk_ranges[-1].offset + chunk_ranges[-1].length == total_byte_length
    ), "The last chunk range should end at the file length"
    return chunk_ranges
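To make the chunking logic concrete, a small self-contained sketch (assuming the chunk_size keyword threaded through find_json_chunks as above). A boundary that falls mid-document is pushed forward to the next newline, so every chunk covers whole JSON documents:

from io import BytesIO

from stac_geoparquet.streaming import find_json_chunks

data = b'{"id": 1}\n{"id": 22}\n{"id": 333}\n'

# A 12-byte target chunk size would split the second document, so the first
# chunk is extended to the newline that ends it
for r in find_json_chunks(BytesIO(data), chunk_size=12):
    print(r, data[r.offset : r.offset + r.length])
# JsonChunkRange(offset=0, length=20) b'{"id": 1}\n{"id": 22}'
# JsonChunkRange(offset=21, length=12) b'{"id": 333}\n'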
144 changes: 144 additions & 0 deletions stac_geoparquet/to_arrow.py
@@ -0,0 +1,144 @@
"""Convert STAC data into Arrow tables
"""

from collections import defaultdict
import json
from typing import IO, Any, Dict, Sequence, Union

import ciso8601
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow.json import read_json
import shapely


def parse_stac_items_to_arrow(items: Sequence[Dict[str, Any]]) -> pa.Table:
    inferred_arrow_table = _stac_items_to_arrow(items)
    return _updates_for_inferred_arrow_table(inferred_arrow_table)


def parse_stac_ndjson_to_arrow(path: Union[str, IO[bytes]]):
    table = _stac_ndjson_to_arrow(path)
    return _updates_for_inferred_arrow_table(table)


def _updates_for_inferred_arrow_table(table: pa.Table) -> pa.Table:
    table = _bring_properties_to_top_level(table)
    table = _convert_geometry_to_wkb(table)
    table = _convert_timestamp_columns(table)
    return table


def _stac_ndjson_to_arrow(path: Union[str, IO[bytes]]) -> pa.Table:
"""Parse a newline-delimited JSON file to Arrow

Args:
path: The path or opened file object (in binary mode) with newline-delimited
JSON data.

Returns:
pyarrow table matching on-disk schema
"""
table = read_json(path)
return table


def _stac_items_to_arrow(items: Sequence[Dict[str, Any]]) -> pa.Table:
    """Convert dicts representing STAC Items to Arrow

    Collects the values of each top-level Item field into a column, then builds a
    pyarrow Table from those arrays.

    Args:
        items: STAC Item dicts to convert.

    Returns:
        pyarrow Table with one column per top-level Item field.
    """
    # TODO: Handle STAC items with different schemas
    # This will fail if any of the items is missing a field since the arrays
    # will be different lengths.
    d = defaultdict(list)

    for item in items:
        for k, v in item.items():
            d[k].append(v)

    arrays = {k: pa.array(v) for k, v in d.items()}
    t = pa.table(arrays)
Comment on lines +59 to +69
Collaborator Author

I don't think you even have to do this...? You're manually constructing a dictionary of arrays, when you can pass the list of dicts directly.

import pyarrow as pa
d1 = {
    'a': 1,
    'b': {
        'c': 'foo'
    }
}
d2 = {
    'a': 2,
    'b': {
        'c': 'bar'
    }
}
pa.array([d1, d2])
# <pyarrow.lib.StructArray object at 0x12665ba60>
# -- is_valid: all not null
# -- child 0 type: int64
#   [
#     1,
#     2
#   ]
# -- child 1 type: struct<c: string>
#   -- is_valid: all not null
#   -- child 0 type: string
#     [
#       "foo",
#       "bar"
#     ]

obviously this requires that the dicts have the same schema, but this is a requirement for now anyways

Collaborator Author

> obviously this requires that the dicts have the same schema, but this is a requirement for now anyways

Actually, this handles schema resolution automatically, just like pa.json.read_json does. The only downside is that all the input data has to fit in memory at once for the schema resolution:

import pyarrow as pa
d1 = {
    'a': 1,
    'b': {
        'c': 'foo'
    }
}
d2 = {
    'a': 2,
    'b': {
        'c': 'bar',
        'd': 'baz'
    }
}
pa.array([d1, d2])
# <pyarrow.lib.StructArray object at 0x105db5000>
# -- is_valid: all not null
# -- child 0 type: int64
#   [
#     1,
#     2
#   ]
# -- child 1 type: struct<c: string, d: string>
#   -- is_valid: all not null
#   -- child 0 type: string
#     [
#       "foo",
#       "bar"
#     ]
#   -- child 1 type: string
#     [
#       null,
#       "baz"
#     ]

    return t


def _bring_properties_to_top_level(table: pa.Table) -> pa.Table:
"""Bring all the fields inside of the nested "properties" struct to the top level"""
properties_field = table.schema.field("properties")
properties_column = table["properties"]

for field_idx in range(properties_field.type.num_fields):
inner_prop_field = properties_field.type.field(field_idx)
table = table.append_column(
inner_prop_field, pc.struct_field(properties_column, field_idx)
)

table = table.drop("properties")
return table


def _convert_geometry_to_wkb(table: pa.Table) -> pa.Table:
"""Convert the geometry column in the table to WKB"""
geoms = shapely.from_geojson(
[json.dumps(item) for item in table["geometry"].to_pylist()]
)
wkb_geoms = shapely.to_wkb(geoms)
return table.drop("geometry").append_column("geometry", pa.array(wkb_geoms))


def _convert_timestamp_columns(table: pa.Table) -> pa.Table:
"""Convert all timestamp columns from a string to an Arrow Timestamp data type"""
allowed_column_names = {
"datetime", # common metadata
"start_datetime",
"end_datetime",
"created",
"updated",
"expires", # timestamps extension
"published",
"unpublished",
}
for column_name in allowed_column_names:
try:
column = table[column_name]
column.type
except KeyError:
continue

if pa.types.is_timestamp(column.type):
continue
elif pa.types.is_string(column.type):
table = table.drop(column_name).append_column(
column_name, _convert_timestamp_column(column)
)
else:
raise ValueError(
f"Inferred time column '{column_name}' was expected to be a string or timestamp data type but got {column.type}"
)

return table


def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.ChunkedArray:
"""Convert an individual timestamp column from string to a Timestamp type"""
chunks = []
for chunk in column.chunks:
parsed_chunk = []
for item in chunk:
if not item.is_valid:
parsed_chunk.append(None)
else:
parsed_chunk.append(ciso8601.parse_rfc3339(item.as_py()))

pyarrow_chunk = pa.array(parsed_chunk)
chunks.append(pyarrow_chunk)

return pa.chunked_array(chunks)
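Finally, an end-to-end sketch of the public entry point above. The Item below is a stripped-down illustration with placeholder URLs, not a spec-complete STAC Item:

from stac_geoparquet.to_arrow import parse_stac_items_to_arrow

item = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "id": "example-item",
    "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
    "bbox": [0.0, 0.0, 0.0, 0.0],
    "links": [{"rel": "self", "href": "https://example.com/item.json"}],
    "assets": {"data": {"href": "https://example.com/data.tif"}},
    "properties": {"datetime": "2024-04-17T00:00:00Z"},
}

table = parse_stac_items_to_arrow([item])
# "datetime" is now a top-level timestamp column and "geometry" holds WKB bytes
print(table.schema)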