Skip to content

Commit

Permalink
add a general from_text input function
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis committed Sep 5, 2023
1 parent d8a916b commit 734fe35
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.utils import infer_compression
from fsspec.core import get_fs_token_paths
from fsspec.utils import infer_compression, read_block

from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer
from dask_awkward.layers.layers import AwkwardMaterializedLayer
Expand Down Expand Up @@ -727,3 +728,70 @@ def _bytes_with_sample(
sample_bytes = sample_bytes[:rfind]

return out, sample_bytes


def _string_array_from_bytestring(bytestring, delimiter):
buffer = np.frombuffer(bytestring, dtype=np.uint8)
array = ak.from_numpy(buffer)
array = ak.unflatten(array, len(array))
array = ak.enforce_type(array, "string")
array_split = ak.str.split_pattern(array, delimiter)
lines = array_split[0]
if len(lines) == 0:
return lines
if lines[-1] == "":
lines = lines[:-1]
return lines


class FromTextFn:
def __init__(self):
pass

def __call__(self, bri: _BytesReadingInstructions) -> ak.Array:
with bri.fs.open(bri.path, compression=bri.compression) as f:
if bri.offset == 0 and bri.length is None:
bytestring = f.read()
else:
bytestring = read_block(f, bri.offset, bri.length, bri.delimiter)

return _string_array_from_bytestring(bytestring, bri.delimiter)


def from_text(
source: str | list[str],
blocksize: str | int = "128 MiB",
delimiter: bytes = b"\n",
sample_size: str | int = "128 KiB",
compression: str | None = "infer",
storage_options: dict | None = None,
) -> Array:
fs, token, paths = get_fs_token_paths(source, storage_options=storage_options or {})

token = tokenize(source, token, blocksize, delimiter, sample_size)

if compression == "infer":
compression = infer_compression(paths[0])

bytes_ingredients, sample_bytes = _bytes_with_sample(
fs,
paths,
compression,
delimiter,
False,
blocksize,
sample_size,
)

rfind = sample_bytes.rfind(delimiter)
if rfind > 0:
sample_bytes = sample_bytes[:rfind]
meta = typetracer_array(_string_array_from_bytestring(sample_bytes, delimiter))

return from_map(
FromTextFn(),
list(flatten(bytes_ingredients)),
label="from-text",
token=token,
meta=meta,
)

0 comments on commit 734fe35

Please sign in to comment.