diff --git a/CHANGELOG.md b/CHANGELOG.md index acd549a71..ca968a078 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ * `metadata` returned by `DataInterface.get_metadata()` is now a `DeepDict` object, making it easier to add and adjust metadata. [PR #404](https://github.com/catalystneuro/neuroconv/pull/404). * The `OpenEphysLegacyRecordingInterface` is now extracts the `session_start_time` in `get_metadata()` from `Neo` (`OpenEphysRawIO`) and does not depend on `pyopenephys` anymore. [PR #410](https://github.com/catalystneuro/neuroconv/pull/410) * Added basic temporal alignment methods to ecephys, ophys, and icephys DataInterfaces. These are `get_timestamps`, `align_starting_time`, `align_timestamps`, and `align_by_interpolation`. Added tests that serve as a first demonstration of the intended uses in a variety of cases. [PR #237](https://github.com/catalystneuro/neuroconv/pull/237) [PR #283](https://github.com/catalystneuro/neuroconv/pull/283) [PR #400](https://github.com/catalystneuro/neuroconv/pull/400) +* Added `expand_paths`. [PR #377](https://github.com/catalystneuro/neuroconv/pull/377) ### Testing * The tests for `automatic_dandi_upload` now follow up-to-date DANDI validation rules for file name conventions. 
[PR #310](https://github.com/catalystneuro/neuroconv/pull/310) diff --git a/requirements-minimal.txt b/requirements-minimal.txt index 35de16c66..7d7431573 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -11,3 +11,4 @@ psutil>=5.8.0 tqdm>=4.60.0 dandi>=0.46.2 pandas +parse diff --git a/src/neuroconv/tools/__init__.py b/src/neuroconv/tools/__init__.py index 3762d0412..ee48cda14 100644 --- a/src/neuroconv/tools/__init__.py +++ b/src/neuroconv/tools/__init__.py @@ -1,2 +1,3 @@ from .importing import get_package +from .path_expansion import LocalPathExpander from .processes import deploy_process diff --git a/src/neuroconv/tools/path_expansion.py b/src/neuroconv/tools/path_expansion.py new file mode 100644 index 000000000..9bb4d407a --- /dev/null +++ b/src/neuroconv/tools/path_expansion.py @@ -0,0 +1,84 @@ +"""Helpful classes for expanding file or folder paths on a system given a f-string rule for matching patterns.""" +import abc +import os +from pathlib import Path +from typing import Dict, Iterable, List + +from parse import parse +from pydantic import DirectoryPath, FilePath + +from ..utils import DeepDict + + +class AbstractPathExpander(abc.ABC): + def extract_metadata(self, base_directory: DirectoryPath, format_: str): + format_ = format_.replace("\\", os.sep) # Actual character is a single back-slash; first is an escape for that + format_ = format_.replace("/", os.sep) # our f-string uses '/' to communicate os-independent separators + for filepath in self.list_directory(base_directory): + result = parse(format_, filepath) + if result: + yield filepath, result.named + + @abc.abstractmethod + def list_directory(self, base_directory: DirectoryPath) -> Iterable[FilePath]: + """ + List all folders and files in a directory recursively. + + Parameters + ---------- + base_directory : DirectoryPath + The base directory whose contents will be iterated recursively. 
+ + Yields + ------ + sub_paths : iterable of strings + Generator that yields all sub-paths of file and folders from the common root `base_directory`. + """ + pass + + def expand_paths(self, source_data_spec: Dict[str, dict]) -> List[DeepDict]: + """ + Match paths in a directory to specs and extract metadata from the paths. + + Parameters + ---------- + source_data_spec : dict + Source spec. + + Returns + ------- + deep_dicts : list of DeepDict objects + + Examples + -------- + >>> path_expander.expand_paths( + ... dict( + ... spikeglx=dict( + ... base_directory="source_folder", + ... paths=dict( + ... file_path="sub-{subject_id}/sub-{subject_id}_ses-{session_id}" + ... ) + ... ) + ... ) + ... ) + """ + out = DeepDict() + for interface, source_data in source_data_spec.items(): + for path_type in ("file_path", "folder_path"): + if path_type in source_data: + for path, metadata in self.extract_metadata(source_data["base_directory"], source_data[path_type]): + key = tuple(sorted(metadata.items())) + out[key]["source_data"][interface][path_type] = os.path.join( + source_data["base_directory"], path + ) # return the absolute path + if "session_id" in metadata: + out[key]["metadata"]["NWBFile"]["session_id"] = metadata["session_id"] + if "subject_id" in metadata: + out[key]["metadata"]["Subject"]["subject_id"] = metadata["subject_id"] + return list(dict(out).values()) + + +class LocalPathExpander(AbstractPathExpander): + def list_directory(self, base_directory: DirectoryPath) -> Iterable[FilePath]: + assert base_directory.is_dir(), f"The specified 'base_directory' ({base_directory}) is not a directory!" 
+ return (str(path.relative_to(base_directory)) for path in Path(base_directory).rglob("*")) diff --git a/src/neuroconv/utils/__init__.py b/src/neuroconv/utils/__init__.py index 13ca90a8e..5b9aaddc5 100644 --- a/src/neuroconv/utils/__init__.py +++ b/src/neuroconv/utils/__init__.py @@ -1,11 +1,11 @@ from .checks import calculate_regular_series_rate from .dict import ( + DeepDict, append_replace_dict_in_list, dict_deep_update, exist_dict_in_list, load_dict_from_file, ) -from .globbing import decompose_f_string, parse_f_string from .json_schema import ( NWBMetaDataEncoder, fill_defaults, diff --git a/src/neuroconv/utils/globbing.py b/src/neuroconv/utils/globbing.py deleted file mode 100644 index 2f84b7abe..000000000 --- a/src/neuroconv/utils/globbing.py +++ /dev/null @@ -1,89 +0,0 @@ -import re -from typing import List -from warnings import warn - - -def decompose_f_string(f_string: str) -> (List[str], List[str]): - """ - Decompose an f-string into the list of variable names and the separators between them. - - An f-string is any string that contains enclosed curly brackets around text. - A variable is defined as the text expression within the enclosed curly brackets. - The separators are the strings remnants that surround the variables. - - An example f-string and components would be: 'This is {an} f-string!', with variable 'an' and separators - 'This is ' and ' f-string!'. - An instance of this example would be: 'This is definitely a good f-string!' with variable value 'definitely a good'. 
- - Example - ------- - variable_names, separators = decompose_f_string(f_string="a/{x}b{y}/c{z}") - # variable_names = ["x", "y", "z"] - # separators = ["a/", "b", "/c"", ""] - """ - matches = re.findall("{.*?}", f_string) # {.*?} optionally matches any characters enclosed by curly brackets - variable_names = [match.lstrip("{").rstrip("}") for match in matches] - assert not any( - (variable_name == "" for variable_name in variable_names) - ), "Empty variable name detected in f-string! Please ensure there is text between all enclosing '{' and '}'." - - pattern = "^.*?{|}.*?{|}.*?$" - # Description: patttern matches the all expressions outside of curly bracket enclosures - # .*?{ optionally matches any characters optionally before curly bracket opening - # | logical 'or' - # }.*?{ between a curly bracket closure and opening - # | - # }.*? after a closure - separators = [x.rstrip("{").lstrip("}") for x in re.findall(pattern=pattern, string=f_string)] - if any((separator == "" for separator in separators[1:-1])): - warn( - "There is an empty separator between two variables in the f-string! " - "The f-string will not be uniquely invertible." - ) - return variable_names, separators - - -def parse_f_string(string: str, f_string: str): - """ - Given an instance of an f-string rule, extract the values of the variables specified by the f-string. - - Recovery of values is only possible in cases where the string instance is uniquely invertible, - which requires at a minimum requires... - 1) Separators between all curly bracket enclosures, *e.g.*, '{var1}{var2}' is not allowed. - An easy way to resolve this is to add a unique separator between them, *i.e.*, '{var1}-{var2}'. - 2) The separator character(s) cannot also occur within the variable values, *e.g.*, '{var1}b{var2}' on - instance 'sub_01bsess_040122' where var1='sub_01 and' and var2='sess_040122'. 
Since the separator is a single - character 'b' which also occurs in the instance of var1, it cannot be determined which occurrence is the - proper separator. - - Resolving this relies on choosing unique separators between variables in the f-string rule; either a single - character that you know will never occur in any of your instances, or preferably a sequence of characters - that would not occur together. In the example above, a simple separator of '-' would suffice, but if other - instances might include that, such as var1='sub-05', then a sequential separator of '--' would work instead. - - Parameters - ---------- - string : str - An instance of the f-string rule. - fstring : str - String containing non-empty substrings enclosed by "{" and "}". - These correspond to the names of variables thought to encode the actual filename string. - """ - variable_names, separators = decompose_f_string(f_string=f_string) - pattern = "^" + "(.+)".join(separators) + "$" # (.+) matches any non-empty sequence of characters - pattern_match = re.findall(pattern=pattern, string=string) - assert pattern_match, "Unable to match f-string pattern to string! Please double check both structures." - variable_values = pattern_match[0] - for idx in range(len(variable_values) - 1): - assert ( - separators[idx + 1] not in variable_values[idx] - ), "Adjacent variable values contain the separator character! The f-string is not uniquely invertible." - values = dict() - for variable_name, variable_value in zip(variable_names, variable_values): - if variable_value != values.get(variable_name, variable_value): - raise ValueError( - f"Duplicated variable placements for '{variable_name}' in f-string do not match in instance! " - f"Expected '{values[variable_name]}' but found '{variable_value}'." 
- ) - values.update({variable_name: variable_value}) - return values diff --git a/tests/imports.py b/tests/imports.py index 4ff317287..8c7191390 100644 --- a/tests/imports.py +++ b/tests/imports.py @@ -52,10 +52,12 @@ def test_tools(self): # Sub-modules "importing", # Attached to namespace by importing get_package "nwb_helpers", # Attached to namespace by top __init__ call of NWBConverter + "path_expansion", # Functions and classes imported on the __init__ "get_package", "processes", "deploy_process", + "LocalPathExpander", ] self.assertCountEqual(first=current_structure, second=expected_structure) diff --git a/tests/test_minimal/test_tools/test_expand_paths.py b/tests/test_minimal/test_tools/test_expand_paths.py new file mode 100644 index 000000000..1dd7ba798 --- /dev/null +++ b/tests/test_minimal/test_tools/test_expand_paths.py @@ -0,0 +1,60 @@ +from pathlib import Path + +from neuroconv.tools import LocalPathExpander + + +def test_expand_paths(tmpdir): + expander = LocalPathExpander() + + # set up directory for parsing + base_directory = Path(tmpdir) + for subject_id in ("001", "002"): + Path.mkdir(base_directory / f"sub-{subject_id}") + for session_id in ("101", "102"): + Path.mkdir(base_directory / f"sub-{subject_id}" / f"session_{session_id}") + (base_directory / f"sub-{subject_id}" / f"session_{session_id}" / "abc").touch() + (base_directory / f"sub-{subject_id}" / f"session_{session_id}" / "xyz").touch() + + # run path parsing + out = expander.expand_paths( + dict( + aa=dict(base_directory=base_directory, file_path="sub-{subject_id:3}/session_{session_id:3}/abc"), + bb=dict(base_directory=base_directory, file_path="sub-{subject_id:3}/session_{session_id:3}/xyz"), + ), + ) + + expected = [ + { + "source_data": { + "aa": {"file_path": str(base_directory / "sub-002" / "session_101" / "abc")}, + "bb": {"file_path": str(base_directory / "sub-002" / "session_101" / "xyz")}, + }, + "metadata": {"NWBFile": {"session_id": "101"}, "Subject": {"subject_id": "002"}}, + 
}, + { + "source_data": { + "aa": {"file_path": str(base_directory / "sub-002" / "session_102" / "abc")}, + "bb": {"file_path": str(base_directory / "sub-002" / "session_102" / "xyz")}, + }, + "metadata": {"NWBFile": {"session_id": "102"}, "Subject": {"subject_id": "002"}}, + }, + { + "source_data": { + "aa": {"file_path": str(base_directory / "sub-001" / "session_101" / "abc")}, + "bb": {"file_path": str(base_directory / "sub-001" / "session_101" / "xyz")}, + }, + "metadata": {"NWBFile": {"session_id": "101"}, "Subject": {"subject_id": "001"}}, + }, + { + "source_data": { + "aa": {"file_path": str(base_directory / "sub-001" / "session_102" / "abc")}, + "bb": {"file_path": str(base_directory / "sub-001" / "session_102" / "xyz")}, + }, + "metadata": {"NWBFile": {"session_id": "102"}, "Subject": {"subject_id": "001"}}, + }, + ] + + # test results + for x in out: + assert x in expected + assert len(out) == len(expected) diff --git a/tests/test_minimal/test_utils/test_globbing_utils.py b/tests/test_minimal/test_utils/test_globbing_utils.py deleted file mode 100644 index 9c306203d..000000000 --- a/tests/test_minimal/test_utils/test_globbing_utils.py +++ /dev/null @@ -1,91 +0,0 @@ -from hdmf.testing import TestCase - -from neuroconv.utils import decompose_f_string, parse_f_string - - -class TestGlobbingAssertions(TestCase): - def test_decompose_f_string_assertion(self): - with self.assertRaisesWith( - exc_type=AssertionError, - exc_msg=( - "Empty variable name detected in f-string! Please ensure there is text between all " - "enclosing '{' and '}'." - ), - ): - decompose_f_string(f_string="a/{x}b{y}/c{z}d{}") - - def test_decompose_f_string_separators_assertion(self): - with self.assertWarnsWith( - warn_type=UserWarning, - exc_msg=( - "There is an empty separator between two variables in the f-string! " - "The f-string will not be uniquely invertible." 
- ), - ): - decompose_f_string(f_string="a/{x}{y}/c{z}") - - def test_parse_f_string_non_invertible_assertion(self): - with self.assertRaisesWith( - exc_type=AssertionError, - exc_msg=( - "Adjacent variable values contain the separator character! The f-string is not uniquely invertible." - ), - ): - parse_f_string(string="a/foobbar/cthat", f_string="a/{x}b{y}/c{z}") - - def test_parse_f_string_bad_structure_assertion(self): - with self.assertRaisesWith( - exc_type=AssertionError, - exc_msg="Unable to match f-string pattern to string! Please double check both structures.", - ): - parse_f_string(string="just/plain/wrong", f_string="a/{x}b{y}/c{z}") - - def test_parse_f_string_duplicated_mismatch_assertion(self): - with self.assertRaisesWith( - exc_type=ValueError, - exc_msg=( - "Duplicated variable placements for 'x' in f-string do not match in instance! " - "Expected 'foo' but found 'wrong'." - ), - ): - parse_f_string(string="a/foobthat/cbar/sub-wrong", f_string="a/{x}b{y}/c{z}/sub-{x}") - - -def test_decompose_f_string(): - variable_names, _ = decompose_f_string(f_string="a/{x}b{y}/c{z}") - assert variable_names == ["x", "y", "z"] - - -def test_decompose_f_string_separators(): - _, separators = decompose_f_string(f_string="a/{x}b{y}/c") - assert separators == ["a/", "b", "/c"] - - -def test_decompose_f_string_separators_leading(): - _, separators = decompose_f_string(f_string="{start}a/{x}b{y}/c") - assert separators == ["", "a/", "b", "/c"] - - -def test_decompose_f_string_separators_trailing(): - _, separators = decompose_f_string(f_string="a/{x}b{y}/c{end}") - assert separators == ["a/", "b", "/c", ""] - - -def test_parse_f_string(): - f_string_values = parse_f_string(string="a/foobthat/cbar", f_string="a/{x}b{y}/c{z}") - assert f_string_values == dict(x="foo", y="that", z="bar") - - -def test_parse_f_string_leading_value(): - f_string_values = parse_f_string(string="123a/foobthat/cbar", f_string="{start}a/{x}b{y}/c{z}") - assert f_string_values == 
dict(start="123", x="foo", y="that", z="bar") - - -def test_parse_f_string_no_trailing_value(): - f_string_values = parse_f_string(string="a/foobthat/c", f_string="a/{x}b{y}/c") - assert f_string_values == dict(x="foo", y="that") - - -def test_parse_f_string_duplicates(): - f_string_values = parse_f_string(string="a/foobthat/cbar/sub-foo", f_string="a/{x}b{y}/c{z}/sub-{x}") - assert f_string_values == dict(x="foo", y="that", z="bar")