diff --git a/examples/write_lindi_binary.py b/examples/write_lindi_binary.py
new file mode 100644
index 0000000..cdb518b
--- /dev/null
+++ b/examples/write_lindi_binary.py
@@ -0,0 +1,20 @@
+import lindi
+
+
+def write_lindi_binary():
+    with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='w', create_binary=True) as f:
+        f.attrs['test'] = 42
+        ds = f.create_dataset('data', shape=(1000, 1000), dtype='f4')
+        ds[...] = 42
+
+
+def test_read():
+    f = lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r')
+    print(f.attrs['test'])
+    print(f['data'][0, 0])
+    f.close()
+
+
+if __name__ == "__main__":
+    write_lindi_binary()
+    test_read()
diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py
index 93c85d2..6ad66fc 100644
--- a/lindi/LindiH5pyFile/LindiH5pyFile.py
+++ b/lindi/LindiH5pyFile/LindiH5pyFile.py
@@ -2,7 +2,6 @@
 import os
 import json
 import tempfile
-import urllib.request
 import h5py
 import zarr
 from zarr.storage import Store as ZarrStore
@@ -15,11 +14,14 @@
 from ..LindiStagingStore.StagingArea import StagingArea
 from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates
 
 from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts
+from ..lindi1 import Lindi1Store
 
 from ..LocalCache.LocalCache import LocalCache
 
 from ..LindiH5ZarrStore._util import _write_rfs_to_file
+from ..lindi1._lindi1 import _load_rfs_dict_from_local_lindi_file, _load_rfs_dict_from_remote_lindi_file, _write_rfs_dict_to_lindi1_file, _create_empty_lindi_file
+
 
 
 LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]
@@ -29,7 +31,7 @@
 
 
 class LindiH5pyFile(h5py.File):
-    def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None):
+    def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None):
         """
         Do not use this constructor directly. Instead, use: from_lindi_file,
         from_h5py_file, from_reference_file_system, from_zarr_store, or
@@ -40,22 +42,26 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non
         self._mode: LindiFileMode = _mode
         self._the_group = LindiH5pyGroup(_zarr_group, self)
         self._local_cache = _local_cache
-        self._local_file_path = _local_file_path
+        self._source_url_or_path = _source_url_or_path
+        self._source_is_lindi1_format = _source_is_lindi1_format
 
         # see comment in LindiH5pyGroup
         self._id = f'{id(self._zarr_group)}/'
 
     @staticmethod
-    def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
+    def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, create_binary: bool = False):
         """
         Create a LindiH5pyFile from a URL or path to a .lindi.json file.
 
         For a description of parameters, see from_reference_file_system().
         """
-        if local_file_path is None:
-            if not url_or_path.startswith("http://") and not url_or_path.startswith("https://"):
-                local_file_path = url_or_path
-        return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
+        return LindiH5pyFile.from_reference_file_system(
+            url_or_path,
+            mode=mode,
+            staging_area=staging_area,
+            local_cache=local_cache,
+            create_binary=create_binary
+        )
 
     @staticmethod
     def from_hdf5_file(
@@ -95,11 +101,13 @@ def from_hdf5_file(
         return LindiH5pyFile.from_zarr_store(
             zarr_store=zarr_store,
             mode=mode,
-            local_cache=local_cache
+            local_cache=local_cache,
+            _source_url_or_path=None,
+            _source_is_lindi1_format=None
         )
 
     @staticmethod
-    def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
+    def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None, create_binary: bool = False):
         """
         Create a LindiH5pyFile from a reference file system.
 
@@ -116,11 +124,11 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
             is only used in write mode, by default None.
         local_cache : Union[LocalCache, None], optional
             The local cache to use for caching data, by default None.
-        local_file_path : Union[str, None], optional
-            If rfs is not a string or is a remote url, this is the path to the
-            local file for the purpose of writing to it. It is required in this
-            case if mode is not "r". If rfs is a string and not a remote url, it
-            must be equal to local_file_path if provided.
+        _source_url_or_path : Union[str, None], optional
+        _source_is_lindi1_format : Union[bool, None], optional
+        create_binary : bool, optional
+            Only applies when writing a new file. If True, a binary lindi file
+            will be created.
         """
         if rfs is None:
             rfs = {
@@ -131,26 +139,32 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
                 },
             }
 
+        if create_binary:
+            if mode not in ["w", "w-", "x"]:
+                raise Exception("create_binary is only supported in write mode")
+
         if isinstance(rfs, str):
             rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://")
-            if local_file_path is not None and not rfs_is_url and rfs != local_file_path:
-                raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}")
             if rfs_is_url:
-                with tempfile.TemporaryDirectory() as tmpdir:
-                    filename = f"{tmpdir}/temp.lindi.json"
-                    _download_file(rfs, filename)
-                    with open(filename, "r") as f:
-                        data = json.load(f)
-                    assert isinstance(data, dict)  # prevent infinite recursion
-                    return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
+                if create_binary:
+                    raise Exception("create_binary is not supported with remote files")
+                data, is_lindi1_format = _load_rfs_dict_from_remote_lindi_file(rfs)
+                assert isinstance(data, dict)  # prevent infinite recursion
+                if _source_url_or_path is not None:
+                    if _source_url_or_path != rfs:
+                        raise Exception(f"source_url_or_path must match rfs if rfs is a remote file, got: {rfs}, {_source_url_or_path}")
+                if _source_is_lindi1_format is not None:
+                    if _source_is_lindi1_format != is_lindi1_format:
+                        raise Exception(f"source_is_lindi1_format must match is_lindi1_format if rfs is a remote file, got: {is_lindi1_format}, {_source_is_lindi1_format}")
+                return LindiH5pyFile.from_reference_file_system(
+                    data,
+                    mode=mode,
+                    staging_area=staging_area,
+                    local_cache=local_cache,
+                    _source_url_or_path=rfs,
+                    _source_is_lindi1_format=is_lindi1_format
+                )
             else:
-                empty_rfs = {
-                    "refs": {
-                        '.zgroup': {
-                            'zarr_format': 2
-                        }
-                    },
-                }
                 if mode == "r":
                     # Readonly, file must exist (default)
                     if not os.path.exists(rfs):
@@ -161,36 +175,59 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
                         raise Exception(f"File does not exist: {rfs}")
                 elif mode == "w":
                     # Create file, truncate if exists
-                    with open(rfs, "w") as f:
-                        json.dump(empty_rfs, f)
+                    _create_empty_lindi_file(rfs, create_binary=create_binary)
                 elif mode in ["w-", "x"]:
                     # Create file, fail if exists
                     if os.path.exists(rfs):
                         raise Exception(f"File already exists: {rfs}")
-                    with open(rfs, "w") as f:
-                        json.dump(empty_rfs, f)
+                    _create_empty_lindi_file(rfs, create_binary=create_binary)
                 elif mode == "a":
                     # Read/write if exists, create otherwise
                     if os.path.exists(rfs):
-                        with open(rfs, "r") as f:
-                            data = json.load(f)
+                        # rfs is loaded below via _load_rfs_dict_from_local_lindi_file
+                        pass
+                    else:
+                        _create_empty_lindi_file(rfs, create_binary=create_binary)
                 else:
                     raise Exception(f"Unhandled mode: {mode}")
-                with open(rfs, "r") as f:
-                    data = json.load(f)
+                data, is_lindi1_format = _load_rfs_dict_from_local_lindi_file(rfs)
                 assert isinstance(data, dict)  # prevent infinite recursion
-                return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
+                if _source_url_or_path is not None:
+                    if _source_url_or_path != rfs:
+                        raise Exception(f"source_url_or_path must match rfs if rfs is a local file, got: {rfs}, {_source_url_or_path}")
+                if _source_is_lindi1_format is not None:
+                    if _source_is_lindi1_format != is_lindi1_format:
+                        raise Exception(f"source_is_lindi1_format must match is_lindi1_format if rfs is a local file, got: {is_lindi1_format}, {_source_is_lindi1_format}")
+                return LindiH5pyFile.from_reference_file_system(
+                    data, mode=mode,
+                    staging_area=staging_area,
+                    local_cache=local_cache,
+                    _source_url_or_path=rfs,
+                    _source_is_lindi1_format=is_lindi1_format
+                )
         elif isinstance(rfs, dict):
             # This store does not need to be closed
-            store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache)
+            store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache, _source_url_or_path=_source_url_or_path)
             if staging_area:
+                if _source_is_lindi1_format:
+                    raise Exception("Staging area is not supported with lindi1 format")
                 store = LindiStagingStore(base_store=store, staging_area=staging_area)
-            return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path, local_cache=local_cache)
+            elif _source_is_lindi1_format and _source_url_or_path is not None:
+                is_url = _source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://")
+                if not is_url:
+                    store = Lindi1Store(base_store=store, lindi1_file_name=_source_url_or_path)
+            return LindiH5pyFile.from_zarr_store(
+                store,
+                mode=mode,
+                local_cache=local_cache,
+                _source_url_or_path=_source_url_or_path,
+                _source_is_lindi1_format=_source_is_lindi1_format
+            )
         else:
             raise Exception(f"Unhandled type for rfs: {type(rfs)}")
 
     @staticmethod
-    def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
+    def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None):
         """
         Create a LindiH5pyFile from a zarr store.
 
@@ -207,10 +244,17 @@ def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cach
         # does not need to be closed
         zarr_group = zarr.open(store=zarr_store, mode=mode)
         assert isinstance(zarr_group, zarr.Group)
-        return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path)
+        return LindiH5pyFile.from_zarr_group(
+            zarr_group,
+            _zarr_store=zarr_store,
+            mode=mode,
+            local_cache=local_cache,
+            _source_url_or_path=_source_url_or_path,
+            _source_is_lindi1_format=_source_is_lindi1_format
+        )
 
     @staticmethod
-    def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
+    def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None):
         """
         Create a LindiH5pyFile from a zarr group.
 
@@ -228,7 +272,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_
 
         See from_zarr_store().
         """
-        return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path)
+        return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_is_lindi1_format=_source_is_lindi1_format)
 
     def to_reference_file_system(self):
         """
@@ -241,6 +285,8 @@ def to_reference_file_system(self):
         if isinstance(zarr_store, LindiStagingStore):
             zarr_store.consolidate_chunks()
             zarr_store = zarr_store._base_store
+        if isinstance(zarr_store, Lindi1Store):
+            zarr_store = zarr_store._base_store
         if isinstance(zarr_store, LindiH5ZarrStore):
             return zarr_store.to_reference_file_system()
         if not isinstance(zarr_store, LindiReferenceFileSystemStore):
@@ -284,8 +330,8 @@ def upload(
             if isinstance(v, list) and len(v) == 3:
                 url = _apply_templates(v[0], rfs.get('templates', {}))
                 if not url.startswith("http://") and not url.startswith("https://"):
-                    local_path = url
-                    blobs_to_upload.add(local_path)
+                    local_path_for_blob = url
+                    blobs_to_upload.add(local_path_for_blob)
         # Upload each of the local blobs using the given upload function and get a mapping from
         # the original file paths to the URLs of the uploaded files
         blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob)
@@ -358,9 +404,14 @@ def close(self):
         self.flush()
 
     def flush(self):
-        if self._mode != 'r' and self._local_file_path is not None:
-            rfs = self.to_reference_file_system()
-            _write_rfs_to_file(rfs=rfs, output_file_name=self._local_file_path)
+        if self._mode != 'r' and self._source_url_or_path is not None:
+            is_url = self._source_url_or_path.startswith("http://") or self._source_url_or_path.startswith("https://")
+            if not is_url:
+                rfs = self.to_reference_file_system()
+                if self._source_is_lindi1_format:
+                    _write_rfs_dict_to_lindi1_file(rfs=rfs, output_file_name=self._source_url_or_path)
+                else:
+                    _write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path)
 
     def __enter__(self):  # type: ignore
         return self
@@ -512,16 +563,6 @@ def staging_store(self):
         return store
 
 
-def _download_file(url: str, filename: str) -> None:
-    headers = {
-        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
-    }
-    req = urllib.request.Request(url, headers=headers)
-    with urllib.request.urlopen(req) as response:
-        with open(filename, "wb") as f:
-            f.write(response.read())
-
-
 def _recursive_copy(src_item: Union[h5py.Group, h5py.Dataset], dest: h5py.File, name: str) -> None:
     if isinstance(src_item, h5py.Group):
         dst_item = dest.create_group(name)
diff --git a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
index 7d2d5fa..018597a 100644
--- a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
+++ b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
@@ -69,7 +69,7 @@ class LindiReferenceFileSystemStore(ZarrStore):
     It is okay for rfs to be modified outside of this class, and the changes
     will be reflected immediately in the store.
     """
-    def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None):
+    def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None):
         """
         Create a LindiReferenceFileSystemStore.
 
@@ -114,6 +114,7 @@ def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: U
         self.rfs = rfs
         self.mode = mode
         self.local_cache = local_cache
+        self._source_url_or_path = _source_url_or_path
 
     # These methods are overridden from MutableMapping
     def __contains__(self, key: object):
@@ -155,6 +156,9 @@ def _get_helper(self, key: str):
                 x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length)
                 if x is not None:
                     return x
+            if url == '.' and self._source_url_or_path:
+                # this is where the file refers to bytes in the same file (lindi binary format)
+                url = self._source_url_or_path
             val = _read_bytes_from_url_or_path(url, offset, length)
             if self.local_cache is not None:
                 try:
diff --git a/lindi/lindi1/Lindi1Store.py b/lindi/lindi1/Lindi1Store.py
new file mode 100644
index 0000000..34b083d
--- /dev/null
+++ b/lindi/lindi1/Lindi1Store.py
@@ -0,0 +1,82 @@
+import os
+from zarr.storage import Store as ZarrStore
+from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore
+
+
+class Lindi1Store(ZarrStore):
+    """
+    A Zarr store that allows supplementing a base LindiReferenceFileSystemStore
+    where the large data blobs are appended to a lindi file of lindi1 format.
+    """
+    def __init__(self, *, base_store: LindiReferenceFileSystemStore, lindi1_file_name: str):
+        """
+        Create a Lindi1Store.
+
+        Parameters
+        ----------
+        base_store : LindiReferenceFileSystemStore
+            The base store that this store supplements.
+        lindi1_file_name : str
+            The name of the lindi1 file that will be created or appended to.
+        """
+        self._base_store = base_store
+        self._lindi1_file_name = lindi1_file_name
+
+    def __getitem__(self, key: str):
+        return self._base_store.__getitem__(key)
+
+    def __setitem__(self, key: str, value: bytes):
+        key_parts = key.split("/")
+        key_base_name = key_parts[-1]
+        if key_base_name.startswith('.') or key_base_name.endswith('.json'):  # always inline .zattrs, .zgroup, .zarray, zarr.json
+            inline = True
+        else:
+            # presumably it is a chunk of an array
+            if not isinstance(value, bytes):
+                raise ValueError("Value must be bytes")
+            size = len(value)
+            inline = size < 1000  # this should be a configurable threshold
+        if inline:
+            # If inline, save in memory
+            return self._base_store.__setitem__(key, value)
+        else:
+            # If not inline, append it to the lindi1 file
+            key_without_initial_slash = key if not key.startswith("/") else key[1:]
+            lindi1_file_size = os.path.getsize(self._lindi1_file_name)
+            with open(self._lindi1_file_name, "ab") as f:
+                f.write(value)
+            self._set_ref_reference(key_without_initial_slash, '.', lindi1_file_size, len(value))
+
+    def __delitem__(self, key: str):
+        # We can't remove the data from the lindi1 file, but we do need to remove the reference
+        return self._base_store.__delitem__(key)
+
+    def __iter__(self):
+        return self._base_store.__iter__()
+
+    def __len__(self):
+        return self._base_store.__len__()
+
+    # These methods are overridden from BaseStore
+    def is_readable(self):
+        return True
+
+    def is_writeable(self):
+        return True
+
+    def is_listable(self):
+        return True
+
+    def is_erasable(self):
+        return False
+
+    def _set_ref_reference(self, key: str, filename: str, offset: int, size: int):
+        rfs = self._base_store.rfs
+        if 'refs' not in rfs:
+            # this shouldn't happen, but we'll be defensive
+            rfs['refs'] = {}
+        rfs['refs'][key] = [
+            filename,
+            offset,
+            size
+        ]
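To make the Lindi1Store write path concrete: keys whose basename starts with '.' or ends in '.json' (the .zgroup/.zattrs/.zarray metadata) and chunks under the 1000-byte threshold stay inline in the base reference file system, while larger chunks are appended to the lindi1 file and recorded as a ['.', offset, size] reference; '.' is the same-file marker that LindiReferenceFileSystemStore._get_helper resolves to _source_url_or_path above, and flush()/close() then writes the updated rfs back into the same file. An illustrative rfs after the example's 'data' dataset has been written (offsets, sizes and the inline .zarray value are schematic, not real output):

    {
      "refs": {
        ".zgroup": {"zarr_format": 2},
        "data/.zarray": "{...}",
        "data/0.0": [".", 1049600, 2000000]
      }
    }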
diff --git a/lindi/lindi1/__init__.py b/lindi/lindi1/__init__.py
new file mode 100644
index 0000000..8dd16f6
--- /dev/null
+++ b/lindi/lindi1/__init__.py
@@ -0,0 +1 @@
+from .Lindi1Store import Lindi1Store  # noqa: F401
diff --git a/lindi/lindi1/_lindi1.py b/lindi/lindi1/_lindi1.py
new file mode 100644
index 0000000..0bce9f1
--- /dev/null
+++ b/lindi/lindi1/_lindi1.py
@@ -0,0 +1,177 @@
+import os
+import json
+import tempfile
+import urllib.request
+
+
+def _load_rfs_dict_from_local_lindi_file(filename: str):
+    header_buf = _read_bytes_from_file(filename, 0, 1024)
+    lindi1_header = _parse_lindi1_header(header_buf)
+    if lindi1_header is not None:
+        # This is lindi1 format
+        assert lindi1_header['format'] == 'lindi1'
+        rfs_start = lindi1_header['rfs_start']
+        rfs_size = lindi1_header['rfs_size']
+        rfs_buf = _read_bytes_from_file(filename, rfs_start, rfs_start + rfs_size)
+        return json.loads(rfs_buf), True
+    else:
+        # In this case, it must be a regular json file
+        with open(filename, "r") as f:
+            return json.load(f), False
+
+
+def _load_rfs_dict_from_remote_lindi_file(url: str):
+    file_size = _get_file_size_of_remote_file(url)
+    if file_size < 1024 * 1024 * 2:
+        # if it's a small file, we'll just download the whole thing
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmp_fname = f"{tmpdir}/temp.lindi.json"
+            _download_file(url, tmp_fname)
+            return _load_rfs_dict_from_local_lindi_file(tmp_fname)
+    else:
+        # if it's a large file, we start by downloading the first 1024 bytes
+        header_buf = _download_file_byte_range(url, 0, 1024)
+        lindi1_header = _parse_lindi1_header(header_buf)
+        if lindi1_header is not None:
+            # This is lindi1 format
+            assert lindi1_header['format'] == 'lindi1'
+            rfs_start = lindi1_header['rfs_start']
+            rfs_size = lindi1_header['rfs_size']
+            rfs_buf = _download_file_byte_range(url, rfs_start, rfs_start + rfs_size)
+            return json.loads(rfs_buf), True
+        else:
+            # In this case, it must be a regular json file
+            with tempfile.TemporaryDirectory() as tmpdir:
+                tmp_fname = f"{tmpdir}/temp.lindi.json"
+                _download_file(url, tmp_fname)
+                with open(tmp_fname, "r") as f:
+                    return json.load(f), False
+
+
+def _write_rfs_dict_to_lindi1_file(*, rfs: dict, output_file_name: str):
+    rfs_buf = json.dumps(rfs).encode("utf-8")
+    header_buf = _read_bytes_from_file(output_file_name, 0, 1024)
+    lindi1_header = _parse_lindi1_header(header_buf)
+    assert lindi1_header is not None
+    assert lindi1_header['format'] == 'lindi1'
+    old_rfs_start = lindi1_header['rfs_start']
+    old_rfs_size = lindi1_header['rfs_size']
+    old_rfs_padding = lindi1_header['rfs_padding']
+    if len(rfs_buf) < old_rfs_size + old_rfs_padding - 1:
+        # we don't need to allocate a new space in the file for the rfs
+        rfs_buf_padded = rfs_buf + b"\0" * (old_rfs_size + old_rfs_padding - len(rfs_buf))
+        _write_bytes_within_file(output_file_name, old_rfs_start, rfs_buf_padded)
+        new_rfs_start = old_rfs_start
+        new_rfs_size = len(rfs_buf)
+        new_rfs_padding = old_rfs_size + old_rfs_padding - len(rfs_buf)
+    else:
+        # we need to allocate a new space. First zero out the old space
+        zeros = b"\0" * (old_rfs_size + old_rfs_padding)
+        _write_bytes_within_file(output_file_name, old_rfs_start, zeros)
+        file_size = os.path.getsize(output_file_name)
+        new_rfs_start = file_size
+        # determine size of new space, to be double the needed size
+        new_rfs_size = len(rfs_buf) * 2
+        new_rfs_padding = new_rfs_size - len(rfs_buf)
+        new_rfs_buf_padded = rfs_buf + b"\0" * new_rfs_padding
+        # write the new rfs
+        _append_bytes_to_file(output_file_name, new_rfs_buf_padded)
+    new_lindi1_header = {
+        **lindi1_header,
+        "rfs_start": new_rfs_start,
+        "rfs_size": new_rfs_size,
+        "rfs_padding": new_rfs_padding
+    }
+    new_lindi1_header_buf = json.dumps(new_lindi1_header).encode("utf-8")
+    if len(new_lindi1_header_buf) > 1024:
+        raise Exception("New header is too long")
+    new_lindi1_header_buf_padded = new_lindi1_header_buf + b"\0" * (1024 - len(new_lindi1_header_buf))
+    _write_bytes_within_file(output_file_name, 0, new_lindi1_header_buf_padded)
+
+
+def _download_file(url: str, filename: str) -> None:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
+    }
+    req = urllib.request.Request(url, headers=headers)
+    with urllib.request.urlopen(req) as response:
+        with open(filename, "wb") as f:
+            f.write(response.read())
+
+
+def _download_file_byte_range(url: str, start: int, end: int) -> bytes:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+        "Range": f"bytes={start}-{end - 1}"
+    }
+    req = urllib.request.Request(url, headers=headers)
+    with urllib.request.urlopen(req) as response:
+        return response.read()
+
+
+def _read_bytes_from_file(filename: str, start: int, end: int) -> bytes:
+    with open(filename, "rb") as f:
+        f.seek(start)
+        return f.read(end - start)
+
+
+def _write_bytes_within_file(filename: str, start: int, buf: bytes) -> None:
+    with open(filename, "r+b") as f:
+        f.seek(start)
+        f.write(buf)
+
+
+def _append_bytes_to_file(filename: str, buf: bytes) -> None:
+    with open(filename, "ab") as f:
+        f.write(buf)
+
+
+def _parse_lindi1_header(buf: bytes):
+    first_zero_index = buf.find(0)
+    if first_zero_index == -1:
+        return None
+    header_json = buf[:first_zero_index].decode("utf-8")
+    header: dict = json.loads(header_json)
+    if header.get('format') != 'lindi1':
+        raise Exception(f"Not lindi1 format: {header.get('format')}")
+    return header
+
+
+def _get_file_size_of_remote_file(url: str) -> int:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
+    }
+    req = urllib.request.Request(url, headers=headers)
+    with urllib.request.urlopen(req) as response:
+        return int(response.headers['Content-Length'])
+
+
+def _create_empty_lindi_file(filename: str, create_binary: bool):
+    empty_rfs = {
+        "refs": {
+            '.zgroup': {
+                'zarr_format': 2
+            }
+        },
+    }
+    if create_binary:
+        rfs_buf = json.dumps(empty_rfs).encode("utf-8")
+        # start with reasonable padding
+        total_rfs_allocated_space = 1024 * 1024
+        while len(rfs_buf) * 2 > total_rfs_allocated_space:
+            total_rfs_allocated_space *= 2
+        lindi1_header = {
+            "format": "lindi1",
+            "rfs_start": 1024,
+            "rfs_size": len(rfs_buf),
+            "rfs_padding": total_rfs_allocated_space - len(rfs_buf)
+        }
+        lindi1_header_buf = json.dumps(lindi1_header).encode("utf-8")
+        lindi1_header_buf_padded = lindi1_header_buf + b"\0" * (1024 - len(lindi1_header_buf))
+        with open(filename, "wb") as f:
+            f.write(lindi1_header_buf_padded)
+            f.write(rfs_buf)
+            f.write(b"\0" * lindi1_header['rfs_padding'])
+    else:
+        with open(filename, "w") as f:
+            json.dump(empty_rfs, f, indent=2)
\ No newline at end of file
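For reference, the on-disk layout produced by _create_empty_lindi_file and maintained by _write_rfs_dict_to_lindi1_file is: a JSON header NUL-padded to 1024 bytes (carrying format, rfs_start, rfs_size and rfs_padding), followed by the rfs JSON plus its padding, followed by any appended chunk blobs. A small read-back sketch using only the format described above ('test.lindi' is the file written by examples/write_lindi_binary.py; a file written with create_binary=False is plain JSON and has no such header):

    import json

    def print_lindi1_header(path: str) -> None:
        # the first 1024 bytes are a NUL-padded JSON header
        with open(path, 'rb') as f:
            header_buf = f.read(1024)
        header = json.loads(header_buf[:header_buf.find(b'\0')].decode('utf-8'))
        print(header)  # e.g. {'format': 'lindi1', 'rfs_start': 1024, 'rfs_size': ..., 'rfs_padding': ...}

    print_lindi1_header('test.lindi')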