From ec8d8dee3db668ae4233758bf55026396918c200 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 26 Sep 2023 15:25:41 -0400 Subject: [PATCH 01/17] stac from arrow --- stac_geoparquet/stac_arrow.py | 76 +++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 stac_geoparquet/stac_arrow.py diff --git a/stac_geoparquet/stac_arrow.py b/stac_geoparquet/stac_arrow.py new file mode 100644 index 0000000..9f8912f --- /dev/null +++ b/stac_geoparquet/stac_arrow.py @@ -0,0 +1,76 @@ +from typing import IO, Sequence, Any, Union + +import pystac +import geopandas +import pyarrow.compute +import pandas as pd +import numpy as np +import pyarrow as pa +from tempfile import NamedTemporaryFile +import json +import pyarrow.json +import shapely.geometry + + +def _stac_ndjson_to_arrow(path: Union[str, IO[bytes]]) -> pa.Table: + """Parse a newline-delimited JSON file to Arrow + + Args: + path: The path or opened file object (in binary mode) with newline-delimited + JSON data. + + Returns: + pyarrow table matching on-disk schema + """ + table = pa.json.read_json(path) + return table + + +def _stac_items_to_arrow(items: Sequence[dict[str, Any]]) -> pa.Table: + """Convert dicts representing STAC Items to Arrow + + First writes a tempfile with newline-delimited JSON data, then uses the pyarrow JSON + parser to load into memory. + + Args: + items: _description_ + + Returns: + _description_ + """ + with NamedTemporaryFile("w+b", suffix=".json") as f: + for item in items: + f.write(json.dumps(item, separators=(",", ":")).encode("utf-8")) + f.write("\n".encode("utf-8")) + + return _stac_ndjson_to_arrow(f) + + +def bring_properties_to_top_level(table: pa.Table) -> pa.Table: + properties_field = table.schema.field("properties") + properties_column = table["properties"] + + for field_idx in range(properties_field.type.num_fields): + inner_prop_field = properties_field.type.field(field_idx) + table = table.append_column( + inner_prop_field, pa.compute.struct_field(properties_column, field_idx) + ) + + table = table.drop("properties") + return table + + +def convert_geometry_to_wkb(table: pa.Table) -> pa.Table: + """Convert the geometry column in the table to WKB""" + geoms = shapely.from_geojson( + [json.dumps(item) for item in table["geometry"].to_pylist()] + ) + wkb_geoms = shapely.to_wkb(geoms) + return table.drop("geometry").append_column("geometry", pa.array(wkb_geoms)) + + +# path = "/Users/kyle/tmp/sentinel-stac/combined.jsonl" +# table = _stac_ndjson_to_arrow(path) +# table2 = bring_properties_to_top_level(table) + +# table2["geometry"] From aa20a8f6911ca5fd5f19839ffed163bc0a483c41 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 26 Sep 2023 15:48:44 -0400 Subject: [PATCH 02/17] convert timestamps --- stac_geoparquet/stac_arrow.py | 54 +++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/stac_geoparquet/stac_arrow.py b/stac_geoparquet/stac_arrow.py index 9f8912f..abf8058 100644 --- a/stac_geoparquet/stac_arrow.py +++ b/stac_geoparquet/stac_arrow.py @@ -10,6 +10,9 @@ import json import pyarrow.json import shapely.geometry +import ciso8601 + +dir(ciso8601) def _stac_ndjson_to_arrow(path: Union[str, IO[bytes]]) -> pa.Table: @@ -69,8 +72,55 @@ def convert_geometry_to_wkb(table: pa.Table) -> pa.Table: return table.drop("geometry").append_column("geometry", pa.array(wkb_geoms)) +def convert_timestamps(table: pa.Table) -> pa.Table: + allowed_column_names = { + "datetime", # common metadata + "start_datetime", + "end_datetime", + "created", + 
"updated", + "expires", # timestamps extension + "published", + "unpublished", + } + for column_name in allowed_column_names: + try: + column = table[column_name] + except KeyError: + continue + + table = table.drop(column_name).append_column( + column_name, _convert_timestamp_column(column) + ) + + return table + + +def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.Table: + chunks = [] + for chunk in column.chunks: + parsed_chunk = [] + for item in chunk: + if not item.is_valid: + parsed_chunk.append(None) + else: + parsed_chunk.append(ciso8601.parse_rfc3339(item.as_py())) + + pyarrow_chunk = pa.array(parsed_chunk) + chunks.append(pyarrow_chunk) + + return pa.chunked_array(chunks) + + # path = "/Users/kyle/tmp/sentinel-stac/combined.jsonl" # table = _stac_ndjson_to_arrow(path) -# table2 = bring_properties_to_top_level(table) +# table = bring_properties_to_top_level(table) +# table = convert_geometry_to_wkb(table) +# table = convert_timestamps(table) + + +# table["created"][0] + +# # table2 = bring_properties_to_top_level(table) -# table2["geometry"] +# # table2["geometry"] From c38446c0e3370e5590c411eef93a10e84068fec4 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 26 Sep 2023 15:50:10 -0400 Subject: [PATCH 03/17] add ciso8601 --- pyproject.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8c96044..56eca80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,12 +11,13 @@ classifiers = ["License :: OSI Approved :: MIT License"] dynamic = ["version", "description"] requires-python = ">=3.8" dependencies = [ - "pystac", + "ciso8601", "geopandas", + "packaging", "pandas", "pyarrow", + "pystac", "shapely", - "packaging", ] [project.optional-dependencies] From 4d196a57e40cd4fb0155efad643b054fef03dad5 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 26 Sep 2023 16:11:55 -0400 Subject: [PATCH 04/17] parse to arrow --- stac_geoparquet/stac_arrow.py | 46 ++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/stac_geoparquet/stac_arrow.py b/stac_geoparquet/stac_arrow.py index abf8058..f7c7796 100644 --- a/stac_geoparquet/stac_arrow.py +++ b/stac_geoparquet/stac_arrow.py @@ -1,18 +1,33 @@ -from typing import IO, Sequence, Any, Union +import json +from tempfile import NamedTemporaryFile +from typing import IO, Any, Sequence, Union -import pystac +import ciso8601 import geopandas -import pyarrow.compute -import pandas as pd import numpy as np +import pandas as pd import pyarrow as pa -from tempfile import NamedTemporaryFile -import json +import pyarrow.compute import pyarrow.json +import pystac import shapely.geometry -import ciso8601 -dir(ciso8601) + +def parse_stac_items_to_arrow(items: Sequence[dict[str, Any]]): + inferred_arrow_table = _stac_items_to_arrow(items) + return _updates_for_inferred_arrow_table(inferred_arrow_table) + + +def parse_stac_ndjson_to_arrow(path: Union[str, IO[bytes]]): + inferred_arrow_table = _stac_ndjson_to_arrow(path) + return _updates_for_inferred_arrow_table(inferred_arrow_table) + + +def _updates_for_inferred_arrow_table(table: pa.Table) -> pa.Table: + table = _bring_properties_to_top_level(table) + table = _convert_geometry_to_wkb(table) + table = _convert_timestamp_columns(table) + return table def _stac_ndjson_to_arrow(path: Union[str, IO[bytes]]) -> pa.Table: @@ -49,7 +64,7 @@ def _stac_items_to_arrow(items: Sequence[dict[str, Any]]) -> pa.Table: return _stac_ndjson_to_arrow(f) -def bring_properties_to_top_level(table: pa.Table) -> 
pa.Table: +def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: properties_field = table.schema.field("properties") properties_column = table["properties"] @@ -63,7 +78,7 @@ def bring_properties_to_top_level(table: pa.Table) -> pa.Table: return table -def convert_geometry_to_wkb(table: pa.Table) -> pa.Table: +def _convert_geometry_to_wkb(table: pa.Table) -> pa.Table: """Convert the geometry column in the table to WKB""" geoms = shapely.from_geojson( [json.dumps(item) for item in table["geometry"].to_pylist()] @@ -72,7 +87,7 @@ def convert_geometry_to_wkb(table: pa.Table) -> pa.Table: return table.drop("geometry").append_column("geometry", pa.array(wkb_geoms)) -def convert_timestamps(table: pa.Table) -> pa.Table: +def _convert_timestamp_columns(table: pa.Table) -> pa.Table: allowed_column_names = { "datetime", # common metadata "start_datetime", @@ -112,15 +127,12 @@ def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.Table: return pa.chunked_array(chunks) -# path = "/Users/kyle/tmp/sentinel-stac/combined.jsonl" -# table = _stac_ndjson_to_arrow(path) -# table = bring_properties_to_top_level(table) -# table = convert_geometry_to_wkb(table) -# table = convert_timestamps(table) +path = "/Users/kyle/tmp/sentinel-stac/combined.jsonl" +table = _stac_ndjson_to_arrow(path) # table["created"][0] -# # table2 = bring_properties_to_top_level(table) +# # table2 = _bring_properties_to_top_level(table) # # table2["geometry"] From 5612e6e894ed8e98e0e265eb2a669ec960735216 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 27 Sep 2023 16:01:25 -0400 Subject: [PATCH 05/17] rename to to_arrow.py --- .../{stac_arrow.py => to_arrow.py} | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) rename stac_geoparquet/{stac_arrow.py => to_arrow.py} (88%) diff --git a/stac_geoparquet/stac_arrow.py b/stac_geoparquet/to_arrow.py similarity index 88% rename from stac_geoparquet/stac_arrow.py rename to stac_geoparquet/to_arrow.py index f7c7796..256f5cd 100644 --- a/stac_geoparquet/stac_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -1,14 +1,14 @@ +"""Convert STAC data into Arrow tables +""" + import json from tempfile import NamedTemporaryFile from typing import IO, Any, Sequence, Union import ciso8601 -import geopandas -import numpy as np -import pandas as pd import pyarrow as pa -import pyarrow.compute -import pyarrow.json +import pyarrow.compute as pc +from pyarrow.json import read_json import pystac import shapely.geometry @@ -40,7 +40,7 @@ def _stac_ndjson_to_arrow(path: Union[str, IO[bytes]]) -> pa.Table: Returns: pyarrow table matching on-disk schema """ - table = pa.json.read_json(path) + table = read_json(path) return table @@ -65,13 +65,14 @@ def _stac_items_to_arrow(items: Sequence[dict[str, Any]]) -> pa.Table: def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: + """Bring all the fields inside of the nested "properties" struct to the top level""" properties_field = table.schema.field("properties") properties_column = table["properties"] for field_idx in range(properties_field.type.num_fields): inner_prop_field = properties_field.type.field(field_idx) table = table.append_column( - inner_prop_field, pa.compute.struct_field(properties_column, field_idx) + inner_prop_field, pc.struct_field(properties_column, field_idx) ) table = table.drop("properties") @@ -88,6 +89,7 @@ def _convert_geometry_to_wkb(table: pa.Table) -> pa.Table: def _convert_timestamp_columns(table: pa.Table) -> pa.Table: + """Convert all timestamp columns from a string to an Arrow Timestamp 
data type""" allowed_column_names = { "datetime", # common metadata "start_datetime", @@ -112,6 +114,7 @@ def _convert_timestamp_columns(table: pa.Table) -> pa.Table: def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.Table: + """Convert an individual timestamp column from string to a Timestamp type""" chunks = [] for chunk in column.chunks: parsed_chunk = [] @@ -125,14 +128,3 @@ def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.Table: chunks.append(pyarrow_chunk) return pa.chunked_array(chunks) - - -path = "/Users/kyle/tmp/sentinel-stac/combined.jsonl" -table = _stac_ndjson_to_arrow(path) - - -# table["created"][0] - -# # table2 = _bring_properties_to_top_level(table) - -# # table2["geometry"] From de69b61f6fb47fe3286e9c67492ba3563b3add64 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 27 Sep 2023 17:31:49 -0400 Subject: [PATCH 06/17] Convert back to json lines --- stac_geoparquet/from_arrow.py | 109 ++++++++++++++++++++++++++++++++++ stac_geoparquet/to_arrow.py | 3 +- 2 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 stac_geoparquet/from_arrow.py diff --git a/stac_geoparquet/from_arrow.py b/stac_geoparquet/from_arrow.py new file mode 100644 index 0000000..e9ebce7 --- /dev/null +++ b/stac_geoparquet/from_arrow.py @@ -0,0 +1,109 @@ +"""Convert STAC Items in Arrow Table format to JSON Lines or """ + +import json +from typing import Iterable, List + +import pyarrow as pa +import pyarrow.compute as pc +import shapely + + +def stac_table_to_ndjson(table: pa.Table, dest: str): + """Write a STAC Table to a newline-delimited JSON file.""" + with open(dest, "w") as f: + for item_dict in stac_table_to_items(table): + json.dump(item_dict, f, separators=(",", ":")) + f.write("\n") + + +def stac_table_to_items(table: pa.Table) -> Iterable[dict]: + """Convert a STAC Table to a generator of STAC Item `dict`s""" + table = _undo_stac_table_transformations(table) + + # Convert WKB geometry column to GeoJSON, and then assign the geojson geometry when + # converting each row to a dictionary. + for batch in table.to_batches(): + geoms = shapely.from_wkb(batch["geometry"]) + geojson_strings = shapely.to_geojson(geoms) + + # RecordBatch is missing a `drop()` method, so we keep all columns other than + # geometry instead + keep_column_names = [name for name in batch.column_names if name != "geometry"] + struct_batch = batch.select(keep_column_names).to_struct_array() + + for row_idx in range(len(struct_batch)): + row_dict = struct_batch[row_idx].as_py() + row_dict["geometry"] = json.loads(geojson_strings[row_idx]) + yield row_dict + + +def _undo_stac_table_transformations(table: pa.Table) -> pa.Table: + """Undo the transformations done to convert STAC Json into an Arrow Table + + Note that this function does _not_ undo the GeoJSON -> WKB geometry transformation, + as that is easier to do when converting each item in the table to a dict. 
+ """ + table = _convert_timestamp_columns_to_string(table)["created"] + table = _lower_properties_from_top_level(table) + return table + + +def _convert_timestamp_columns_to_string(table: pa.Table) -> pa.Table: + """Convert any datetime columns in the table to a string representation""" + allowed_column_names = { + "datetime", # common metadata + "start_datetime", + "end_datetime", + "created", + "updated", + "expires", # timestamps extension + "published", + "unpublished", + } + for column_name in allowed_column_names: + try: + column = table[column_name] + except KeyError: + continue + + table = table.drop(column_name).append_column( + column_name, pc.strftime(column, format="%Y-%m-%dT%H:%M:%SZ") + ) + + return table + + +def _lower_properties_from_top_level(table: pa.Table) -> pa.Table: + """Take properties columns from the top level and wrap them in a struct column""" + stac_top_level_keys = { + "stac_version", + "stac_extensions", + "type", + "id", + "bbox", + "geometry", + "collection", + "links", + "assets", + } + + properties_column_names: List[str] = [] + properties_column_fields: List[pa.Field] = [] + for column_idx in range(table.num_columns): + column_name = table.column_names[column_idx] + if column_name in stac_top_level_keys: + continue + + properties_column_names.append(column_name) + properties_column_fields.append(table.schema.field(column_idx)) + + properties_array_chunks = [] + for batch in table.select(properties_column_names).to_batches(): + struct_arr = pa.StructArray.from_arrays( + batch.columns, fields=properties_column_fields + ) + properties_array_chunks.append(struct_arr) + + return table.drop_columns(properties_column_names).append_column( + "properties", pa.chunked_array(properties_array_chunks) + ) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 256f5cd..f443683 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -9,7 +9,6 @@ import pyarrow as pa import pyarrow.compute as pc from pyarrow.json import read_json -import pystac import shapely.geometry @@ -113,7 +112,7 @@ def _convert_timestamp_columns(table: pa.Table) -> pa.Table: return table -def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.Table: +def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.ChunkedArray: """Convert an individual timestamp column from string to a Timestamp type""" chunks = [] for chunk in column.chunks: From 37b21f3039eed06c68e611884848b19ab1c79565 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Thu, 28 Sep 2023 15:04:29 -0400 Subject: [PATCH 07/17] wip chunked jsonl reader --- stac_geoparquet/from_arrow.py | 2 +- stac_geoparquet/streaming.py | 97 +++++++++++++++++++++++++++++++++++ stac_geoparquet/to_arrow.py | 19 +++++-- 3 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 stac_geoparquet/streaming.py diff --git a/stac_geoparquet/from_arrow.py b/stac_geoparquet/from_arrow.py index e9ebce7..163a80d 100644 --- a/stac_geoparquet/from_arrow.py +++ b/stac_geoparquet/from_arrow.py @@ -43,7 +43,7 @@ def _undo_stac_table_transformations(table: pa.Table) -> pa.Table: Note that this function does _not_ undo the GeoJSON -> WKB geometry transformation, as that is easier to do when converting each item in the table to a dict. 
""" - table = _convert_timestamp_columns_to_string(table)["created"] + table = _convert_timestamp_columns_to_string(table) table = _lower_properties_from_top_level(table) return table diff --git a/stac_geoparquet/streaming.py b/stac_geoparquet/streaming.py new file mode 100644 index 0000000..cca030b --- /dev/null +++ b/stac_geoparquet/streaming.py @@ -0,0 +1,97 @@ +from contextlib import ExitStack +from dataclasses import dataclass +from io import SEEK_END, BytesIO +from pathlib import Path +from typing import IO, List, Optional + +import pyarrow as pa +import pyarrow.parquet as pq + +from stac_geoparquet.to_arrow import parse_stac_ndjson_to_arrow + +DEFAULT_JSON_CHUNK_SIZE = 300 * 1024 * 1024 +DELIMITER_SEEK_SIZE = 64 * 1024 +JSON_DELIMITER = b"\n" + + +@dataclass +class JsonChunkRange: + offset: int + """The byte offset of the file where this chunk starts.""" + + length: int + """The number of bytes in this chunk""" + + +# path = "/Users/kyle/data/sentinel-stac/out.jsonl" +# path = "/Users/kyle/data/sentinel-stac/out_1.0.0-beta.2.jsonl" +# path = "/Users/kyle/data/sentinel-stac/out_1.0.0.jsonl" +# input_file = open(path, "rb") +# output_path = 'tmp_out_streaming.parquet' + + +def jsonl_to_geoparquet( + input_file: IO[bytes], + output_path: Path, + *, + chunk_size: int = DEFAULT_JSON_CHUNK_SIZE, +): + json_chunks = find_json_chunks(input_file) + len(json_chunks) + + schemas = [] + with ExitStack() as ctx: + writer: Optional[pq.ParquetWriter] = None + for json_chunk in json_chunks: + input_file.seek(json_chunk.offset) + buf = input_file.read(json_chunk.length) + buf[:100] + table = parse_stac_ndjson_to_arrow(BytesIO(buf)) + schemas.append(table.schema) + + # if writer is None: + # writer = ctx.enter_context(pq.ParquetWriter(output_path, schema=table.schema)) + + # writer.write_table(table) + + pa.unify_schemas(schemas) + len(schemas) + len(json_chunks) + schemas + + +def find_json_chunks(input_file: IO[bytes]) -> List[JsonChunkRange]: + total_byte_length = input_file.seek(0, SEEK_END) + input_file.seek(0) + + chunk_ranges = [] + previous_chunk_offset = 0 + while True: + if previous_chunk_offset + DEFAULT_JSON_CHUNK_SIZE >= total_byte_length: + chunk_range = JsonChunkRange( + offset=previous_chunk_offset, + length=total_byte_length - previous_chunk_offset, + ) + chunk_ranges.append(chunk_range) + break + + # Advance by chunk size bytes + # TODO: don't advance past end of file + byte_offset = input_file.seek(previous_chunk_offset + DEFAULT_JSON_CHUNK_SIZE) + delim_index = -1 + while delim_index == -1: + delim_search_buf = input_file.read(DELIMITER_SEEK_SIZE) + delim_index = delim_search_buf.find(JSON_DELIMITER) + byte_offset += delim_index + + chunk_range = JsonChunkRange( + offset=previous_chunk_offset, length=byte_offset - previous_chunk_offset + ) + chunk_ranges.append(chunk_range) + # + 1 to skip the newline character + previous_chunk_offset = byte_offset + 1 + + assert ( + chunk_ranges[-1].offset + chunk_ranges[-1].length == total_byte_length + ), "The last chunk range should end at the file length" + return chunk_ranges diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index f443683..145d9ab 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -18,8 +18,8 @@ def parse_stac_items_to_arrow(items: Sequence[dict[str, Any]]): def parse_stac_ndjson_to_arrow(path: Union[str, IO[bytes]]): - inferred_arrow_table = _stac_ndjson_to_arrow(path) - return _updates_for_inferred_arrow_table(inferred_arrow_table) + table = _stac_ndjson_to_arrow(path) + 
return _updates_for_inferred_arrow_table(table) def _updates_for_inferred_arrow_table(table: pa.Table) -> pa.Table: @@ -55,6 +55,7 @@ def _stac_items_to_arrow(items: Sequence[dict[str, Any]]) -> pa.Table: Returns: _description_ """ + # TODO:!! Can just call pa.array() on the list of python dicts!! with NamedTemporaryFile("w+b", suffix=".json") as f: for item in items: f.write(json.dumps(item, separators=(",", ":")).encode("utf-8")) @@ -102,12 +103,20 @@ def _convert_timestamp_columns(table: pa.Table) -> pa.Table: for column_name in allowed_column_names: try: column = table[column_name] + column.type except KeyError: continue - table = table.drop(column_name).append_column( - column_name, _convert_timestamp_column(column) - ) + if pa.types.is_timestamp(column.type): + continue + elif pa.types.is_string(column.type): + table = table.drop(column_name).append_column( + column_name, _convert_timestamp_column(column) + ) + else: + raise ValueError( + f"Inferred time column '{column_name}' was expected to be a string or timestamp data type but got {column.type}" + ) return table From bc40430fef13992238f9892636a1848bf4296815 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 6 Oct 2023 16:06:46 -0500 Subject: [PATCH 08/17] Avoid JSON in _items_to_arrow --- stac_geoparquet/to_arrow.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 145d9ab..da06e6b 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -1,6 +1,7 @@ """Convert STAC data into Arrow tables """ +from collections import defaultdict import json from tempfile import NamedTemporaryFile from typing import IO, Any, Sequence, Union @@ -55,13 +56,18 @@ def _stac_items_to_arrow(items: Sequence[dict[str, Any]]) -> pa.Table: Returns: _description_ """ - # TODO:!! Can just call pa.array() on the list of python dicts!! - with NamedTemporaryFile("w+b", suffix=".json") as f: - for item in items: - f.write(json.dumps(item, separators=(",", ":")).encode("utf-8")) - f.write("\n".encode("utf-8")) - - return _stac_ndjson_to_arrow(f) + # TODO: Handle STAC items with different schemas + # This will fail if any of the items is missing a field since the arrays + # will be different lengths. 
+ d = defaultdict(list) + + for item in items: + for k, v in item.items(): + d[k].append(v) + + arrays = {k: pa.array(v) for k, v in d.items()} + t = pa.table(arrays) + return t def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: From ee889e32c4e6479d1a3b52cff6f611b21932e201 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 15 Apr 2024 22:47:03 -0400 Subject: [PATCH 09/17] Updates to to-arrow conversion --- stac_geoparquet/to_arrow.py | 147 +++++++++++++++++++++++++----------- 1 file changed, 105 insertions(+), 42 deletions(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index da06e6b..7a9303a 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -1,73 +1,137 @@ -"""Convert STAC data into Arrow tables -""" +"""Convert STAC data into Arrow tables""" -from collections import defaultdict import json -from tempfile import NamedTemporaryFile -from typing import IO, Any, Sequence, Union +from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence, Union import ciso8601 import pyarrow as pa import pyarrow.compute as pc -from pyarrow.json import read_json +import shapely import shapely.geometry -def parse_stac_items_to_arrow(items: Sequence[dict[str, Any]]): - inferred_arrow_table = _stac_items_to_arrow(items) - return _updates_for_inferred_arrow_table(inferred_arrow_table) +def _chunks(lst: Sequence[Dict[str, Any]], n: int): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i : i + n] -def parse_stac_ndjson_to_arrow(path: Union[str, IO[bytes]]): - table = _stac_ndjson_to_arrow(path) - return _updates_for_inferred_arrow_table(table) +def parse_stac_items_to_arrow( + items: Sequence[Dict[str, Any]], + *, + chunk_size: int = 8192, + schema: Optional[pa.Schema] = None, +) -> pa.Table: + """Parse a collection of STAC Items to a :class:`pyarrow.Table`. - -def _updates_for_inferred_arrow_table(table: pa.Table) -> pa.Table: - table = _bring_properties_to_top_level(table) - table = _convert_geometry_to_wkb(table) - table = _convert_timestamp_columns(table) - return table - - -def _stac_ndjson_to_arrow(path: Union[str, IO[bytes]]) -> pa.Table: - """Parse a newline-delimited JSON file to Arrow + The objects under `properties` are moved up to the top-level of the + Table, similar to :meth:`geopandas.GeoDataFrame.from_features`. Args: - path: The path or opened file object (in binary mode) with newline-delimited - JSON data. + items: the STAC Items to convert + chunk_size: The chunk size to use for Arrow record batches. This only takes effect if `schema` is not None. When `schema` is None, the input will be parsed into a single contiguous record batch. Defaults to 8192. + schema: The schema of the input data. If provided, can improve memory use; otherwise all items need to be parsed into a single array for schema inference. Defaults to None. Returns: - pyarrow table matching on-disk schema + a pyarrow Table with the STAC-GeoParquet representation of items. """ - table = read_json(path) + + if schema is not None: + # If schema is provided, then for better memory usage we parse input STAC items + # to Arrow batches in chunks. 
+ batches = [] + for chunk in _chunks(items, chunk_size): + batches.append(_stac_items_to_arrow(chunk, schema=schema)) + + stac_table = pa.Table.from_batches(batches, schema=schema) + else: + # If schema is _not_ provided, then we must convert to Arrow all at once, or + # else it would be possible for a STAC item late in the collection (after the + # first chunk) to have a different schema and not match the schema inferred for + # the first chunk. + stac_table = pa.Table.from_batches(_stac_items_to_arrow(items)) + + return _process_arrow_table(stac_table) + + +def parse_stac_ndjson_to_arrow( + path: Union[str, Path], + *, + chunk_size: int = 8192, + schema: Optional[pa.Schema] = None, +): + # If the schema was not provided, then we need to load all data into memory at once + # to perform schema resolution. + if schema is None: + with open(path) as f: + items = [] + for line in f: + items.append(json.loads(line)) + + return parse_stac_items_to_arrow(items, chunk_size=chunk_size, schema=schema) + + # Otherwise, we can stream over the input, converting each batch of `chunk_size` + # into an Arrow RecordBatch at a time. This is much more memory efficient. + with open(path) as f: + batches: List[pa.RecordBatch] = [] + items: List[dict] = [] + + for line in f: + items.append(json.loads(line)) + + if len(items) >= chunk_size: + batches.append(_stac_items_to_arrow(items, schema=schema)) + items = [] + + # Don't forget the last chunk in case the total number of items is not a multiple of + # chunk_size. + if len(items) > 0: + batches.append(_stac_items_to_arrow(items, schema=schema)) + + stac_table = pa.Table.from_batches(batches, schema=schema) + return _process_arrow_table(stac_table) + + +def _process_arrow_table(table: pa.Table) -> pa.Table: + table = _bring_properties_to_top_level(table) + table = _convert_timestamp_columns(table) return table -def _stac_items_to_arrow(items: Sequence[dict[str, Any]]) -> pa.Table: +def _stac_items_to_arrow( + items: Sequence[Dict[str, Any]], *, schema: Optional[pa.Schema] = None +) -> pa.RecordBatch: """Convert dicts representing STAC Items to Arrow - First writes a tempfile with newline-delimited JSON data, then uses the pyarrow JSON - parser to load into memory. + This converts GeoJSON geometries to WKB before Arrow conversion to allow multiple + geometry types. + + All items will be parsed into a single RecordBatch, meaning that each internal array + is fully contiguous in memory for the length of `items`. Args: - items: _description_ + items: STAC Items to convert to Arrow + + Kwargs: + schema: An optional schema that describes the format of the data. Note that this must represent the geometry column as binary type. Returns: - _description_ + Arrow RecordBatch with items in Arrow """ - # TODO: Handle STAC items with different schemas - # This will fail if any of the items is missing a field since the arrays - # will be different lengths. 
- d = defaultdict(list) - + # Preprocess GeoJSON to WKB in each STAC item + # Otherwise, pyarrow will try to parse coordinates into a native geometry type and + # if you have multiple geometry types pyarrow will error with + # `ArrowInvalid: cannot mix list and non-list, non-null values` for item in items: - for k, v in item.items(): - d[k].append(v) + item["geometry"] = shapely.to_wkb(shapely.geometry.shape(item["geometry"])) + + if schema is not None: + array = pa.array(items, type=pa.struct(schema)) + else: + array = pa.array(items) - arrays = {k: pa.array(v) for k, v in d.items()} - t = pa.table(arrays) - return t + return pa.RecordBatch.from_struct_array(array) def _bring_properties_to_top_level(table: pa.Table) -> pa.Table: @@ -109,7 +173,6 @@ def _convert_timestamp_columns(table: pa.Table) -> pa.Table: for column_name in allowed_column_names: try: column = table[column_name] - column.type except KeyError: continue From 52c38498e4817a10098724f698c4660f3aff7587 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 15 Apr 2024 22:47:48 -0400 Subject: [PATCH 10/17] Remove streaming for now --- stac_geoparquet/streaming.py | 97 ------------------------------------ 1 file changed, 97 deletions(-) delete mode 100644 stac_geoparquet/streaming.py diff --git a/stac_geoparquet/streaming.py b/stac_geoparquet/streaming.py deleted file mode 100644 index cca030b..0000000 --- a/stac_geoparquet/streaming.py +++ /dev/null @@ -1,97 +0,0 @@ -from contextlib import ExitStack -from dataclasses import dataclass -from io import SEEK_END, BytesIO -from pathlib import Path -from typing import IO, List, Optional - -import pyarrow as pa -import pyarrow.parquet as pq - -from stac_geoparquet.to_arrow import parse_stac_ndjson_to_arrow - -DEFAULT_JSON_CHUNK_SIZE = 300 * 1024 * 1024 -DELIMITER_SEEK_SIZE = 64 * 1024 -JSON_DELIMITER = b"\n" - - -@dataclass -class JsonChunkRange: - offset: int - """The byte offset of the file where this chunk starts.""" - - length: int - """The number of bytes in this chunk""" - - -# path = "/Users/kyle/data/sentinel-stac/out.jsonl" -# path = "/Users/kyle/data/sentinel-stac/out_1.0.0-beta.2.jsonl" -# path = "/Users/kyle/data/sentinel-stac/out_1.0.0.jsonl" -# input_file = open(path, "rb") -# output_path = 'tmp_out_streaming.parquet' - - -def jsonl_to_geoparquet( - input_file: IO[bytes], - output_path: Path, - *, - chunk_size: int = DEFAULT_JSON_CHUNK_SIZE, -): - json_chunks = find_json_chunks(input_file) - len(json_chunks) - - schemas = [] - with ExitStack() as ctx: - writer: Optional[pq.ParquetWriter] = None - for json_chunk in json_chunks: - input_file.seek(json_chunk.offset) - buf = input_file.read(json_chunk.length) - buf[:100] - table = parse_stac_ndjson_to_arrow(BytesIO(buf)) - schemas.append(table.schema) - - # if writer is None: - # writer = ctx.enter_context(pq.ParquetWriter(output_path, schema=table.schema)) - - # writer.write_table(table) - - pa.unify_schemas(schemas) - len(schemas) - len(json_chunks) - schemas - - -def find_json_chunks(input_file: IO[bytes]) -> List[JsonChunkRange]: - total_byte_length = input_file.seek(0, SEEK_END) - input_file.seek(0) - - chunk_ranges = [] - previous_chunk_offset = 0 - while True: - if previous_chunk_offset + DEFAULT_JSON_CHUNK_SIZE >= total_byte_length: - chunk_range = JsonChunkRange( - offset=previous_chunk_offset, - length=total_byte_length - previous_chunk_offset, - ) - chunk_ranges.append(chunk_range) - break - - # Advance by chunk size bytes - # TODO: don't advance past end of file - byte_offset = 
input_file.seek(previous_chunk_offset + DEFAULT_JSON_CHUNK_SIZE) - delim_index = -1 - while delim_index == -1: - delim_search_buf = input_file.read(DELIMITER_SEEK_SIZE) - delim_index = delim_search_buf.find(JSON_DELIMITER) - byte_offset += delim_index - - chunk_range = JsonChunkRange( - offset=previous_chunk_offset, length=byte_offset - previous_chunk_offset - ) - chunk_ranges.append(chunk_range) - # + 1 to skip the newline character - previous_chunk_offset = byte_offset + 1 - - assert ( - chunk_ranges[-1].offset + chunk_ranges[-1].length == total_byte_length - ), "The last chunk range should end at the file length" - return chunk_ranges From ce487008315f9c3ce2b7e668113f35aaf0d1ee64 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 15 Apr 2024 23:03:49 -0400 Subject: [PATCH 11/17] lint --- stac_geoparquet/to_arrow.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 7a9303a..06ef5cc 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -30,8 +30,12 @@ def parse_stac_items_to_arrow( Args: items: the STAC Items to convert - chunk_size: The chunk size to use for Arrow record batches. This only takes effect if `schema` is not None. When `schema` is None, the input will be parsed into a single contiguous record batch. Defaults to 8192. - schema: The schema of the input data. If provided, can improve memory use; otherwise all items need to be parsed into a single array for schema inference. Defaults to None. + chunk_size: The chunk size to use for Arrow record batches. This only takes + effect if `schema` is not None. When `schema` is None, the input will be + parsed into a single contiguous record batch. Defaults to 8192. + schema: The schema of the input data. If provided, can improve memory use; + otherwise all items need to be parsed into a single array for schema + inference. Defaults to None. Returns: a pyarrow Table with the STAC-GeoParquet representation of items. @@ -114,7 +118,8 @@ def _stac_items_to_arrow( items: STAC Items to convert to Arrow Kwargs: - schema: An optional schema that describes the format of the data. Note that this must represent the geometry column as binary type. + schema: An optional schema that describes the format of the data. Note that this + must represent the geometry column as binary type. 
Returns: Arrow RecordBatch with items in Arrow @@ -184,7 +189,8 @@ def _convert_timestamp_columns(table: pa.Table) -> pa.Table: ) else: raise ValueError( - f"Inferred time column '{column_name}' was expected to be a string or timestamp data type but got {column.type}" + f"Inferred time column '{column_name}' was expected to be a string or" + f" timestamp data type but got {column.type}" ) return table From 5d7277002d5f89f5fc8b2bcac3f8bc972181b825 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 15 Apr 2024 23:07:25 -0400 Subject: [PATCH 12/17] lint --- pyproject.toml | 25 +++++++++---------------- stac_geoparquet/to_arrow.py | 9 +++++---- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7ee010e..0efdd6c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,9 +4,9 @@ build-backend = "hatchling.build" [project] name = "stac_geoparquet" -authors = [{name = "Tom Augspurger", email = "taugspurger@microsoft.com"}] +authors = [{ name = "Tom Augspurger", email = "taugspurger@microsoft.com" }] readme = "README.md" -license = {file = "LICENSE"} +license = { file = "LICENSE" } classifiers = ["License :: OSI Approved :: MIT License"] dynamic = ["version", "description"] requires-python = ">=3.8" @@ -34,13 +34,7 @@ pgstac = [ "tqdm", "python-dateutil", ] -pc = [ - "adlfs", - "pypgstac", - "psycopg[binary,pool]", - "tqdm", - "azure-data-tables", -] +pc = ["adlfs", "pypgstac", "psycopg[binary,pool]", "tqdm", "azure-data-tables"] test = [ "pytest", "requests", @@ -58,9 +52,7 @@ pc-geoparquet = "stac_geoparquet.cli:main" [tool.pytest.ini_options] minversion = "6.0" -filterwarnings = [ - "ignore:.*distutils Version.*:DeprecationWarning", -] +filterwarnings = ["ignore:.*distutils Version.*:DeprecationWarning"] [tool.mypy] @@ -68,14 +60,15 @@ python_version = "3.10" [[tool.mypy.overrides]] module = [ - "shapely.*", + "ciso8601.*", + "fsspec.*", "geopandas.*", "pandas.*", - "fsspec.*", - "tqdm.*", - "pypgstac.*", "pyarrow.*", + "pypgstac.*", "rich.*", + "shapely.*", + "tqdm.*", ] ignore_missing_imports = true diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 06ef5cc..9f4c56f 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -1,6 +1,7 @@ """Convert STAC data into Arrow tables""" import json +from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Union @@ -65,11 +66,13 @@ def parse_stac_ndjson_to_arrow( chunk_size: int = 8192, schema: Optional[pa.Schema] = None, ): + # Define outside of if/else to make mypy happy + items: List[dict] = [] + # If the schema was not provided, then we need to load all data into memory at once # to perform schema resolution. if schema is None: with open(path) as f: - items = [] for line in f: items.append(json.loads(line)) @@ -79,8 +82,6 @@ def parse_stac_ndjson_to_arrow( # into an Arrow RecordBatch at a time. This is much more memory efficient. 
with open(path) as f: batches: List[pa.RecordBatch] = [] - items: List[dict] = [] - for line in f: items.append(json.loads(line)) @@ -200,7 +201,7 @@ def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.ChunkedArray: """Convert an individual timestamp column from string to a Timestamp type""" chunks = [] for chunk in column.chunks: - parsed_chunk = [] + parsed_chunk: List[Optional[datetime]] = [] for item in chunk: if not item.is_valid: parsed_chunk.append(None) From 713cdd1fc506121ddc1ee3db89f1c470584daecb Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Apr 2024 17:40:31 -0400 Subject: [PATCH 13/17] Convert bbox column to struct layout --- stac_geoparquet/to_arrow.py | 73 ++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 9f4c56f..e741561 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -1,11 +1,13 @@ """Convert STAC data into Arrow tables""" import json +from copy import deepcopy from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Union import ciso8601 +import numpy as np import pyarrow as pa import pyarrow.compute as pc import shapely @@ -55,7 +57,7 @@ def parse_stac_items_to_arrow( # else it would be possible for a STAC item late in the collection (after the # first chunk) to have a different schema and not match the schema inferred for # the first chunk. - stac_table = pa.Table.from_batches(_stac_items_to_arrow(items)) + stac_table = pa.Table.from_batches([_stac_items_to_arrow(items)]) return _process_arrow_table(stac_table) @@ -101,6 +103,7 @@ def parse_stac_ndjson_to_arrow( def _process_arrow_table(table: pa.Table) -> pa.Table: table = _bring_properties_to_top_level(table) table = _convert_timestamp_columns(table) + table = _convert_bbox_to_struct(table) return table @@ -129,8 +132,14 @@ def _stac_items_to_arrow( # Otherwise, pyarrow will try to parse coordinates into a native geometry type and # if you have multiple geometry types pyarrow will error with # `ArrowInvalid: cannot mix list and non-list, non-null values` + wkb_items = [] for item in items: - item["geometry"] = shapely.to_wkb(shapely.geometry.shape(item["geometry"])) + wkb_item = deepcopy(item) + # Note: this mutates the existing items. Should we + wkb_item["geometry"] = shapely.to_wkb( + shapely.geometry.shape(wkb_item["geometry"]) + ) + wkb_items.append(wkb_item) if schema is not None: array = pa.array(items, type=pa.struct(schema)) @@ -212,3 +221,63 @@ def _convert_timestamp_column(column: pa.ChunkedArray) -> pa.ChunkedArray: chunks.append(pyarrow_chunk) return pa.chunked_array(chunks) + + +def _convert_bbox_to_struct(table: pa.Table, *, downcast: bool = True) -> pa.Table: + """Convert bbox column to a struct representation + + Since the bbox in JSON is stored as an array, pyarrow automatically converts the + bbox column to a ListArray. But according to GeoParquet 1.1, we should save the bbox + column as a StructArray, which allows for Parquet statistics to infer any spatial + partitioning in the dataset. + + Args: + table: _description_ + downcast: if True, will use float32 coordinates for the bounding boxes instead of float64. Float rounding is applied to ensure the float32 bounding box strictly contains the original float64 box. This is recommended when possible to minimize file size. 
+ + Returns: + New table + """ + bbox_col_idx = table.schema.get_field_index("bbox") + bbox_col = table.column(bbox_col_idx) + + new_chunks = [] + for chunk in bbox_col.chunks: + assert ( + pa.types.is_list(chunk.type) + or pa.types.is_large_list(chunk.type) + or pa.types.is_fixed_size_list(chunk.type) + ) + coords = chunk.flatten().to_numpy().reshape(-1, 4) + xmin = coords[:, 0] + ymin = coords[:, 1] + xmax = coords[:, 2] + ymax = coords[:, 3] + + if downcast: + coords = coords.astype(np.float32) + + # Round min values down to the next float32 value + # Round max values up to the next float32 value + xmin = np.nextafter(xmin, -np.Infinity) + ymin = np.nextafter(ymin, -np.Infinity) + xmax = np.nextafter(xmax, np.Infinity) + ymax = np.nextafter(ymax, np.Infinity) + + struct_arr = pa.StructArray.from_arrays( + [ + xmin, + ymin, + xmax, + ymax, + ], + names=[ + "xmin", + "ymin", + "xmax", + "ymax", + ], + ) + new_chunks.append(struct_arr) + + return table.set_column(bbox_col_idx, "bbox", new_chunks) From f5ac44ec928389b9d44e741824d9c37a3e07a587 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Apr 2024 17:49:52 -0400 Subject: [PATCH 14/17] Convert bbox back to list before writing to JSON --- stac_geoparquet/from_arrow.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/stac_geoparquet/from_arrow.py b/stac_geoparquet/from_arrow.py index 163a80d..d079c19 100644 --- a/stac_geoparquet/from_arrow.py +++ b/stac_geoparquet/from_arrow.py @@ -1,8 +1,9 @@ -"""Convert STAC Items in Arrow Table format to JSON Lines or """ +"""Convert STAC Items in Arrow Table format to JSON Lines or Python dicts.""" import json from typing import Iterable, List +import numpy as np import pyarrow as pa import pyarrow.compute as pc import shapely @@ -45,6 +46,7 @@ def _undo_stac_table_transformations(table: pa.Table) -> pa.Table: """ table = _convert_timestamp_columns_to_string(table) table = _lower_properties_from_top_level(table) + table = _convert_bbox_to_array(table) return table @@ -107,3 +109,31 @@ def _lower_properties_from_top_level(table: pa.Table) -> pa.Table: return table.drop_columns(properties_column_names).append_column( "properties", pa.chunked_array(properties_array_chunks) ) + + +def _convert_bbox_to_array(table: pa.Table) -> pa.Table: + """Convert the struct bbox column back to a list column for writing to JSON""" + + bbox_col_idx = table.schema.get_field_index("bbox") + bbox_col = table.column(bbox_col_idx) + + new_chunks = [] + for chunk in bbox_col.chunks: + assert pa.types.is_struct(chunk.type) + xmin = chunk.field(0).to_numpy() + ymin = chunk.field(1).to_numpy() + xmax = chunk.field(2).to_numpy() + ymax = chunk.field(3).to_numpy() + coords = np.column_stack( + [ + xmin, + ymin, + xmax, + ymax, + ] + ) + + list_arr = pa.FixedSizeListArray.from_arrays(coords.flatten("C"), 4) + new_chunks.append(list_arr) + + return table.set_column(bbox_col_idx, "bbox", new_chunks) From e82e2ea7256ddbec455fcb61ee414d809ce38f02 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Apr 2024 18:05:36 -0400 Subject: [PATCH 15/17] Use ISO WKB --- stac_geoparquet/to_arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index e741561..4dd5840 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -137,7 +137,7 @@ def _stac_items_to_arrow( wkb_item = deepcopy(item) # Note: this mutates the existing items. 
Should we wkb_item["geometry"] = shapely.to_wkb( - shapely.geometry.shape(wkb_item["geometry"]) + shapely.geometry.shape(wkb_item["geometry"]), flavor="iso" ) wkb_items.append(wkb_item) From 7d6a75bcc27e72c975f54558e80bf61c18a9fa0d Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Apr 2024 18:10:37 -0400 Subject: [PATCH 16/17] Lint --- stac_geoparquet/to_arrow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 4dd5840..69d7f97 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -233,7 +233,10 @@ def _convert_bbox_to_struct(table: pa.Table, *, downcast: bool = True) -> pa.Tab Args: table: _description_ - downcast: if True, will use float32 coordinates for the bounding boxes instead of float64. Float rounding is applied to ensure the float32 bounding box strictly contains the original float64 box. This is recommended when possible to minimize file size. + downcast: if True, will use float32 coordinates for the bounding boxes instead + of float64. Float rounding is applied to ensure the float32 bounding box + strictly contains the original float64 box. This is recommended when + possible to minimize file size. Returns: New table From 19fe8c02609e7f6b1321e34c9fd30db2b4ac80b1 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 17 Apr 2024 15:08:55 -0400 Subject: [PATCH 17/17] Update to_arrow.py Co-authored-by: Tom Augspurger --- stac_geoparquet/to_arrow.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/stac_geoparquet/to_arrow.py b/stac_geoparquet/to_arrow.py index 69d7f97..f2ef0d9 100644 --- a/stac_geoparquet/to_arrow.py +++ b/stac_geoparquet/to_arrow.py @@ -142,10 +142,9 @@ def _stac_items_to_arrow( wkb_items.append(wkb_item) if schema is not None: - array = pa.array(items, type=pa.struct(schema)) + array = pa.array(wkb_items, type=pa.struct(schema)) else: - array = pa.array(items) - + array = pa.array(wkb_items) return pa.RecordBatch.from_struct_array(array)
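
Taken together, these patches add a round trip between STAC Item dicts and Arrow: parse_stac_items_to_arrow / parse_stac_ndjson_to_arrow in stac_geoparquet/to_arrow.py, and stac_table_to_items / stac_table_to_ndjson in stac_geoparquet/from_arrow.py. The sketch below (not part of the patches) shows one way that API might be exercised end to end. It assumes a recent pyarrow and shapely 2.x; the sample item, the output paths, and the plain pyarrow.parquet write (which by itself does not attach GeoParquet "geo" file metadata) are illustrative only.

    # Minimal round-trip sketch for the functions added in this series.
    import json

    import pyarrow.parquet as pq

    from stac_geoparquet.from_arrow import stac_table_to_items, stac_table_to_ndjson
    from stac_geoparquet.to_arrow import parse_stac_items_to_arrow

    # A tiny illustrative STAC Item; any valid Item dict should work the same way.
    items = [
        {
            "type": "Feature",
            "stac_version": "1.0.0",
            "id": "example-item",
            "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
            "bbox": [0.0, 0.0, 0.0, 0.0],
            "properties": {"datetime": "2020-01-01T00:00:00Z"},
            "links": [{"rel": "self", "href": "https://example.com/example-item.json"}],
            "assets": {"data": {"href": "https://example.com/example-item.tif"}},
        }
    ]

    # STAC dicts -> Arrow: properties are hoisted to the top level, geometry is encoded
    # as ISO WKB, timestamp strings are parsed, and bbox becomes a struct column.
    table = parse_stac_items_to_arrow(items)

    # The table can be written with plain pyarrow; note this call alone does not add
    # GeoParquet "geo" metadata to the file.
    pq.write_table(table, "items.parquet")

    # Arrow -> STAC again, either as Python dicts or as newline-delimited JSON.
    round_tripped = list(stac_table_to_items(table))
    print(json.dumps(round_tripped[0], indent=2))
    stac_table_to_ndjson(table, "items.ndjson")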