Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement templates for reference file system #51

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,8 @@ def _process_dataset(key):
# Process the groups recursively starting with the root group
_process_group("", self._h5f)

LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(ret)
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(ret)
LindiReferenceFileSystemStore.use_templates_in_rfs(ret)
return ret


Expand Down
3 changes: 2 additions & 1 deletion lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def to_reference_file_system(self):
raise Exception(f"Unexpected type for zarr store: {type(self._zarr_store)}")
rfs = zarr_store.rfs
rfs_copy = json.loads(json.dumps(rfs))
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs_copy)
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(rfs_copy)
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy)
return rfs_copy

@property
Expand Down
64 changes: 61 additions & 3 deletions lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,33 @@ class LindiReferenceFileSystemStore(ZarrStore):
to be the data of the file, which may be base64 encoded (see below). If the
value is a list, it is assumed to have three elements: the URL of the file
(or path of a local file), the byte offset of the data within the file, and
the byte length of the data. If the value is a dict, it represents a json
file, and the content of the file is the json representation of the dict.
the byte length of the data. Note that we do not permit the case of a list
of a single (url) element supported by fsspec, because it is good to be able
to know the size of the chunks without making a request to the file. If the
value is a dict, it represents a json file, and the content of the file is
the json representation of the dict.

If the value for a file is a string, it may be prefixed with "base64:". If
it is, the string is assumed to be base64 encoded and is decoded before
being returned. Otherwise, the string is utf-8 encoded and returned as is.
Note that a file that actually begins with "base64:" should be represented
by a base64 encoded string, to avoid ambiguity.

We also support the use of templates as in fsspec, but do not support the
full jinja2 templating. There may be an optional "templates" key in the
dictionary, which is a dictionary of template strings. For example,
{
"templates": {"u1": "https://some/url", "u2": "https://some/other/url"},
"refs": {
... "/some/key/0": [
"{{u1}}", 0, 100
],
...
}
}
In this case, the "{{u1}}" will be replaced with the value of the "u1"
template string.

It is okay for rfs to be modified outside of this class, and the changes
will be reflected immediately in the store. This can be used by experimental
tools such as lindi-cloud.
Expand Down Expand Up @@ -80,6 +98,12 @@ def __init__(self, rfs: dict, mode: Literal["r", "r+"] = "r+"):
else:
raise Exception(f"Problem with {k}: value must be a string or a list")

# validate templates
if "templates" in rfs:
for k, v in rfs["templates"].items():
if not isinstance(v, str):
raise Exception(f"Problem with templates: value for {k} must be a string")

self.rfs = rfs
self.mode = mode

Expand All @@ -101,6 +125,9 @@ def __getitem__(self, key: str):
url = x[0]
offset = x[1]
length = x[2]
if '{{' in url and 'templates' in self.rfs:
for k, v in self.rfs["templates"].items():
url = url.replace("{{" + k + "}}", v)
val = _read_bytes_from_url(url, offset, length)
return val
else:
Expand Down Expand Up @@ -146,7 +173,7 @@ def is_erasable(self):
return False

@staticmethod
def replace_meta_file_contents_with_dicts(rfs: dict) -> None:
def replace_meta_file_contents_with_dicts_in_rfs(rfs: dict) -> None:
"""
Utility function for replacing the contents of the .zattrs, .zgroup, and
.zarray files in an rfs with the json representation of the contents.
Expand All @@ -158,6 +185,37 @@ def replace_meta_file_contents_with_dicts(rfs: dict) -> None:
if k.endswith('.zattrs') or k.endswith('.zgroup') or k.endswith('.zarray') or k.endswith('zarr.json'): # note: zarr.json is for zarr v3
rfs['refs'][k] = json.loads(store[k].decode('utf-8'))

@staticmethod
def use_templates_in_rfs(rfs: dict) -> None:
    """
    Replace frequently used URLs in an rfs with template strings, in place.

    Every list-valued entry of rfs["refs"] is treated as a reference whose
    first element is a URL (or local path). URLs that occur 5 or more times
    are replaced with a template string of the form "{{u1}}", "{{u2}}",
    etc., and the mapping from template name to URL is stored under the
    "templates" key of the rfs.

    If rfs["templates"] already maps a name to one of those URLs, that name
    is reused rather than registering a duplicate entry. Newly created names
    never overwrite an existing template.

    References whose first element already contains "{{" are left
    untouched, as are non-list entries (inline strings and dicts) and
    malformed references (empty lists or a non-string first element).

    Parameters
    ----------
    rfs : dict
        The reference file system dictionary. Must contain a "refs" dict.
        It is modified in place; nothing is returned.
    """
    min_occurrences = 5

    def _plain_url(ref):
        # Return the not-yet-templated URL of a reference, or None if the
        # entry is not a reference with a plain string URL.
        if isinstance(ref, list) and ref and isinstance(ref[0], str):
            if '{{' not in ref[0]:
                return ref[0]
        return None

    # Count how many references point at each plain URL.
    url_counts: Dict[str, int] = {}
    for ref in rfs['refs'].values():
        url = _plain_url(ref)
        if url is not None:
            url_counts[url] = url_counts.get(url, 0) + 1

    # Sorted so that template names are assigned deterministically.
    frequent_urls = sorted(
        url for url, count in url_counts.items() if count >= min_occurrences
    )
    if not frequent_urls:
        # Nothing to replace; leave the rfs (including any existing
        # templates) exactly as it is.
        return

    templates = rfs.get('templates', {})

    # Reuse templates that already point at a URL instead of duplicating them.
    template_names_for_urls: Dict[str, str] = {}
    for name, value in templates.items():
        if isinstance(value, str):
            template_names_for_urls.setdefault(value, name)

    # The index only ever moves forward: every name below it is already taken.
    next_index = 1
    for url in frequent_urls:
        if url in template_names_for_urls:
            continue
        while f'u{next_index}' in templates:
            next_index += 1
        name = f'u{next_index}'
        templates[name] = url
        template_names_for_urls[url] = name

    rfs['templates'] = templates

    # Rewrite only the frequent URLs; rarely used URLs stay as plain strings.
    urls_to_replace = set(frequent_urls)
    for ref in rfs['refs'].values():
        url = _plain_url(ref)
        if url is not None and url in urls_to_replace:
            ref[0] = '{{' + template_names_for_urls[url] + '}}'


# Keep a global cache of file segment readers that apply to all instances of
# LindiReferenceFileSystemStore. The key is the URL of the file.
Expand Down
3 changes: 2 additions & 1 deletion lindi/LindiStagingStore/LindiStagingStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def upload(
self.consolidate_chunks()
rfs = self._base_store.rfs
rfs = json.loads(json.dumps(rfs)) # deep copy
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs)
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(rfs)
blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_upload_blob=on_upload_blob)
for k, v in rfs['refs'].items():
if isinstance(v, list) and len(v) == 3:
Expand All @@ -140,6 +140,7 @@ def upload(
rfs['refs'][k][0] = url2
with tempfile.TemporaryDirectory() as tmpdir:
rfs_fname = f"{tmpdir}/rfs.lindi.json"
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)
_write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname)
return on_upload_main(rfs_fname)

Expand Down
58 changes: 56 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,67 @@ def test_lindi_reference_file_system_store():
store = LindiReferenceFileSystemStore(rfs)
assert json.loads(store[".zattrs"]) == {"test": 2}
rfs = {"refs": {".zattrs": "{\"test\": 3}"}}
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs)
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(rfs)
assert isinstance(rfs["refs"][".zattrs"], dict)
store = LindiReferenceFileSystemStore(rfs)
assert json.loads(store[".zattrs"]) == {"test": 3}
rfs = {"refs": {".zattrs_xxx": "{\"test\": 5}"}}
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs)
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(rfs)
assert isinstance(rfs["refs"][".zattrs_xxx"], str)
rfs = {"refs": {"0": ["http://example.com", 0, 1000]}}
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)
assert 'templates' not in rfs
assert rfs['refs']['0'] == ['http://example.com', 0, 1000]
with tempfile.TemporaryDirectory() as tmpdir:
with open(f"{tmpdir}/file1.txt", "wb") as f:
f.write(b"a" * 1000)
f.write(b"b" * 1000)
f.write(b"c" * 1000)
f.write(b"d" * 1000)
f.write(b"e" * 1000)
with open(f"{tmpdir}/file2.txt", "wb") as f:
f.write(b"f" * 1000)
f.write(b"g" * 1000)
f.write(b"h" * 1000)
f.write(b"i" * 1000)
f.write(b"j" * 1000)
rfs = {"refs": {
"0": [f"{tmpdir}/file1.txt", 0, 1000],
"1": [f"{tmpdir}/file1.txt", 1000, 1000],
"2": [f"{tmpdir}/file1.txt", 2000, 1000],
"3": [f"{tmpdir}/file1.txt", 3000, 1000],
"4": [f"{tmpdir}/file1.txt", 4000, 1000],
"5": [f"{tmpdir}/file2.txt", 0, 1000],
"6": [f"{tmpdir}/file2.txt", 1000, 1000],
"7": [f"{tmpdir}/file2.txt", 2000, 1000],
"8": [f"{tmpdir}/file2.txt", 3000, 1000],
"9": [f"{tmpdir}/file2.txt", 4000, 1000],
}}
LindiReferenceFileSystemStore.use_templates_in_rfs(rfs)
assert 'templates' in rfs
assert rfs['templates']['u1'] == f"{tmpdir}/file1.txt"
assert rfs['templates']['u2'] == f"{tmpdir}/file2.txt"
assert rfs['refs']['0'] == ['{{u1}}', 0, 1000]
assert rfs['refs']['1'] == ['{{u1}}', 1000, 1000]
assert rfs['refs']['2'] == ['{{u1}}', 2000, 1000]
assert rfs['refs']['3'] == ['{{u1}}', 3000, 1000]
assert rfs['refs']['4'] == ['{{u1}}', 4000, 1000]
assert rfs['refs']['5'] == ['{{u2}}', 0, 1000]
assert rfs['refs']['6'] == ['{{u2}}', 1000, 1000]
assert rfs['refs']['7'] == ['{{u2}}', 2000, 1000]
assert rfs['refs']['8'] == ['{{u2}}', 3000, 1000]
assert rfs['refs']['9'] == ['{{u2}}', 4000, 1000]
store = LindiReferenceFileSystemStore(rfs)
assert store['0'] == b"a" * 1000
assert store['1'] == b"b" * 1000
assert store['2'] == b"c" * 1000
assert store['3'] == b"d" * 1000
assert store['4'] == b"e" * 1000
assert store['5'] == b"f" * 1000
assert store['6'] == b"g" * 1000
assert store['7'] == b"h" * 1000
assert store['8'] == b"i" * 1000
assert store['9'] == b"j" * 1000

rfs = {"refs": {"a": "abc"}}
store = LindiReferenceFileSystemStore(rfs)
Expand Down