Commit bc686d2: unpolished column projection works

douglasdavis committed Jul 17, 2023
1 parent 4e5e883 · commit bc686d2
Showing 2 changed files with 34 additions and 70 deletions.
87 changes: 33 additions & 54 deletions src/dask_awkward/lib/io/json.py
@@ -11,6 +11,7 @@
     import json  # type: ignore[no-redef]
 
 import awkward as ak
+from awkward.forms.form import Form
 from dask.base import tokenize
 from dask.blockwise import BlockIndex
 from dask.bytes.core import read_bytes
@@ -44,13 +45,17 @@ def __init__(
         storage: AbstractFileSystem,
         compression: str | None = None,
         schema: dict | None = None,
+        form: Form | None = None,
+        original_form: Form | None = None,
         **kwargs: Any,
     ) -> None:
         self.compression = compression
         self.storage = storage
         self.schema = schema
         self.args = args
         self.kwargs = kwargs
+        self.form = form
+        self.original_form = original_form
 
     @abc.abstractmethod
     def __call__(self, source: Any) -> ak.Array:
@@ -64,46 +69,55 @@ def __init__(
         storage: AbstractFileSystem,
         compression: str | None = None,
         schema: dict | None = None,
+        form: Form | None = None,
         **kwargs: Any,
     ) -> None:
         super().__init__(
             *args,
             storage=storage,
             compression=compression,
             schema=schema,
+            form=form,
             **kwargs,
         )
 
     def __call__(self, source: str) -> ak.Array:
         with self.storage.open(source, mode="rt", compression=self.compression) as f:
             return ak.from_json(f.read(), line_delimited=True, schema=self.schema)
 
-    def project_columns(self, columns):
-        schema = self.schema
-
-        # TODO: do something with columns to redefine schema...
+    def project_columns(self, columns, original_form: Form | None = None):
+        if self.form is not None:
+            form = self.form.select_columns(columns)
+            schema = layout_to_jsonschema(form.length_zero_array(highlevel=False))
+            return _FromJsonLineDelimitedFn(
+                schema=schema,
+                form=form,
+                storage=self.storage,
+                compression=self.compression,
+                original_form=original_form,
+            )
 
-        return _FromJsonLineDelimitedFn(
-            schema=schema,
-            storage=self.storage,
-            compression=self.compression,
-        )
+        return self
 
 
 class _FromJsonSingleObjInFileFn(_FromJsonFn):
     def __init__(
         self,
         *args: Any,
         storage: AbstractFileSystem,
-        schema: dict | None = None,
         compression: str | None = None,
+        schema: dict | None = None,
+        form: Form | None = None,
+        original_form: Form | None = None,
         **kwargs: Any,
     ) -> None:
         super().__init__(
             *args,
             storage=storage,
             compression=compression,
             schema=schema,
+            form=form,
+            original_form=original_form,
             **kwargs,
         )
 
@@ -113,8 +127,15 @@ def __call__(self, source: str) -> ak.Array:
 
 
 class _FromJsonBytesFn:
-    def __init__(self, schema: dict | None = None) -> None:
+    def __init__(
+        self,
+        schema: dict | None = None,
+        form: Form | None = None,
+        original_form: Form | None = None,
+    ) -> None:
         self.schema = schema
+        self.form = form
+        self.original_form = original_form
 
     def __call__(self, source: bytes) -> ak.Array:
         return ak.from_json(source, line_delimited=True, schema=self.schema)
@@ -215,6 +236,7 @@ def _from_json_files(
         storage=fs,
         compression=compression,
         schema=schema,
+        form=meta.layout.form,
     )
 
     return from_map(
@@ -264,24 +286,6 @@ def _from_json_bytes(
     return new_array_object(hlg, name, meta=meta, behavior=behavior, npartitions=n)
 
 
-def from_json2(
-    source,
-    *,
-    line_delimited=False,
-    schema=None,
-    nan_string=None,
-    posinf_string=None,
-    neginf_string=None,
-    complex_record_fields=None,
-    buffersize=65536,
-    initial=1024,
-    resize=8,
-    highlevel=True,
-    behavior=None,
-):
-    pass
-
-
 def from_json(
     urlpath: str | list[str],
     schema: dict | None = None,
@@ -555,28 +559,3 @@ def ak_schema_repr(arr):
     import yaml
 
     return yaml.dump(arr.layout.form)
-
-
-def _read_beginning_compressed(
-    storage: AbstractFileSystem,
-    source: str,
-    compression: str | None,
-    n_lines: int = 5,
-) -> ak.Array:
-    lines = []
-    with storage.open(source, mode="rt", compression=compression) as f:
-        for i, line in enumerate(f):
-            if i >= n_lines:
-                break
-            lines.append(ak.from_json(line))
-    return ak.from_iter(lines)
-
-
-def _read_beginning_uncompressed(
-    storage: AbstractFileSystem,
-    source: str,
-    numbytes: int = 16384,
-) -> ak.Array:
-    bytes = storage.cat(source, start=0, end=numbytes)
-    array = ak.concatenate([ak.from_json(line) for line in bytes.split(b"\n")[:-1]])
-    return array
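
The substance of the change is the new project_columns: when the reader
function carries an awkward Form, it narrows that Form to the requested
columns, builds a length-zero array from it, and derives a JSONSchema via
layout_to_jsonschema, so that ak.from_json only parses the surviving
fields. A minimal sketch of the idea (the array, field names, and
hand-written schema below are illustrative, not from this commit; it
assumes awkward's Form.select_columns and the JSONSchema support in
ak.from_json):

    import awkward as ak

    # Full records carry two fields; suppose downstream work needs only "x".
    full = ak.Array([{"x": 1, "y": [1.0, 2.0]}, {"x": 2, "y": [3.0]}])
    form = full.layout.form

    # Narrow the Form to the wanted column, as project_columns does with
    # self.form.select_columns(columns).
    projected = form.select_columns(["x"])

    # A JSONSchema limited to that column (hand-written here; the commit
    # derives it by calling layout_to_jsonschema on a length-zero array).
    schema = {
        "type": "object",
        "properties": {"x": {"type": "integer"}},
        "required": ["x"],
    }

    # Parsing line-delimited JSON against the narrowed schema skips "y".
    text = '{"x": 1, "y": [1.0, 2.0]}\n{"x": 2, "y": [3.0]}'
    arr = ak.from_json(text, line_delimited=True, schema=schema)
    print(arr.fields)  # ["x"]
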
17 changes: 1 addition & 16 deletions tests/test_io.py
@@ -325,7 +325,7 @@ def test_to_bag(daa, caa):
 
 
 @pytest.mark.parametrize("compression", ["xz", "gzip", "zip"])
-def test_to_json(daa, tmpdir_factory, compression):
+def test_to_and_from_json(daa, tmpdir_factory, compression):
     tdir = str(tmpdir_factory.mktemp("json_temp"))
 
     p1 = os.path.join(tdir, "z", "z")
@@ -351,21 +351,6 @@ def test_to_json(daa, tmpdir_factory, compression):
     assert_eq(x, r)
 
 
-def test_to_json_raise_filenotfound(
-    daa: dak.Array,
-    tmpdir_factory: pytest.TempdirFactory,
-) -> None:
-    p = tmpdir_factory.mktemp("onelevel")
-    p2 = os.path.join(str(p), "two")
-    with pytest.raises(FileNotFoundError, match="Parent directory for output file"):
-        dak.to_json(
-            daa,
-            os.path.join(p2, "three", "four", "*.json"),
-            compute=True,
-            line_delimited=True,
-        )
-
-
 @pytest.mark.parametrize("optimize_graph", [True, False])
 def test_to_dataframe(daa: dak.Array, caa: ak.Array, optimize_graph: bool) -> None:
     pytest.importorskip("pandas")
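
For context, a hedged sketch of how the projection is expected to surface
to users (data.json is a hypothetical file; this assumes dask-awkward's
usual necessary-columns optimization, which rewrites the read function at
graph-optimization time):

    import dask_awkward as dak

    # Hypothetical line-delimited file with records like {"x": 1, "y": [...]}.
    arr = dak.from_json("data.json")

    # Only "x" is requested, so the optimizer can call project_columns and
    # swap in a reader whose JSONSchema parses just "x" from each line.
    result = arr["x"].compute()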
