diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2f161f3a..743a45dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog
## [Latest](https://github.com/int-brain-lab/ONE/commits/main) [3.0.0]
This version drops support for python 3.9 and below, and ONE is now in remote mode by default.
+Also adds a new ALFPath class to replace the ALF path functions.
### Modified
@@ -8,6 +9,11 @@ This version drops support for python 3.9 and below, and ONE is now in remote mo
- OneAlyx uses remote mode by default, instead of auto
- OneAlyx.search now updates the cache tables in remote mode as paginated sessions are accessed
- datasets table file_size column nullable by default
+- one.alf.io.save_metadata now returns the saved filepath
+- paths returned by One methods and functions in one.alf.io are now ALFPath instances
+- bugfix: one.alf.path.full_path_parts didn't always raise when an invalid path was passed
### Added
+- one.alf.path module containing the ALFPath class
+- one.alf.exceptions.ALFInvalid exception
@@ -18,6 +24,8 @@ This version drops support for python 3.9 and below, and ONE is now in remote mo
### Removed
- setup.py
+- one.alf.files; use one.alf.path instead
+- one.alf.io.remove_uuid_file
## [2.11.1]
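As a quick orientation, here is a minimal sketch of the new ALFPath class in use, based only on methods added in this diff; the example path is illustrative:

```python
from one.alf.path import ALFPath

p = ALFPath('lab/Subjects/SW043/2020-01-01/001/alf/_ibl_trials.intervals.npy')
assert p.is_dataset()            # file name matches the ALF dataset spec
parts = p.parse_alf_name()       # dict of namespace/object/attribute/...
assert parts['object'] == 'trials' and parts['attribute'] == 'intervals'
print(p.session_path_short())    # -> 'SW043/2020-01-01/001'
```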
diff --git a/docs/notebooks/datasets_and_types.ipynb b/docs/notebooks/datasets_and_types.ipynb
index c3c8fbb0..a08937c1 100644
--- a/docs/notebooks/datasets_and_types.ipynb
+++ b/docs/notebooks/datasets_and_types.ipynb
@@ -16,37 +16,43 @@
},
{
"cell_type": "code",
- "execution_count": 13,
- "outputs": [],
- "source": [
- "from pprint import pprint\n",
- "from one.alf import spec\n",
- "from one.alf.files import filename_parts"
- ],
+ "execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
- }
+ },
+ "outputs": [],
+ "source": [
+ "from pprint import pprint\n",
+ "from one.alf import spec\n",
+ "from one.alf.path import ALFPath"
+ ]
},
{
"cell_type": "markdown",
- "source": [
- "## Datasets\n",
- "\n",
- "Print information about ALF objects"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
- }
+ },
+ "source": [
+ "## Datasets\n",
+ "\n",
+ "Print information about ALF objects"
+ ]
},
{
"cell_type": "code",
"execution_count": 14,
+ "metadata": {
+ "collapsed": false,
+ "pycharm": {
+ "name": "#%%\n"
+ }
+ },
"outputs": [
{
"name": "stdout",
@@ -73,83 +79,83 @@
],
"source": [
"spec.describe('object')"
- ],
- "metadata": {
- "collapsed": false,
- "pycharm": {
- "name": "#%%\n"
- }
- }
+ ]
},
{
"cell_type": "markdown",
- "source": [
- "Check the file name is ALF compliant"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
- }
+ },
+ "source": [
+ "Check the file name is ALF compliant"
+ ]
},
{
"cell_type": "code",
"execution_count": 15,
- "outputs": [],
- "source": [
- "assert spec.is_valid('spikes.times.npy')"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
- }
+ },
+ "outputs": [],
+ "source": [
+ "assert spec.is_valid('spikes.times.npy')"
+ ]
},
{
"cell_type": "markdown",
- "source": [
- "Safely construct an ALF dataset using the 'to_alf' function. This will ensure the correct\n",
- "case and format"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
- }
+ },
+ "source": [
+ "Safely construct an ALF dataset using the 'to_alf' function. This will ensure the correct\n",
+ "case and format"
+ ]
},
{
"cell_type": "code",
"execution_count": 16,
- "outputs": [],
- "source": [
- "filename = spec.to_alf('spikes', 'times', 'npy',\n",
- " namespace='ibl', timescale='ephys clock', extra='raw')"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
- }
+ },
+ "outputs": [],
+ "source": [
+ "filename = spec.to_alf('spikes', 'times', 'npy',\n",
+ " namespace='ibl', timescale='ephys clock', extra='raw')"
+ ]
},
{
"cell_type": "markdown",
- "source": [
- "Parsing a new file into its constituent parts ensures the dataset is correct"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
- }
+ },
+ "source": [
+ "Parsing a new file into its constituent parts ensures the dataset is correct"
+ ]
},
{
"cell_type": "code",
- "execution_count": 17,
+ "execution_count": null,
+ "metadata": {
+ "collapsed": false,
+ "pycharm": {
+ "name": "#%%\n"
+ }
+ },
"outputs": [
{
"name": "stdout",
@@ -165,18 +171,18 @@
}
],
"source": [
- "parts = filename_parts('_ibl_spikes.times_ephysClock.raw.npy', as_dict=True, assert_valid=True)\n",
+ "parts = ALFPath('_ibl_spikes.times_ephysClock.raw.npy').parse_alf_name()\n",
"pprint(parts)"
- ],
+ ]
+ },
+ {
+ "cell_type": "markdown",
"metadata": {
"collapsed": false,
"pycharm": {
- "name": "#%%\n"
+ "name": "#%% md\n"
}
- }
- },
- {
- "cell_type": "markdown",
+ },
"source": [
"## Dataset types\n",
"
\n",
@@ -197,17 +203,17 @@
"\n",
"When registering files they must match exactly 1 dataset type.\n",
"
"
- ],
- "metadata": {
- "collapsed": false,
- "pycharm": {
- "name": "#%% md\n"
- }
- }
+ ]
},
{
"cell_type": "code",
"execution_count": 18,
+ "metadata": {
+ "collapsed": false,
+ "pycharm": {
+ "name": "#%%\n"
+ }
+ },
"outputs": [
{
"name": "stdout",
@@ -218,7 +224,13 @@
},
{
"data": {
- "text/plain": "{'id': '1427b6ba-6535-4f8f-9058-e3df63f0261e',\n 'name': 'spikes.times',\n 'created_by': None,\n 'description': '[nspi]. Times of spikes (seconds, relative to experiment onset). Note this includes spikes from all probes, merged together',\n 'filename_pattern': 'spikes.times*.npy'}"
+ "text/plain": [
+ "{'id': '1427b6ba-6535-4f8f-9058-e3df63f0261e',\n",
+ " 'name': 'spikes.times',\n",
+ " 'created_by': None,\n",
+ " 'description': '[nspi]. Times of spikes (seconds, relative to experiment onset). Note this includes spikes from all probes, merged together',\n",
+ " 'filename_pattern': 'spikes.times*.npy'}"
+ ]
},
"execution_count": 18,
"metadata": {},
@@ -229,29 +241,29 @@
"from one.api import ONE\n",
"one = ONE(base_url='https://openalyx.internationalbrainlab.org')\n",
"one.describe_dataset('spikes.times') # Requires online version (an Alyx database connection)"
- ],
- "metadata": {
- "collapsed": false,
- "pycharm": {
- "name": "#%%\n"
- }
- }
+ ]
},
{
"cell_type": "markdown",
- "source": [
- "Datasets and their types can be interconverted using the following functions (online mode only):"
- ],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
- }
+ },
+ "source": [
+ "Datasets and their types can be interconverted using the following functions (online mode only):"
+ ]
},
{
"cell_type": "code",
"execution_count": 19,
+ "metadata": {
+ "collapsed": false,
+ "pycharm": {
+ "name": "#%%\n"
+ }
+ },
"outputs": [
{
"name": "stdout",
@@ -269,13 +281,7 @@
"\n",
"dset_list = '\", \"'.join(datasets)\n",
"print(f'the dataset type \"{dataset_type}\" for {eid} comprises the datasets: \\n\"{dset_list}\"')"
- ],
- "metadata": {
- "collapsed": false,
- "pycharm": {
- "name": "#%%\n"
- }
- }
+ ]
}
],
"metadata": {
@@ -299,4 +305,4 @@
},
"nbformat": 4,
"nbformat_minor": 0
-}
\ No newline at end of file
+}
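For reference, a sketch of what the updated notebook cell is expected to print, inferred from the ALF name specification and the `to_alf` arguments above (not a captured output):

```python
from one.alf.path import ALFPath

parts = ALFPath('_ibl_spikes.times_ephysClock.raw.npy').parse_alf_name()
# Expected, per the ALF name spec (_namespace_object.attribute_timescale.extra.ext):
# {'namespace': 'ibl', 'object': 'spikes', 'attribute': 'times',
#  'timescale': 'ephysClock', 'extra': 'raw', 'extension': 'npy'}
assert parts['timescale'] == 'ephysClock' and parts['extra'] == 'raw'
```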
diff --git a/one/alf/cache.py b/one/alf/cache.py
index 74e7e775..1143e9a0 100644
--- a/one/alf/cache.py
+++ b/one/alf/cache.py
@@ -29,7 +29,7 @@
from iblutil.io.hashfile import md5
from one.alf.spec import QC
-from one.alf.io import iter_sessions, iter_datasets
+from one.alf.io import iter_sessions
from one.alf.path import session_path_parts, get_alf_path
__all__ = ['make_parquet_db', 'patch_cache', 'remove_missing_datasets',
@@ -96,17 +96,33 @@ def _get_session_info(rel_ses_path):
return out
-def _get_dataset_info(full_ses_path, rel_dset_path, ses_eid=None, compute_hash=False):
- rel_ses_path = get_alf_path(full_ses_path)
- full_dset_path = Path(full_ses_path, rel_dset_path).as_posix()
- file_size = Path(full_dset_path).stat().st_size
- ses_eid = ses_eid or _ses_str_id(rel_ses_path)
+def _get_dataset_info(dset_path, ses_eid=None, compute_hash=False):
+ """Create dataset record from local path.
+
+ Parameters
+ ----------
+    dset_path : one.alf.path.ALFPath
+ A full ALF path.
+ ses_eid : str, UUID, optional
+ A session uuid.
+ compute_hash : bool, optional
+ Whether to compute a file hash.
+
+ Returns
+ -------
+ dict
+ A dataset record.
+
+ TODO Return tuples for more memory-efficient cache generation.
+ """
+ rel_dset_path = get_alf_path(dset_path.relative_to_session())
+ ses_eid = ses_eid or _ses_str_id(dset_path.session_path())
return {
- 'id': Path(rel_ses_path, rel_dset_path).as_posix(),
- 'eid': str(ses_eid),
- 'rel_path': Path(rel_dset_path).as_posix(),
- 'file_size': file_size,
- 'hash': md5(full_dset_path) if compute_hash else None,
+ 'id': rel_dset_path,
+ 'eid': ses_eid or pd.NA,
+ 'rel_path': rel_dset_path,
+ 'file_size': dset_path.stat().st_size,
+ 'hash': md5(dset_path) if compute_hash else '',
'exists': True,
'qc': 'NOT_SET'
}
@@ -117,7 +133,7 @@ def _rel_path_to_uuid(df, id_key='rel_path', base_id=None, keep_old=False):
toUUID = partial(uuid.uuid3, base_id) # MD5 hash from base uuid and rel session path string
if keep_old:
df[f'{id_key}_'] = df[id_key].copy()
- df.loc[:, id_key] = df.groupby(id_key)[id_key].transform(lambda x: str(toUUID(x.name)))
+ df.loc[:, id_key] = df.groupby(id_key)[id_key].transform(lambda x: toUUID(x.name))
return df
@@ -210,8 +226,8 @@ def _make_datasets_df(root_dir, hash_files=False) -> pd.DataFrame:
# Go through sessions and append datasets
for session_path in iter_sessions(root_dir):
rows = []
- for rel_dset_path in iter_datasets(session_path):
- file_info = _get_dataset_info(session_path, rel_dset_path, compute_hash=hash_files)
+ for dset_path in session_path.iter_datasets(recursive=True):
+ file_info = _get_dataset_info(dset_path, compute_hash=hash_files)
assert set(file_info.keys()) <= set(DATASETS_COLUMNS)
rows.append(file_info)
df = pd.concat((df, pd.DataFrame(rows, columns=DATASETS_COLUMNS).astype(DATASETS_COLUMNS)),
@@ -256,6 +272,9 @@ def make_parquet_db(root_dir, out_dir=None, hash_ids=True, hash_files=False, lab
# Add integer id columns
if hash_ids and len(df_ses) > 0:
df_ses, df_dsets = _ids_to_uuid(df_ses, df_dsets)
+ # For parquet all indices must be str
+ df_ses.index = df_ses.index.map(str)
+ df_dsets.index = df_dsets.index.map(lambda x: tuple(map(str, x)))
if lab: # Fill in lab name field
assert not df_ses['lab'].any() or (df_ses['lab'] == 'lab').all(), 'lab name conflict'
@@ -330,9 +349,9 @@ def remove_missing_datasets(cache_dir, tables=None, remove_empty_sessions=True,
datasets = tables['datasets'].loc[sessions[session_path]]
except KeyError:
datasets = tables['datasets'].iloc[0:0, :]
- for dataset in iter_datasets(session_path):
- if dataset.as_posix() not in datasets['rel_path']:
- to_delete.add(session_path.joinpath(dataset))
+ for dataset in session_path.iter_datasets():
+ if dataset.relative_to_session().as_posix() not in datasets['rel_path']:
+ to_delete.add(dataset)
if session_path not in sessions and remove_empty_sessions:
to_delete.add(session_path)
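One behavioural note on the `_rel_path_to_uuid` change above: the transform now keeps `uuid.UUID` objects rather than casting to `str`, hence the new index stringification in `make_parquet_db`. The ids stay deterministic because they are version-3 UUIDs of the relative path under a base namespace UUID. A minimal sketch, with an arbitrary base id:

```python
import uuid
from functools import partial

base_id = uuid.uuid1()                  # arbitrary namespace UUID for this sketch
to_uuid = partial(uuid.uuid3, base_id)  # MD5 of namespace + name, as in _rel_path_to_uuid

a = to_uuid('alf/spikes.times.npy')
b = to_uuid('alf/spikes.times.npy')
assert a == b and isinstance(a, uuid.UUID)   # same name -> same UUID object
assert a != to_uuid('alf/spikes.clusters.npy')
```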
diff --git a/one/alf/exceptions.py b/one/alf/exceptions.py
index d963767b..4218cf2d 100644
--- a/one/alf/exceptions.py
+++ b/one/alf/exceptions.py
@@ -87,3 +87,9 @@ class ALFMultipleRevisionsFound(ALFError):
class ALFWarning(Warning):
"""Cautions when loading ALF datasets."""
pass
+
+
+class ALFInvalid(ALFError, ValueError):
+ """ALF path invalid."""
+    explanation = ('The file path provided does not match the ALF path specification defined '
+ 'in `one.alf.spec`.')
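Since `ALFInvalid` subclasses both `ALFError` and `ValueError`, callers that previously caught the `ValueError` raised by the parse functions continue to work unchanged. A small sketch:

```python
from one.alf.path import full_path_parts
from one.alf.exceptions import ALFInvalid

try:
    full_path_parts('foo.bar')          # not a valid ALF path; assert_valid defaults to True
except ValueError as ex:                # old-style handlers still catch it...
    assert isinstance(ex, ALFInvalid)   # ...but the concrete type is now ALFInvalid
```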
diff --git a/one/alf/files.py b/one/alf/files.py
deleted file mode 100644
index 3bd442a1..00000000
--- a/one/alf/files.py
+++ /dev/null
@@ -1,11 +0,0 @@
-"""
-(DEPRECATED) Module for identifying and parsing ALF file names.
-
-This module has moved to :mod:`one.alf.path`.
-"""
-import warnings
-
-from .path import * # noqa
-
-warnings.warn(
- '`one.alf.files` will be removed in version 3.0. Use `one.alf.path` instead.', FutureWarning)
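Downstream code only needs its imports updated, since the removed shim re-exported the `one.alf.path` names unchanged, for example:

```python
# Before (deprecated shim, removed in 3.0):
# from one.alf.files import filename_parts, remove_uuid_string
# After:
from one.alf.path import filename_parts, remove_uuid_string
```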
diff --git a/one/alf/io.py b/one/alf/io.py
index b1ee1ab8..ee0d9b25 100644
--- a/one/alf/io.py
+++ b/one/alf/io.py
@@ -9,7 +9,6 @@
import json
import copy
import logging
-import os
import re
from fnmatch import fnmatch
from pathlib import Path
@@ -26,7 +25,7 @@
from iblutil.io import parquet
from iblutil.io import jsonable
from .exceptions import ALFObjectNotFound
-from . import spec, path as files
+from . import path, spec
from .spec import FILE_SPEC
_logger = logging.getLogger(__name__)
@@ -136,21 +135,21 @@ def dataframe(adict):
return df
-def _find_metadata(file_alf) -> Path:
+def _find_metadata(file_alf) -> path.ALFPath:
"""
File path for an existing meta-data file for an alf_file
Parameters
----------
file_alf : str, pathlib.Path
- A path of existing ALF
+ A path of existing ALF.
Returns
-------
- pathlib.Path
- Path of meta-data file if exists
+ one.alf.path.ALFPath
+ Path of meta-data file if exists.
"""
- file_alf = Path(file_alf)
+ file_alf = path.ALFPath(file_alf)
ns, obj = file_alf.name.split('.')[:2]
return next(file_alf.parent.glob(f'{ns}.{obj}*.metadata*.json'), None)
@@ -175,11 +174,10 @@ def read_ts(filename):
--------
>>> t, d = read_ts(filename)
"""
- if not isinstance(filename, Path):
- filename = Path(filename)
+ filename = path.ensure_alf_path(filename)
# alf format is object.attribute.extension, for example '_ibl_wheel.position.npy'
- _, obj, attr, *_, ext = files.filename_parts(filename.parts[-1])
+ _, obj, attr, *_, ext = filename.dataset_name_parts
try:
# looking for matching object with attribute timestamps: '_ibl_wheel.timestamps.npy'
@@ -328,8 +326,8 @@ def load_file_content(fil):
return sparse.load_npz(fil)
except ModuleNotFoundError:
warnings.warn(f'{Path(fil).name} requires the pydata sparse package to load.')
- return Path(fil)
- return Path(fil)
+ return path.ALFPath(fil)
+ return path.ALFPath(fil)
def _ls(alfpath, object=None, **kwargs) -> (list, tuple):
@@ -349,17 +347,17 @@ def _ls(alfpath, object=None, **kwargs) -> (list, tuple):
Returns
-------
- list
- A list of ALF paths
+ list of one.alf.path.ALFPath
+ A list of ALF paths.
tuple
- A tuple of ALF attributes corresponding to the file paths
+ A tuple of ALF attributes corresponding to the file paths.
Raises
------
ALFObjectNotFound
No matching ALF object was found in the alfpath directory
"""
- alfpath = Path(alfpath)
+ alfpath = path.ALFPath(alfpath)
if not alfpath.exists():
files_alf = attributes = None
elif alfpath.is_dir():
@@ -369,7 +367,7 @@ def _ls(alfpath, object=None, **kwargs) -> (list, tuple):
else:
files_alf, attributes = filter_by(alfpath, object=object, **kwargs)
else:
- object = files.filename_parts(alfpath.name)[1]
+ object = alfpath.object
alfpath = alfpath.parent
files_alf, attributes = filter_by(alfpath, object=object, **kwargs)
@@ -409,10 +407,10 @@ def iter_sessions(root_dir, pattern='*'):
>>> sessions = list(iter_sessions(root_dir, pattern='*/????-??-??/*'))
"""
if spec.is_session_path(root_dir):
- yield root_dir
- for path in sorted(Path(root_dir).rglob(pattern)):
- if path.is_dir() and spec.is_session_path(path):
- yield path
+ yield path.ALFPath(root_dir)
+ for p in sorted(Path(root_dir).rglob(pattern)):
+ if p.is_dir() and spec.is_session_path(p):
+ yield path.ALFPath(p)
def iter_datasets(session_path):
@@ -426,12 +424,11 @@ def iter_datasets(session_path):
Yields
-------
- pathlib.Path
+ one.alf.path.ALFPath
The next dataset path (relative to the session path) in lexicographical order.
"""
- for p in sorted(Path(session_path).rglob('*.*')):
- if not p.is_dir() and spec.is_valid(p.name):
- yield p.relative_to(session_path)
+ for dataset in path.ALFPath(session_path).iter_datasets(recursive=True):
+ yield dataset.relative_to(session_path)
def exists(alfpath, object, attributes=None, **kwargs) -> bool:
@@ -521,8 +518,8 @@ def load_object(alfpath, object=None, short_keys=False, **kwargs):
raise ValueError('If a directory is provided, the object name should be provided too')
files_alf, parts = _ls(alfpath, object, **kwargs)
else: # A list of paths allows us to load an object from different revisions
- files_alf = alfpath
- parts = [files.filename_parts(x.name) for x in files_alf]
+ files_alf = list(map(path.ALFPath, alfpath))
+ parts = [x.dataset_name_parts for x in files_alf]
assert len(set(p[1] for p in parts)) == 1
object = next(x[1] for x in parts)
# Take attribute and timescale from parts list
@@ -594,27 +591,27 @@ def save_object_npy(alfpath, dico, object, parts=None, namespace=None, timescale
Saves a dictionary in `ALF format`_ using object as object name and dictionary keys as
attribute names. Dimensions have to be consistent.
- Simplified ALF example: _namespace_object.attribute.part1.part2.extension
+ Simplified ALF example: _namespace_object.attribute.part1.part2.extension.
Parameters
----------
alfpath : str, pathlib.Path
- Path of the folder to save data to
+ Path of the folder to save data to.
dico : dict
- Dictionary to save to npy; keys correspond to ALF attributes
+ Dictionary to save to npy; keys correspond to ALF attributes.
object : str
- Name of the object to save
+ Name of the object to save.
parts : str, list, None
- Extra parts to the ALF name
+ Extra parts to the ALF name.
namespace : str, None
- The optional namespace of the object
+ The optional namespace of the object.
timescale : str, None
- The optional timescale of the object
+ The optional timescale of the object.
Returns
-------
- list
- List of written files
+ list of one.alf.path.ALFPath
+ List of written files.
Examples
--------
@@ -624,7 +621,7 @@ def save_object_npy(alfpath, dico, object, parts=None, namespace=None, timescale
.. _ALF format:
https://int-brain-lab.github.io/ONE/alf_intro.html
"""
- alfpath = Path(alfpath)
+ alfpath = path.ALFPath(alfpath)
status = check_dimensions(dico)
if status != 0:
raise ValueError('Dimensions are not consistent to save all arrays in ALF format: ' +
@@ -638,7 +635,7 @@ def save_object_npy(alfpath, dico, object, parts=None, namespace=None, timescale
return out_files
-def save_metadata(file_alf, dico) -> None:
+def save_metadata(file_alf, dico) -> path.ALFPath:
"""Writes a meta data file matching a current ALF file object.
For example given an alf file `clusters.ccfLocation.ssv` this will write a dictionary in JSON
@@ -655,39 +652,18 @@ def save_metadata(file_alf, dico) -> None:
Full path to the alf object
dico : dict, ALFBunch
Dictionary containing meta-data
- """
- assert spec.is_valid(file_alf.parts[-1]), 'ALF filename not valid'
- file_meta_data = file_alf.parent / (file_alf.stem + '.metadata.json')
- with open(file_meta_data, 'w+') as fid:
- fid.write(json.dumps(dico, indent=1))
-
-
-def remove_uuid_file(file_path, dry=False) -> Path:
- """
- (DEPRECATED) Renames a file without the UUID and returns the new pathlib.Path object.
-
- Parameters
- ----------
- file_path : str, pathlib.Path
- An ALF path containing a UUID in the file name.
- dry : bool
- If False, the file is not renamed on disk.
Returns
-------
- pathlib.Path
- The new file path without the UUID in the file name.
+ one.alf.path.ALFPath
+ The saved metadata file path.
"""
- warnings.warn(
- 'remove_uuid_file deprecated, use one.alf.files.remove_uuid_string instead',
- DeprecationWarning)
- file_path = Path(file_path)
- new_path = files.remove_uuid_string(file_path)
- if new_path == file_path:
- return new_path
- if not dry and file_path.exists():
- file_path.replace(new_path)
- return new_path
+ file_alf = path.ALFPath(file_alf)
+    assert file_alf.is_dataset(), 'ALF filename not valid'
+ file_meta_data = file_alf.parent / (file_alf.stem + '.metadata.json')
+ with open(file_meta_data, 'w+') as fid:
+ fid.write(json.dumps(dico, indent=1))
+ return file_meta_data
def remove_uuid_recursive(folder, dry=False) -> None:
@@ -704,8 +680,11 @@ def remove_uuid_recursive(folder, dry=False) -> None:
warnings.warn(
'remove_uuid_recursive is deprecated and will be removed in the next release',
DeprecationWarning)
- for fn in Path(folder).rglob('*.*'):
- print(remove_uuid_file(fn, dry=dry))
+ for fn in path.ALFPath(folder).iter_datasets(recursive=True):
+ if (new_fn := fn.without_uuid()).name != fn.name:
+ print(new_fn)
+ if not dry:
+ fn.rename(new_fn)
def next_num_folder(session_date_folder: Union[str, Path]) -> str:
@@ -742,29 +721,29 @@ def filter_by(alf_path, wildcards=True, **kwargs):
Parameters
----------
alf_path : str, pathlib.Path
- A path to a folder containing ALF datasets
+ A path to a folder containing ALF datasets.
wildcards : bool
- If true, kwargs are matched as unix-style patterns, otherwise as regular expressions
+ If true, kwargs are matched as unix-style patterns, otherwise as regular expressions.
object : str, list
- Filter by a given object (e.g. 'spikes')
+ Filter by a given object (e.g. 'spikes').
attribute : str, list
- Filter by a given attribute (e.g. 'intervals')
+ Filter by a given attribute (e.g. 'intervals').
extension : str, list
- Filter by extension (e.g. 'npy')
+ Filter by extension (e.g. 'npy').
namespace : str, list
- Filter by a given namespace (e.g. 'ibl') or None for files without one
+ Filter by a given namespace (e.g. 'ibl') or None for files without one.
timescale : str, list
- Filter by a given timescale (e.g. 'bpod') or None for files without one
+ Filter by a given timescale (e.g. 'bpod') or None for files without one.
extra : str, list
Filter by extra parameters (e.g. 'raw') or None for files without extra parts
NB: Wild cards not permitted here.
Returns
-------
- alf_files : str
- A Path to a directory containing ALF files
+ alf_files : list of one.alf.path.ALFPath
+        A list of ALF file paths relative to alf_path.
attributes : list of dicts
- A list of parsed file parts
+ A list of parsed file parts.
Examples
--------
@@ -793,8 +772,8 @@ def filter_by(alf_path, wildcards=True, **kwargs):
>>> filter_by(alf_path, object='^wheel.*', wildcards=False)
>>> filter_by(alf_path, object=['^wheel$', '.*Moves'], wildcards=False)
"""
- alf_files = [f for f in os.listdir(alf_path) if spec.is_valid(f)]
- attributes = [files.filename_parts(f, as_dict=True) for f in alf_files]
+ alf_files = [f.relative_to(alf_path) for f in path.ALFPath(alf_path).iter_datasets()]
+ attributes = list(map(path.ALFPath.parse_alf_name, alf_files))
if kwargs:
# Validate keyword arguments against regex group names
@@ -887,9 +866,6 @@ def find_variants(file_list, namespace=True, timescale=True, extra=True, extensi
]}
"""
- # Parse into individual ALF parts
- to_parts_dict = partial(files.full_path_parts, as_dict=True)
- uParts = map(to_parts_dict, file_list)
# Initialize map of unique files to their duplicates
duplicates = {}
# Determine which parts to filter
@@ -900,19 +876,20 @@ def find_variants(file_list, namespace=True, timescale=True, extra=True, extensi
def parts_match(parts, file):
"""Compare a file's unique parts to a given file"""
- other = to_parts_dict(file)
+ other = file.parse_alf_path()
return all(parts[k] == other[k] for k in to_compare)
# iterate over unique files and their parts
- for f, parts in zip(map(Path, file_list), uParts):
+ for f in map(path.ALFPath, file_list):
+ parts = f.parse_alf_path()
# first glob for files matching object.attribute (including revisions)
pattern = f'*{parts["object"]}.{parts["attribute"]}*'
# this works because revision will always be last folder;
# i.e. revisions can't contain collections
- globbed = map(files.without_revision(f).parent.glob, (pattern, '#*#/' + pattern))
+ globbed = map(f.without_revision().parent.glob, (pattern, '#*#/' + pattern))
globbed = chain.from_iterable(globbed) # unite revision and non-revision globs
# refine duplicates based on other parts (this also ensures we don't catch similar objects)
globbed = filter(partial(parts_match, parts), globbed)
- # key = f.relative_to(one.alf.files.get_session_path(f)).as_posix()
+ # key = f.relative_to_session().as_posix()
duplicates[f] = [x for x in globbed if x != f] # map file to list of its duplicates
return duplicates
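A short sketch of the `save_metadata` change above, showing the returned path; the file name follows the `<stem>.metadata.json` convention visible in the function body (a temporary directory is used for illustration):

```python
import tempfile
from pathlib import Path

import numpy as np

from one.alf.io import save_object_npy, save_metadata

tmp = Path(tempfile.mkdtemp())
# save_object_npy writes <object>.<attribute>.npy files and, per the updated
# docstring, returns ALFPath instances for the written files
(dset,) = save_object_npy(tmp, {'times': np.arange(10.)}, 'spikes')
meta = save_metadata(dset, {'unit': 'seconds'})  # now returns the written path
assert meta.name == 'spikes.times.metadata.json' and meta.exists()
```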
diff --git a/one/alf/path.py b/one/alf/path.py
index bf13ad4a..be89f8e8 100644
--- a/one/alf/path.py
+++ b/one/alf/path.py
@@ -13,17 +13,37 @@
For more information, see the following documentation:
https://int-brain-lab.github.io/ONE/alf_intro.html
+
+ALFPath differences
+-------------------
+ALFPath.iter_datasets returns full paths (close to pathlib.Path.iterdir), whereas
+alf.io.iter_datasets returns paths relative to the session path (TODO).
+
+ALFPath.parse_* methods return a dict by default, whereas parse_* functions return
+tuples by default. Additionally, the parse_* functions raise ALFInvalid errors by
+default if the path can't be parsed. ALFPath.parse_* methods have no validation
+option.
+
+ALFPath properties return an empty str instead of None if an ALF part isn't present.
"""
+import os
+import pathlib
from collections import OrderedDict
from datetime import datetime
-from typing import Union, Optional
-from pathlib import Path
+from typing import Union, Optional, Iterable
import logging
+from iblutil.util import Listable
+
+from .exceptions import ALFInvalid
from . import spec
from .spec import SESSION_SPEC, COLLECTION_SPEC, FILE_SPEC, REL_PATH_SPEC
_logger = logging.getLogger(__name__)
+__all__ = [
+ 'ALFPath', 'PureALFPath', 'WindowsALFPath', 'PosixALFPath',
+ 'PureWindowsALFPath', 'PurePosixALFPath'
+]
def rel_path_parts(rel_path, as_dict=False, assert_valid=True):
@@ -40,7 +60,7 @@ def rel_path_parts(rel_path, as_dict=False, assert_valid=True):
If true, an OrderedDict of parts are returned with the keys ('lab', 'subject', 'date',
'number'), otherwise a tuple of values are returned.
assert_valid : bool
- If true a ValueError is raised when the session cannot be parsed, otherwise an empty
+ If true an ALFInvalid is raised when the session cannot be parsed, otherwise an empty
dict of tuple of Nones is returned.
Returns
@@ -68,7 +88,7 @@ def session_path_parts(session_path, as_dict=False, assert_valid=True):
If true, an OrderedDict of parts are returned with the keys ('lab', 'subject', 'date',
'number'), otherwise a tuple of values are returned.
assert_valid : bool
- If true a ValueError is raised when the session cannot be parsed, otherwise an empty
+ If true an ALFInvalid is raised when the session cannot be parsed, otherwise an empty
dict of tuple of Nones is returned.
Returns
@@ -78,7 +98,7 @@ def session_path_parts(session_path, as_dict=False, assert_valid=True):
Raises
------
- ValueError
+ ALFInvalid
Invalid ALF session path (assert_valid is True).
"""
return _path_parts(session_path, SESSION_SPEC, False, as_dict, assert_valid)
@@ -105,7 +125,7 @@ def _path_parts(path, spec_str, match=True, as_dict=False, assert_valid=True):
Raises
------
- ValueError
+ ALFInvalid
Invalid ALF path (assert_valid is True).
"""
if hasattr(path, 'as_posix'):
@@ -117,7 +137,7 @@ def _path_parts(path, spec_str, match=True, as_dict=False, assert_valid=True):
parsed_dict = parsed.groupdict()
return OrderedDict(parsed_dict) if as_dict else tuple(parsed_dict.values())
elif assert_valid:
- raise ValueError(f'Invalid ALF: "{path}"')
+ raise ALFInvalid(path)
else:
return empty if as_dict else tuple(empty.values())
@@ -174,7 +194,7 @@ def filename_parts(filename, as_dict=False, assert_valid=True) -> Union[dict, tu
Raises
------
- ValueError
+ ALFInvalid
Invalid ALF dataset (assert_valid is True).
"""
return _path_parts(filename, FILE_SPEC, True, as_dict, assert_valid)
@@ -220,15 +240,21 @@ def full_path_parts(path, as_dict=False, assert_valid=True) -> Union[dict, tuple
Raises
------
- ValueError
+ ALFInvalid
Invalid ALF path (assert_valid is True).
"""
- path = Path(path)
+ path = pathlib.Path(path)
# NB We try to determine whether we have a folder or filename path. Filenames contain at
# least two periods, however it is currently permitted to have any number of periods in a
# collection, making the ALF path ambiguous.
if sum(x == '.' for x in path.name) < 2: # folder only
folders = folder_parts(path, as_dict, assert_valid)
+ if assert_valid:
+ # Edge case: ensure is indeed folder by checking that name is in parts
+ invalid_file = path.name not in (folders.values() if as_dict else folders)
+ is_revision = f'#{folders["revision"] if as_dict else folders[-1]}#' == path.name
+ if not is_revision and invalid_file:
+ raise ALFInvalid(path)
dataset = filename_parts('', as_dict, assert_valid=False)
elif '/' not in path.as_posix(): # filename only
folders = folder_parts('', as_dict, assert_valid=False)
@@ -273,7 +299,7 @@ def folder_parts(folder_path, as_dict=False, assert_valid=True) -> Union[dict, t
Raises
------
- ValueError
+ ALFInvalid
Invalid ALF path (assert_valid is True).
"""
if hasattr(folder_path, 'as_posix'):
@@ -293,7 +319,7 @@ def _isdatetime(s: str) -> bool:
return False
-def get_session_path(path: Union[str, Path]) -> Optional[Path]:
+def get_session_path(path: Union[str, pathlib.Path]) -> Optional[pathlib.Path]:
"""
Returns the session path from any filepath if the date/number pattern is found,
including the root directory.
@@ -314,17 +340,15 @@ def get_session_path(path: Union[str, Path]) -> Optional[Path]:
if path is None:
return
if isinstance(path, str):
- path = Path(path)
- sess = None
+ path = pathlib.Path(path)
for i, p in enumerate(path.parts):
if p.isdigit() and _isdatetime(path.parts[i - 1]):
- sess = Path().joinpath(*path.parts[:i + 1])
-
- return sess
+ return path.__class__().joinpath(*path.parts[:i + 1])
-def get_alf_path(path: Union[str, Path]) -> str:
+def get_alf_path(path: Union[str, pathlib.Path]) -> str:
"""Returns the ALF part of a path or filename.
+
Attempts to return the first valid part of the path, first searching for a session path,
then relative path (collection/revision/filename), then just the filename. If all invalid,
None is returned.
@@ -351,7 +375,7 @@ def get_alf_path(path: Union[str, Path]) -> str:
'collection/file.attr.ext'
"""
if not isinstance(path, str):
- path = Path(path).as_posix()
+ path = pathlib.Path(path).as_posix()
path = path.strip('/')
# Check if session path
@@ -397,7 +421,8 @@ def add_uuid_string(file_path, uuid):
See Also
--------
- one.alf.files.remove_uuid_string
+ one.alf.path.ALFPath.with_uuid
+ one.alf.path.remove_uuid_string
one.alf.spec.is_uuid
"""
if isinstance(uuid, str) and not spec.is_uuid_string(uuid):
@@ -405,7 +430,7 @@ def add_uuid_string(file_path, uuid):
uuid = str(uuid)
# NB: Only instantiate as Path if not already a Path, otherwise we risk changing the class
if isinstance(file_path, str):
- file_path = Path(file_path)
+ file_path = pathlib.Path(file_path)
name_parts = file_path.stem.split('.')
if spec.is_uuid(name_parts[-1]):
*name_parts, old_uuid = name_parts
@@ -428,7 +453,7 @@ def remove_uuid_string(file_path):
Returns
-------
- pathlib.Path, pathlib.PurePath
+ ALFPath, PureALFPath, pathlib.Path, pathlib.PurePath
A new Path or PurePath object without a UUID in the filename.
Examples
@@ -441,10 +466,11 @@ def remove_uuid_string(file_path):
See Also
--------
- one.alf.files.add_uuid_string
+ one.alf.path.ALFPath.without_uuid
+ one.alf.path.add_uuid_string
"""
if isinstance(file_path, str):
- file_path = Path(file_path)
+ file_path = pathlib.Path(file_path)
name_parts = file_path.stem.split('.')
if spec.is_uuid_string(name_parts[-1]):
@@ -463,7 +489,7 @@ def padded_sequence(file_path):
Returns
-------
- pathlib.Path, pathlib.PurePath
+ ALFPath, PureALFPath
The same path but with the experiment sequence folder zero-padded. If a PurePath was
passed, a PurePath will be returned, otherwise a Path object is returned.
@@ -479,8 +505,7 @@ def padded_sequence(file_path):
>>> padded_sequence(file_path)
pathlib.PurePosixPath('subject/2023-01-01/001')
"""
- if isinstance(file_path, str):
- file_path = Path(file_path)
+ file_path = ensure_alf_path(file_path)
if (session_path := get_session_path(file_path)) is None:
raise ValueError('path must include a valid ALF session path, e.g. subject/YYYY-MM-DD/N')
idx = len(file_path.parts) - len(session_path.parts)
@@ -508,6 +533,894 @@ def without_revision(file_path):
Path('/lab/Subjects/subject/2023-01-01/001/collection/obj.attr.ext')
"""
if isinstance(file_path, str):
- file_path = Path(file_path)
+ file_path = pathlib.Path(file_path)
*_, collection, revision = folder_parts(file_path.parent)
return get_session_path(file_path).joinpath(*filter(None, (collection, file_path.name)))
+
+
+class PureALFPath(pathlib.PurePath): # py3.12 supports direct subclassing
+ """Base class for manipulating Alyx file (ALF) paths without I/O.
+
+ Similar to a pathlib PurePath object but with methods for validating, parsing, and replacing
+ ALF path parts.
+
+ Parameters
+ ----------
+ args : str, pathlib.PurePath
+ One or more pathlike objects to combine into an ALF path object.
+ """
+
+ def __new__(cls, *args):
+ """Construct a ALFPurePath from one or several strings and or existing PurePath objects.
+
+ The strings and path objects are combined so as to yield a canonicalized path, which is
+ incorporated into the new PurePath object.
+ """
+ if cls is PureALFPath:
+ cls = PureWindowsALFPath if os.name == 'nt' else PurePosixALFPath
+ return cls._from_parts(args)
+
+ def is_dataset(self):
+ """Determine if path is an ALF dataset, rather than a folder.
+
+ Returns
+ -------
+ bool
+ True if filename is ALF dataset.
+ """
+ return spec.is_valid(self.name)
+
+ def is_valid_alf(path) -> bool:
+ """Check if path is a valid ALF path.
+
+ This returns true if the input path matches any part of the ALF path specification.
+ This method can be used as a static method with any pathlike input, or as an instance
+ method. This will validate both directory paths and file paths.
+
+ Parameters
+ ----------
+ path : str, pathlib.PurePath
+ A path to check the validity of.
+
+ Returns
+ -------
+ bool
+ True if the path is recognized as a valid ALF path.
+
+ Examples
+ --------
+ >>> ALFPath('/home/foo/2020-01-01/001').is_valid_alf()
+ True
+
+ >>> ALFPath('/home/foo/2020-01-01/001/alf/spikes.times.npy').is_valid_alf()
+ True
+
+ >>> ALFPath.is_valid_alf('_ibl_wheel.timestamps.npy')
+ True
+
+ >>> ALFPath.is_valid_alf('foo.bar')
+ False
+
+ See Also
+ --------
+ PureALFPath.is_dataset - Test whether file name is valid as well as directory path.
+ full_path_parts - Validates path and returns the parsed ALF path parts.
+ """
+ try:
+ return any(full_path_parts(path))
+ except ALFInvalid:
+ return False
+
+ def is_session_path(path) -> bool:
+ """Check if path is a valid ALF session path.
+
+ This returns true if the input path matches the ALF session path specification.
+ This method can be used as a static method with any pathlike input, or as an instance
+ method.
+
+ Parameters
+ ----------
+ path : str, pathlib.PurePath
+ A session path to check the validity of.
+
+ Returns
+ -------
+ bool
+ True if the path is recognized as a valid ALF session path.
+
+ Examples
+ --------
+ >>> ALFPath('/home/foo/2020-01-01/001').is_session_path()
+ True
+
+ >>> ALFPath('/home/foo/2020-01-01/001/alf/spikes.times.npy').is_session_path()
+ False
+
+ >>> ALFPath.is_session_path('_ibl_wheel.timestamps.npy')
+ False
+
+        >>> ALFPath.is_session_path('lab/Subjects/foo/2020-01-01/001')
+ True
+
+ See Also
+ --------
+        PureALFPath.is_valid_alf - Test whether path is a generally valid ALF path.
+ PureALFPath.session_path_parts - Returns parsed session path parts as tuple of str.
+ """
+ return spec.is_session_path(path)
+
+ def session_path(self):
+ """Extract the full session path.
+
+ Returns the session path from the filepath if the date/number pattern is found,
+ including the root directory.
+
+ Returns
+ -------
+ PureALFPath
+ The session path part of the input path or None if path invalid.
+
+ Examples
+ --------
+ >>> ALFPath('/mnt/sd0/Data/lab/Subjects/subject/2020-01-01/001').session_path()
+ ALFPath('/mnt/sd0/Data/lab/Subjects/subject/2020-01-01/001')
+
+ >>> ALFPath('C:\\Data\\subject\\2020-01-01\\1\\trials.intervals.npy').session_path()
+ ALFPath('C:/Data/subject/2020-01-01/1')
+
+ """
+ return get_session_path(self)
+
+ def session_path_short(self, include_lab=False) -> str:
+ """Return only the ALF session path as a posix str.
+
+        Parameters
+        ----------
+ include_lab : bool
+ If true, the lab/subject/date/number is returned, otherwise the lab part is dropped.
+
+ Returns
+ -------
+ str
+ The session path part of the input path or None if path invalid.
+
+ Examples
+ --------
+ >>> ALFPath('/mnt/sd0/Data/lab/Subjects/subject/2020-01-01/001').session_path_short()
+ 'subject/2020-01-01/001'
+
+ >>> alfpath = ALFPath('/mnt/sd0/Data/lab/Subjects/subject/2020-01-01/001')
+ >>> alfpath.session_path_short(include_lab=True)
+ 'lab/subject/2020-01-01/001'
+
+ >>> ALFPath('C:\\Data\\subject\\2020-01-01\\1\\trials.intervals.npy').session_path_short()
+ 'subject/2020-01-01/1'
+ """
+ idx = 0 if include_lab else 1
+ if any(parts := self.session_parts[idx:]):
+ return '/'.join(parts)
+
+ def without_lab(self) -> 'PureALFPath':
+ """Return path without the /Subjects/ part.
+
+ If the /Subjects pattern is not found, the same path is returned.
+
+ Returns
+ -------
+ PureALFPath
+ The same path without the /Subjects part.
+ """
+ p = self.as_posix()
+ if m := spec.regex('{lab}/Subjects/').search(p):
+ return self.__class__(p[:m.start()], p[m.end():])
+ else:
+ return self
+
+ def relative_to_lab(self) -> 'PureALFPath':
+ """Return path relative to /Subjects/ part.
+
+ Returns
+ -------
+ PureALFPath
+ The same path, relative to the /Subjects/ part.
+
+ Raises
+ ------
+ ValueError
+ The path doesn't contain a /Subjects/ pattern.
+ """
+ p = self.as_posix()
+ if m := spec.regex('{lab}/Subjects/').search(p):
+ return self.__class__(p[m.end():])
+ else:
+ raise ValueError(f'{self} does not contain /Subjects pattern')
+
+ def relative_to_session(self):
+ """Return path relative to session part.
+
+ Returns
+ -------
+ PureALFPath
+            The same path, relative to the session path part.
+
+ Raises
+ ------
+ ValueError
+            The path doesn't contain a session path pattern.
+ """
+ if (session_path := self.session_path()):
+ return self.relative_to(session_path)
+ else:
+ raise ValueError(f'{self} does not contain session path pattern')
+
+ def parse_alf_path(self, as_dict=True):
+ """Parse all filename and folder parts.
+
+ Parameters
+ ----------
+ as_dict : bool
+ When true a dict of matches is returned.
+
+ Returns
+ -------
+ OrderedDict, tuple
+ A dict if as_dict is true, or a tuple of parsed values.
+
+ Examples
+ --------
+ >>> alfpath = PureALFPath(
+ ... 'lab/Subjects/subject/2020-01-01/001/collection/#revision#/'
+ ... '_namespace_obj.times_timescale.extra.foo.ext')
+ >>> alfpath.parse_alf_path()
+ {'lab': 'lab',
+ 'subject': 'subject',
+ 'date': '2020-01-01',
+ 'number': '001',
+ 'collection': 'collection',
+ 'revision': 'revision',
+ 'namespace': 'namespace',
+ 'object': 'obj',
+ 'attribute': 'times',
+ 'timescale': 'timescale',
+ 'extra': 'extra.foo',
+ 'extension': 'ext'}
+
+        >>> PureALFPath('_namespace_obj.times_timescale.extra.foo.ext').parse_alf_path(as_dict=False)
+        (None, None, None, None, None, None, 'namespace',
+         'obj', 'times', 'timescale', 'extra.foo', 'ext')
+ """
+ return full_path_parts(self, assert_valid=False, as_dict=as_dict)
+
+ def parse_alf_name(self, as_dict=True):
+ """
+ Return the parsed elements of a given ALF filename.
+
+ Parameters
+ ----------
+ as_dict : bool
+ When true a dict of matches is returned.
+
+        Returns
+        -------
+        OrderedDict, tuple
+            The parsed name parts: namespace, object, attribute, timescale, extra, and
+            extension, where optional parts are None if not present. A dict of matches if
+            as_dict is true, otherwise a tuple of values.
+
+ Examples
+ --------
+ >>> alfpath = PureALFPath(
+ ... 'lab/Subjects/subject/2020-01-01/001/collection/#revision#/'
+ ... '_namespace_obj.times_timescale.extra.foo.ext')
+ >>> alfpath.parse_alf_name()
+ {'namespace': 'namespace',
+ 'object': 'obj',
+ 'attribute': 'times',
+ 'timescale': 'timescale',
+ 'extra': 'extra.foo',
+ 'extension': 'ext'}
+
+        >>> PureALFPath('spikes.clusters.npy').parse_alf_name(as_dict=False)
+        (None, 'spikes', 'clusters', None, None, 'npy')
+ """
+ return filename_parts(self.name, assert_valid=False, as_dict=as_dict)
+
+ @property
+ def dataset_name_parts(self):
+ """tuple of str: the dataset name parts, with empty strings for missing parts."""
+ return tuple(p or '' for p in self.parse_alf_name(as_dict=False))
+
+ @property
+ def session_parts(self):
+ """tuple of str: the session path parts, with empty strings for missing parts."""
+ return tuple(p or '' for p in session_path_parts(self, assert_valid=False))
+
+ @property
+ def alf_parts(self):
+ """tuple of str: the full ALF path parts, with empty strings for missing parts."""
+ return tuple(p or '' for p in self.parse_alf_path(as_dict=False))
+
+ @property
+ def namespace(self):
+ """str : The namespace part of the ALF name, or and empty str if not present."""
+ return self.dataset_name_parts[0]
+
+ @property
+ def object(self):
+ """str : The object part of the ALF name, or and empty str if not present."""
+ return self.dataset_name_parts[1]
+
+ @property
+ def attribute(self):
+ """str : The attribute part of the ALF name, or and empty str if not present."""
+ return self.dataset_name_parts[2]
+
+ @property
+ def timescale(self):
+ """str : The timescale part of the ALF name, or and empty str if not present."""
+ return self.dataset_name_parts[3]
+
+ @property
+ def extra(self):
+ """str : The extra part of the ALF name, or and empty str if not present."""
+ return self.dataset_name_parts[4]
+
+ def with_object(self, obj):
+ """Return a new path with the ALF object changed.
+
+ Parameters
+ ----------
+ obj : str
+ An ALF object name part to use.
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the object part replaced with the input.
+
+ Raises
+ ------
+ ALFInvalid
+ The path is not a valid ALF dataset (e.g. doesn't have a three-part filename, or
+ contains invalid characters).
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(str(self))
+ ns_obj, rest = self.name.split('.', 1)
+ ns, _ = spec.regex(FILE_SPEC.split('\\.')[0]).match(ns_obj).groups()
+ ns = f'_{ns}_' if ns else ''
+ return self.with_name(f'{ns}{obj}.{rest}')
+
+ def with_namespace(self, ns):
+ """Return a new path with the ALF namespace added or changed.
+
+ Parameters
+ ----------
+        ns : str
+ An ALF namespace part to use.
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the namespace part added/replaced with the input.
+
+ Raises
+ ------
+ ALFInvalid
+ The path is not a valid ALF dataset (e.g. doesn't have a three-part filename, or
+ contains invalid characters).
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(self)
+ ns_obj, rest = self.name.split('.', 1)
+ _, obj = spec.regex(FILE_SPEC.split('\\.')[0]).match(ns_obj).groups()
+ ns = f'_{ns}_' if ns else ''
+ return self.with_name(f'{ns}{obj}.{rest}')
+
+ def with_attribute(self, attr):
+ """Return a new path with the ALF attribute changed.
+
+ Parameters
+ ----------
+        attr : str
+ An ALF attribute part to use.
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the attribute part replaced with the input.
+
+ Raises
+ ------
+ ALFInvalid
+ The path is not a valid ALF dataset (e.g. doesn't have a three-part filename, or
+ contains invalid characters).
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(self)
+ ns_obj, attr_ts, rest = self.name.split('.', 2)
+ _, ts = spec.regex('{attribute}(?:_{timescale})?').match(attr_ts).groups()
+ ts = f'_{ts}' if ts else ''
+ return self.with_name(f'{ns_obj}.{attr}{ts}.{rest}')
+
+ def with_timescale(self, timescale):
+ """Return a new path with the ALF timescale added or changed.
+
+ Parameters
+ ----------
+ timescale : str
+ An ALF timescale part to use.
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the timescale part added/replaced with the input.
+
+ Raises
+ ------
+ ALFInvalid
+ The path is not a valid ALF dataset (e.g. doesn't have a three-part filename, or
+ contains invalid characters).
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(self)
+ ns_obj, attr_ts, rest = self.name.split('.', 2)
+ attr, _ = spec.regex('{attribute}(?:_{timescale})?').match(attr_ts).groups()
+ ts = f'_{timescale}' if timescale else ''
+ return self.with_name(f'{ns_obj}.{attr}{ts}.{rest}')
+
+ def with_extra(self, extra, append=False):
+ """Return a new path with extra ALF parts added or changed.
+
+ Parameters
+ ----------
+ extra : str, list of str
+ Extra ALF parts to add/replace.
+ append : bool
+ When false (default) any existing extra parts are replaced instead of added to.
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the extra part(s) replaced or appended to with the input.
+
+ Raises
+ ------
+ ALFInvalid
+ The path is not a valid ALF dataset (e.g. doesn't have a three-part filename, or
+ contains invalid characters).
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(self)
+ parts = self.stem.split('.', 2)
+ if isinstance(extra, str):
+ extra = extra.strip('.').split('.')
+ if (prev := parts.pop() if len(parts) > 2 else None) and append:
+ extra = (prev, *extra)
+ obj_attr = '.'.join(parts)
+ if extra := '.'.join(filter(None, extra)):
+ return self.with_stem(f'{obj_attr}.{extra}')
+ else:
+ return self.with_stem(obj_attr)
+
+ def with_extension(self, ext):
+ """Return a new path with the ALF extension (suffix) changed.
+
+ Note that unlike PurePath's `with_suffix` method, this asserts that the filename is a valid
+ ALF dataset and the `ext` argument should be without the period.
+
+ Parameters
+ ----------
+ ext : str
+ An ALF extension part to use (sans period).
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the extension part replaced with the input.
+
+ Raises
+ ------
+ ALFInvalid
+ The path is not a valid ALF dataset (e.g. doesn't have a three-part filename, or
+ contains invalid characters).
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(str(self))
+ return self.with_suffix(f'.{ext}')
+
+ def with_padded_sequence(path):
+ """Ensures a file path contains a zero-padded experiment sequence folder.
+
+ Parameters
+ ----------
+ path : str pathlib.PurePath
+ A session or file path to convert.
+
+ Returns
+ -------
+ ALFPath, PureALFPath
+ The same path but with the experiment sequence folder zero-padded. If a PurePath was
+ passed, a PurePath will be returned, otherwise a Path object is returned.
+
+ Examples
+ --------
+ Supports calling as static function
+
+ >>> file_path = '/iblrigdata/subject/2023-01-01/1/_ibl_experiment.description.yaml'
+ >>> ALFPath.with_padded_sequence(file_path)
+ ALFPath('/iblrigdata/subject/2023-01-01/001/_ibl_experiment.description.yaml')
+
+ Supports folders and will not affect already padded paths
+
+        >>> ALFPath('subject/2023-01-01/001').with_padded_sequence()
+ ALFPath('subject/2023-01-01/001')
+ """
+ return padded_sequence(path)
+
+ def with_revision(self, revision):
+ """Return a new path with the ALF revision part added/changed.
+
+ Parameters
+ ----------
+ revision : str
+ An ALF revision part to use (NB: do not include the pound sign '#').
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the revision part added or replaced with the input.
+
+ Examples
+ --------
+ If not in the ALF path, one will be added
+
+ >>> ALFPath('/subject/2023-01-01/1/alf/obj.attr.ext').with_revision('revision')
+        ALFPath('/subject/2023-01-01/1/alf/#revision#/obj.attr.ext')
+
+ If a revision is already in the ALF path it will be replaced
+
+ >>> ALFPath('/subject/2023-01-01/1/alf/#revision#/obj.attr.ext').with_revision('xxx')
+ ALFPath('/subject/2023-01-01/1/alf/#xxx#/obj.attr.ext')
+
+ Raises
+ ------
+ ALFInvalid
+ The ALF path is not valid or is relative to the session path. The path must include
+ the session parts otherwise the path is too ambiguous to determine validity.
+ ALFInvalid
+ The revision provided does not match the ALF specification pattern.
+
+ See Also
+ --------
+ PureALFPath.without_revision
+ """
+ # Validate the revision input
+ revision, = _path_parts(revision, '^{revision}$', match=True, assert_valid=True)
+ if PureALFPath.is_dataset(self):
+ return self.without_revision().parent / f'#{revision}#' / self.name
+ else:
+ return self.without_revision() / f'#{revision}#'
+
+ def without_revision(self):
+ """Return a new path with the ALF revision part removed.
+
+ Returns
+ -------
+ PureALFPath
+ The same file path but with the revision part removed.
+
+ Examples
+ --------
+ If not in the ALF path, no change occurs
+
+        >>> ALFPath('/subject/2023-01-01/1/alf/obj.attr.ext').without_revision()
+ ALFPath('/subject/2023-01-01/1/alf/obj.attr.ext')
+
+ If a revision is in the ALF path it will be removed
+
+ >>> ALFPath('/subject/2023-01-01/1/alf/#revision#/obj.attr.ext').without_revision()
+ ALFPath('/subject/2023-01-01/1/alf/obj.attr.ext')
+
+ Raises
+ ------
+ ALFInvalid
+ The ALF path is not valid or is relative to the session path. The path must include
+ the session parts otherwise the path is too ambiguous to determine validity.
+
+ See Also
+ --------
+ PureALFPath.with_revision
+ """
+ if PureALFPath.is_dataset(self):
+ # Is a file path (rather than folder path)
+ return without_revision(self)
+ if not self.is_valid_alf():
+ raise ALFInvalid(f'{self} not a valid ALF path or is relative to session')
+ elif spec.regex('^#{revision}#$').match(self.name):
+ # Includes revision
+ return self.parent
+ else:
+ # Does not include revision
+ return self
+
+ def with_uuid(self, uuid):
+ """Return a new path with the ALF UUID part added/changed.
+
+ Parameters
+ ----------
+ uuid : str, uuid.UUID
+ The UUID to add.
+
+ Returns
+ -------
+ PureALFPath
+ A new ALFPath object with a UUID in the filename.
+
+ Examples
+ --------
+ >>> uuid = 'a976e418-c8b8-4d24-be47-d05120b18341'
+ >>> ALFPath('/path/to/trials.intervals.npy').with_uuid(uuid)
+ ALFPath('/path/to/trials.intervals.a976e418-c8b8-4d24-be47-d05120b18341.npy')
+
+ Raises
+ ------
+ ValueError
+ `uuid` must be a valid hyphen-separated hexadecimal UUID.
+ ALFInvalid
+ Path is not a valid ALF file path.
+ """
+ if not self.is_dataset():
+ raise ALFInvalid(f'{self} is not a valid ALF dataset file path')
+ return add_uuid_string(self, uuid)
+
+ def without_uuid(self):
+ """Return a new path with the ALF UUID part removed.
+
+ Returns
+ -------
+ PureALFPath
+ A new ALFPath object with a UUID removed from the filename, if present.
+
+ Examples
+ --------
+ >>> alfpath = ALFPath('/path/to/trials.intervals.a976e418-c8b8-4d24-be47-d05120b18341.npy')
+        >>> alfpath.without_uuid()
+ ALFPath('/path/to/trials.intervals.npy')
+
+        >>> ALFPath('/path/to/trials.intervals.npy').without_uuid()
+ ALFPath('/path/to/trials.intervals.npy')
+ """
+ return remove_uuid_string(self) if self.is_dataset() else self
+
+
+class ALFPath(pathlib.Path, PureALFPath):
+ """Base class for manipulating Alyx file (ALF) paths with system calls.
+
+ Similar to a pathlib Path object but with methods for validating, parsing, and replacing ALF
+ path parts. This class also contains methods that work on system files.
+
+ Parameters
+ ----------
+ args : str, pathlib.PurePath
+ One or more pathlike objects to combine into an ALF path object.
+ """
+
+ def __new__(cls, *args):
+ """Construct a ALFPurePath from one or several strings and or existing PurePath objects.
+
+ The strings and path objects are combined so as to yield a canonicalized path, which is
+ incorporated into the new PurePath object.
+ """
+ if cls is ALFPath:
+ cls = WindowsALFPath if os.name == 'nt' else PosixALFPath
+ self = cls._from_parts(args, init=False)
+ if not self._flavour.is_supported:
+ raise NotImplementedError(
+ f'cannot instantiate {cls.__name__} on your system')
+ self._init()
+ return self
+
+ def is_dataset(self) -> bool:
+ """Determine if path is an ALF dataset, rather than a folder.
+
+ Unlike pathlib and PureALFPath methods, this will return False if the path exists but
+ is a folder, otherwise this simply tests the path name, whether it exists or not.
+
+ Returns
+ -------
+ bool
+ True if filename is ALF dataset.
+ """
+ return not self.is_dir() and spec.is_valid(self.name)
+
+    def is_session_path(path) -> bool:
+ """Check if path is a valid ALF session path.
+
+ This returns true if the input path matches the ALF session path specification.
+ This method can be used as a static method with any pathlike input, or as an instance
+ method.
+
+ Unlike the PureALFPath method, this will return false if the path matches but is in fact
+ a file on disk.
+
+ Parameters
+ ----------
+ path : str, pathlib.PurePath
+ A session path to check the validity of.
+
+ Returns
+ -------
+ bool
+ True if the path is recognized as a valid ALF session path.
+
+ Examples
+ --------
+ >>> ALFPath('/home/foo/2020-01-01/001').is_session_path()
+ True
+
+ >>> ALFPath('/home/foo/2020-01-01/001/alf/spikes.times.npy').is_session_path()
+ False
+
+ >>> ALFPath.is_session_path('_ibl_wheel.timestamps.npy')
+ False
+
+        >>> ALFPath.is_session_path('lab/Subjects/foo/2020-01-01/001')
+ True
+
+ See Also
+ --------
+        PureALFPath.is_valid_alf - Test whether path is a generally valid ALF path.
+ PureALFPath.session_path_parts - Returns parsed session path parts as tuple of str.
+ """
+        if isinstance(path, str):
+            path = ALFPath(path)
+        return not (hasattr(path, 'is_file') and path.is_file()) and spec.is_session_path(path)
+
+ def is_valid_alf(path) -> bool:
+ """Check if path is a valid ALF path.
+
+ This returns true if the input path matches any part of the ALF path specification.
+ This method can be used as a static method with any pathlike input, or as an instance
+ method. This will validate both directory paths and file paths.
+
+        Unlike the PureALFPath method, this one will return false if the path matches a dataset
+        file pattern but is actually a folder on disk, or matches a folder pattern but is in
+        fact a file on disk.
+
+ Parameters
+ ----------
+ path : str, pathlib.PurePath
+ A path to check the validity of.
+
+ Returns
+ -------
+ bool
+ True if the path is recognized as a valid ALF path.
+
+ Examples
+ --------
+ >>> ALFPath('/home/foo/2020-01-01/001').is_valid_alf()
+ True
+
+ >>> ALFPath('/home/foo/2020-01-01/001/alf/spikes.times.npy').is_valid_alf()
+ True
+
+ >>> ALFPath.is_valid_alf('_ibl_wheel.timestamps.npy')
+ True
+
+ >>> ALFPath.is_valid_alf('foo.bar')
+ False
+
+ See Also
+ --------
+ PureALFPath.is_dataset - Test whether file name is valid as well as directory path.
+ full_path_parts - Validates path and returns the parsed ALF path parts.
+ """
+ try:
+ parsed = full_path_parts(path, as_dict=True)
+ except ALFInvalid:
+ return False
+ is_dataset = parsed['object'] is not None
+ if isinstance(path, str):
+ path = ALFPath(path)
+ if hasattr(path, 'is_file') and path.is_file():
+ return is_dataset
+ elif hasattr(path, 'is_dir') and path.is_dir():
+ return not is_dataset
+ return True
+
+ def iter_datasets(self, recursive=False):
+ """
+        Iterate over all files in path, and yield full dataset paths.
+
+ Parameters
+ ----------
+ recursive : bool
+ If true, yield datasets in subdirectories.
+
+ Yields
+ -------
+ ALFPath
+ The next valid dataset path in lexicographical order.
+
+ See Also
+ --------
+ one.alf.io.iter_datasets - Equivalent function that can take any pathlike input and returns
+ paths relative to the input path.
+ """
+ glob = self.rglob if recursive else self.glob
+ for p in sorted(glob('*.*.*')):
+            if not p.is_dir() and p.is_dataset():
+ yield p
+
+
+class PureWindowsALFPath(pathlib.PureWindowsPath, PureALFPath):
+ """PureALFPath subclass for Windows systems."""
+ pass
+
+
+class PurePosixALFPath(pathlib.PurePosixPath, PureALFPath):
+ """PureALFPath subclass for non-Windows systems."""
+ pass
+
+
+class WindowsALFPath(pathlib.WindowsPath, ALFPath):
+ """ALFPath subclass for Windows systems."""
+ pass
+
+
+class PosixALFPath(pathlib.PosixPath, ALFPath):
+ """ALFPath subclass for non-Windows systems."""
+ pass
+
+
+def ensure_alf_path(path) -> Listable(PureALFPath):
+ """Ensure path is a PureALFPath instance.
+
+    Ensures the path entered is cast to an ALFPath instance. Concrete pathlib.Path and str
+    inputs are cast to ALFPath; pure path inputs are cast to the equivalent PureALFPath;
+    iterables are cast element-wise to a list of ALF paths.
+
+ Parameters
+ ----------
+ path : str, pathlib.PurePath, ALFPath, iterable
+ One or more path-like objects.
+
+ Returns
+ -------
+ ALFPath, PureALFPath, list of ALFPath, list of PureALFPath
+ One or more ALFPath objects.
+
+ Raises
+ ------
+ TypeError
+ Unexpected path instance; input must be a str or pathlib.PurePath instance, or an
+ iterable thereof.
+ """
+ if isinstance(path, PureALFPath):
+ # Already an ALFPath instance
+ return path
+ if isinstance(path, pathlib.PurePath):
+ # Cast pathlib instance to equivalent ALFPath
+ if isinstance(path, pathlib.Path):
+ return ALFPath(path)
+ elif isinstance(path, pathlib.PurePosixPath):
+ return PurePosixALFPath(path)
+ elif isinstance(path, pathlib.PureWindowsPath):
+ return PureWindowsALFPath(path)
+ else:
+ return PureALFPath(path)
+ if isinstance(path, str):
+ # Cast str to ALFPath
+ return ALFPath(path)
+ if isinstance(path, Iterable):
+ # Cast list, generator, tuple, etc. to list of ALFPath
+ return list(map(ensure_alf_path, path))
+ raise TypeError(f'expected os.PathLike type, got {type(path)} instead')
diff --git a/one/api.py b/one/api.py
index 9a25a62c..422c1346 100644
--- a/one/api.py
+++ b/one/api.py
@@ -28,6 +28,7 @@
import one.alf.io as alfio
import one.alf.path as alfiles
import one.alf.exceptions as alferr
+from one.alf.path import ALFPath
from .alf.cache import (
make_parquet_db, patch_cache, remove_cache_table_files,
EMPTY_DATASETS_FRAME, EMPTY_SESSIONS_FRAME)
@@ -376,7 +377,7 @@ def save_loaded_ids(self, sessions_only=False, clear_list=True):
self._cache['_loaded_datasets'] = np.array([])
return ids, filename
- def _download_datasets(self, dsets, **kwargs) -> List[Path]:
+ def _download_datasets(self, dsets, **kwargs) -> List[ALFPath]:
"""
Download several datasets given a set of datasets.
@@ -385,31 +386,31 @@ def _download_datasets(self, dsets, **kwargs) -> List[Path]:
Parameters
----------
dsets : list
- List of dataset dictionaries from an Alyx REST query OR URL strings
+ List of dataset dictionaries from an Alyx REST query OR URL strings.
Returns
-------
- list of pathlib.Path
- A local file path list
+ list of one.alf.path.ALFPath
+ A local file path list.
"""
# Looking to entirely remove method
pass
- def _download_dataset(self, dset, cache_dir=None, **kwargs) -> Path:
+ def _download_dataset(self, dset, cache_dir=None, **kwargs) -> ALFPath:
"""
- Download a dataset from an Alyx REST dictionary
+ Download a dataset from an Alyx REST dictionary.
Parameters
----------
dset : pandas.Series, dict, str
- A single dataset dictionary from an Alyx REST query OR URL string
+ A single dataset dictionary from an Alyx REST query OR URL string.
cache_dir : str, pathlib.Path
- The root directory to save the data in (home/downloads by default)
+ The root directory to save the data in (home/downloads by default).
Returns
-------
- pathlib.Path
- The local file path
+ one.alf.path.ALFPath
+ The local file path.
"""
pass # pragma: no cover
@@ -585,19 +586,19 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha
Parameters
----------
datasets : pandas.Series, pandas.DataFrame, list of dicts
- A list or DataFrame of dataset records
+ A list or DataFrame of dataset records.
offline : bool, None
If false and Web client present, downloads the missing datasets from a remote
- repository
+ repository.
update_exists : bool
- If true, the cache is updated to reflect the filesystem
+ If true, the cache is updated to reflect the filesystem.
check_hash : bool
Consider dataset missing if local file hash does not match. In online mode, the dataset
will be re-downloaded.
Returns
-------
- A list of file paths for the datasets (None elements for non-existent datasets)
+ A list of one.alf.path.ALFPath for the datasets (None elements for non-existent datasets).
"""
if isinstance(datasets, pd.Series):
datasets = pd.DataFrame([datasets])
@@ -632,9 +633,9 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha
# First go through datasets and check if file exists and hash matches
for i, rec in datasets.iterrows():
- file = Path(self.cache_dir, *rec[['session_path', 'rel_path']])
+ file = ALFPath(self.cache_dir, *rec[['session_path', 'rel_path']])
if self.uuid_filenames:
- file = alfiles.add_uuid_string(file, i[1] if isinstance(i, tuple) else i)
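+ # with_uuid inserts the dataset UUID before the file extension,
+ # e.g. spikes.times.<uuid>.npy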
+ file = file.with_uuid(i[1] if isinstance(i, tuple) else i)
if file.exists():
# Check if there's a hash mismatch
# If so, add this index to list of datasets that need downloading
@@ -986,7 +987,7 @@ def load_object(self,
query_type: Optional[str] = None,
download_only: bool = False,
check_hash: bool = True,
- **kwargs) -> Union[alfio.AlfBunch, List[Path]]:
+ **kwargs) -> Union[alfio.AlfBunch, List[ALFPath]]:
"""
Load all attributes of an ALF object from a Session ID and an object name.
@@ -1023,7 +1024,7 @@ def load_object(self,
Returns
-------
one.alf.io.AlfBunch, list
- An ALF bunch or if download_only is True, a list of Paths objects.
+ An ALF bunch, or a list of one.alf.path.ALFPath objects if download_only is True.
Examples
--------
@@ -1112,8 +1113,8 @@ def load_dataset(self,
Returns
-------
- np.ndarray, pathlib.Path
- Dataset or a Path object if download_only is true.
+ np.ndarray, one.alf.path.ALFPath
+ Dataset or an ALFPath object if download_only is true.
Examples
--------
@@ -1411,7 +1412,7 @@ def load_dataset_from_id(self,
Returns
-------
- np.ndarray, pathlib.Path
+ np.ndarray, one.alf.path.ALFPath
Dataset data (or filepath if download_only) and dataset record if details is True.
"""
if isinstance(dset_id, UUID):
@@ -1445,7 +1446,7 @@ def load_collection(self,
query_type: Optional[str] = None,
download_only: bool = False,
check_hash: bool = True,
- **kwargs) -> Union[Bunch, List[Path]]:
+ **kwargs) -> Union[Bunch, List[ALFPath]]:
"""
Load all objects in an ALF collection from a Session ID. Any datasets with matching object
name(s) will be loaded. Returns a bunch of objects.
@@ -1479,8 +1480,8 @@ def load_collection(self,
Returns
-------
- Bunch of one.alf.io.AlfBunch, list of pathlib.Path
- A Bunch of objects or if download_only is True, a list of Paths objects
+ Bunch of one.alf.io.AlfBunch, list of one.alf.path.ALFPath
+ A Bunch of objects, or a list of ALFPath objects if download_only is True.
Examples
--------
@@ -1972,8 +1973,8 @@ def load_aggregate(self, relation: str, identifier: str,
Returns
-------
- pandas.DataFrame, pathlib.Path
- Dataset or a Path object if download_only is true.
+ pandas.DataFrame, one.alf.path.ALFPath
+ Dataset or an ALFPath object if download_only is true.
Raises
------
@@ -2340,7 +2341,7 @@ def _update_sessions_table(self, session_records):
df = pd.DataFrame(next(zip(*map(ses2records, session_records))))
return self._update_cache_from_records(sessions=df)
- def _download_datasets(self, dsets, **kwargs) -> List[Path]:
+ def _download_datasets(self, dsets, **kwargs) -> List[ALFPath]:
"""
Download a single or multitude of datasets if stored on AWS, otherwise calls
OneAlyx._download_dataset.
@@ -2355,7 +2356,7 @@ def _download_datasets(self, dsets, **kwargs) -> List[Path]:
Returns
-------
- list of pathlib.Path
+ list of one.alf.path.ALFPath
A list of local file paths.
"""
# determine whether to remove the UUID after download, this may be overridden by user
@@ -2375,7 +2376,7 @@ def _download_datasets(self, dsets, **kwargs) -> List[Path]:
_logger.debug(ex)
return self._download_dataset(dsets, **kwargs)
- def _download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[Path]:
+ def _download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[ALFPath]:
"""
Download datasets from an AWS S3 instance using boto3.
@@ -2392,7 +2393,7 @@ def _download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[
Returns
-------
- list of pathlib.Path
+ list of one.alf.path.ALFPath
A list the length of `dsets` of downloaded dataset file paths. Missing datasets are
returned as None.
@@ -2438,7 +2439,7 @@ def _download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[
local_path.parent.mkdir(exist_ok=True, parents=True)
out_files.append(aws.s3_download_file(
source_path, local_path, s3=s3, bucket_name=bucket_name, overwrite=update_exists))
- return out_files
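+ # Keep None placeholders for datasets that failed to download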
+ return [ALFPath(x) if x else x for x in out_files]
def _dset2url(self, dset, update_cache=True):
"""
@@ -2500,7 +2501,8 @@ def _dset2url(self, dset, update_cache=True):
return url
- def _download_dataset(self, dset, cache_dir=None, update_cache=True, **kwargs) -> List[Path]:
+ def _download_dataset(
+ self, dset, cache_dir=None, update_cache=True, **kwargs) -> List[ALFPath]:
"""
Download a single or multitude of dataset from an Alyx REST dictionary.
@@ -2517,7 +2519,7 @@ def _download_dataset(self, dset, cache_dir=None, update_cache=True, **kwargs) -
Returns
-------
- list of pathlib.Path
+ list of one.alf.path.ALFPath
A local file path or list of paths.
"""
cache_dir = cache_dir or self.cache_dir
@@ -2538,7 +2540,7 @@ def _download_dataset(self, dset, cache_dir=None, update_cache=True, **kwargs) -
target_dir.append(str(Path(cache_dir, alfiles.get_alf_path(_path)).parent))
files = self._download_file(valid_urls, target_dir, **kwargs)
# Return list of file paths or None if we failed to extract URL from dataset
- return [None if not x else files.pop(0) for x in url]
+ return [None if not x else ALFPath(files.pop(0)) for x in url]
def _tag_mismatched_file_record(self, url):
fr = self.alyx.rest('files', 'list',
@@ -2579,7 +2581,7 @@ def _download_file(self, url, target_dir, keep_uuid=None, file_size=None, hash=N
Returns
-------
- pathlib.Path or list of pathlib.Path
+ one.alf.path.ALFPath or list of one.alf.path.ALFPath
The file path of the downloaded file or files.
Example
@@ -2670,7 +2672,7 @@ def setup(base_url=None, **kwargs):
@util.refresh
@util.parse_id
- def eid2path(self, eid, query_type=None) -> Listable(Path):
+ def eid2path(self, eid, query_type=None) -> Listable(ALFPath):
"""
From an experiment ID gets the local session path
@@ -2680,12 +2682,12 @@ def eid2path(self, eid, query_type=None) -> Listable(Path):
Experiment session identifier; may be a UUID, URL, experiment reference string
details dict or Path.
query_type : str
- If set to 'remote', will force database connection
+ If set to 'remote', will force database connection.
Returns
-------
- pathlib.Path, list
- A session path or list of session paths
+ one.alf.path.ALFPath, list
+ A session path or list of session paths.
"""
# first try avoid hitting the database
mode = query_type or self.mode
@@ -2704,12 +2706,12 @@ def eid2path(self, eid, query_type=None) -> Listable(Path):
if len(ses) == 0:
return None
else:
- return Path(self.cache_dir).joinpath(
+ return ALFPath(self.cache_dir).joinpath(
ses[0]['lab'], 'Subjects', ses[0]['subject'], ses[0]['start_time'][:10],
str(ses[0]['number']).zfill(3))
@util.refresh
- def path2eid(self, path_obj: Union[str, Path], query_type=None) -> Listable(Path):
+ def path2eid(self, path_obj: Union[str, Path], query_type=None) -> Listable(str):
"""
From a local path, gets the experiment ID
@@ -2727,14 +2729,13 @@ def path2eid(self, path_obj: Union[str, Path], query_type=None) -> Listable(Path
"""
# If path_obj is a list recurse through it and return a list
if isinstance(path_obj, list):
- path_obj = [Path(x) for x in path_obj]
eid_list = []
unwrapped = unwrap(self.path2eid)
for p in path_obj:
eid_list.append(unwrapped(self, p))
return eid_list
- # else ensure the path ends with mouse,date, number
- path_obj = Path(path_obj)
+ # else ensure the path ends with mouse, date, number
+ path_obj = ALFPath(path_obj)
# try the cached info to possibly avoid hitting database
mode = query_type or self.mode
@@ -2743,7 +2744,7 @@ def path2eid(self, path_obj: Union[str, Path], query_type=None) -> Listable(Path
if cache_eid or mode == 'local':
return cache_eid
- session_path = alfiles.get_session_path(path_obj)
+ session_path = path_obj.session_path()
# if path does not have a date and a number return None
if session_path is None:
return None
diff --git a/one/converters.py b/one/converters.py
index 256840a7..6879d0c3 100644
--- a/one/converters.py
+++ b/one/converters.py
@@ -13,7 +13,7 @@
import urllib.parse
from uuid import UUID
from inspect import unwrap
-from pathlib import Path, PurePosixPath
+from pathlib import Path
from typing import Optional, Union, Mapping, List, Iterable as Iter
import pandas as pd
@@ -22,7 +22,7 @@
from one.alf.spec import is_session_path, is_uuid_string
from one.alf.cache import EMPTY_DATASETS_FRAME
from one.alf.path import (
- get_session_path, add_uuid_string, session_path_parts, get_alf_path, remove_uuid_string)
+ ALFPath, PurePosixALFPath, ensure_alf_path, get_session_path, get_alf_path, remove_uuid_string)
def recurse(func):
@@ -152,19 +152,19 @@ def to_eid(self,
raise ValueError('Unrecognized experiment ID')
@recurse
- def eid2path(self, eid: str) -> Optional[Listable(Path)]:
+ def eid2path(self, eid: str) -> Optional[Listable(ALFPath)]:
"""
From an experiment id or a list of experiment ids, gets the local cache path.
Parameters
----------
eid : str, uuid.UUID
- Experiment ID (UUID) or list of UUIDs
+ Experiment ID (UUID) or list of UUIDs.
Returns
-------
- pathlib.Path
- A session path
+ one.alf.path.ALFPath
+ A session path.
"""
# If not valid return None
if not is_uuid_string(eid):
@@ -220,24 +220,25 @@ def path2eid(self, path_obj):
def path2record(self, path) -> pd.Series:
"""Convert a file or session path to a dataset or session cache record.
- NB: Assumes <lab>/Subjects/<subject>/<date>/<number> pattern
+ NB: Assumes <lab>/Subjects/<subject>/<date>/<number> pattern.
Parameters
----------
path : str, pathlib.Path
- Local path or HTTP URL
+ Local path or HTTP URL.
Returns
-------
pandas.Series
- A cache file record
+ A cache file record.
"""
+ path = ALFPath(path)
is_session = is_session_path(path)
if self._cache['sessions' if is_session else 'datasets'].empty:
return # short circuit: no records in the cache
if is_session_path(path):
- lab, subject, date, number = session_path_parts(path)
+ lab, subject, date, number = path.session_parts
df = self._cache['sessions']
rec = df[
(df['lab'] == lab) & (df['subject'] == subject) &
@@ -246,9 +247,6 @@ def path2record(self, path) -> pd.Series:
]
return None if rec.empty else rec.squeeze()
- # Deal with dataset path
- if isinstance(path, str):
- path = Path(path)
# If there's a UUID in the path, use that to fetch the record
name_parts = path.stem.split('.')
if is_uuid_string(uuid := name_parts[-1]):
@@ -264,7 +262,7 @@ def path2record(self, path) -> pd.Series:
return
# Find row where relative path matches
- rec = df[df['rel_path'] == path.relative_to(get_session_path(path)).as_posix()]
+ rec = df[df['rel_path'] == path.relative_to_session().as_posix()]
assert len(rec) < 2, 'Multiple records found'
if rec.empty:
return None
@@ -324,22 +322,22 @@ def record2url(self, record):
assert isinstance(record.name, tuple) and len(record.name) == 2
eid, uuid = record.name # must be (eid, did)
session_path = self.eid2path(eid)
- url = PurePosixPath(get_alf_path(session_path), record['rel_path'])
- return webclient.rel_path2url(add_uuid_string(url, uuid).as_posix())
+ url = PurePosixALFPath(get_alf_path(session_path), record['rel_path'])
+ return webclient.rel_path2url(url.with_uuid(uuid).as_posix())
- def record2path(self, dataset) -> Optional[Path]:
+ def record2path(self, dataset) -> Optional[ALFPath]:
"""
- Given a set of dataset records, returns the corresponding paths
+ Given a set of dataset records, returns the corresponding paths.
Parameters
----------
dataset : pd.DataFrame, pd.Series
- A datasets dataframe slice
+ A datasets dataframe slice.
Returns
-------
- pathlib.Path
- File path for the record
+ one.alf.path.ALFPath
+ File path for the record.
"""
if isinstance(dataset, pd.DataFrame):
return [self.record2path(r) for _, r in dataset.iterrows()]
@@ -352,7 +350,7 @@ def record2path(self, dataset) -> Optional[Path]:
raise ValueError(f'Failed to determine session path for eid "{eid}"')
file = session_path / dataset['rel_path']
if self.uuid_filenames:
- file = add_uuid_string(file, uuid)
+ file = file.with_uuid(uuid)
return file
@recurse
@@ -452,7 +450,7 @@ def ref2path(self, ref):
Returns
-------
- pathlib.Path
+ one.alf.path.ALFPath
Path object(s) for the experiment session(s).
Examples
@@ -629,13 +627,13 @@ def one_path_from_dataset(dset, one_cache):
Returns
-------
- pathlib.Path
+ one.alf.path.ALFPath
The local path for a given dataset.
"""
return path_from_dataset(dset, root_path=one_cache, uuid=False)
-def path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=False):
+def path_from_dataset(dset, root_path=PurePosixALFPath('/'), repository=None, uuid=False):
"""
Returns the local file path from a dset record from a REST query.
Unlike `to_eid`, this function does not require ONE, and the dataset may not exist.
@@ -654,7 +652,7 @@ def path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=
Returns
-------
- pathlib.Path, list
+ one.alf.path.ALFPath, list
File path or list of paths.
"""
if isinstance(dset, list):
@@ -667,11 +665,12 @@ def path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=
return path_from_filerecord(fr, root_path=root_path, uuid=uuid)
-def path_from_filerecord(fr, root_path=PurePosixPath('/'), uuid=None):
+def path_from_filerecord(fr, root_path=PurePosixALFPath('/'), uuid=None):
"""
- Returns a data file Path constructed from an Alyx file record. The Path type returned
- depends on the type of root_path: If root_path is a string a Path object is returned,
- otherwise if the root_path is a PurePath, the same path type is returned.
+ Returns a data file Path constructed from an Alyx file record.
+
+ The Path type returned depends on the type of root_path: If root_path is a string an ALFPath
+ object is returned, otherwise if the root_path is a PurePath, a PureALFPath is returned.
Parameters
----------
@@ -684,21 +683,18 @@ def path_from_filerecord(fr, root_path=PurePosixPath('/'), uuid=None):
Returns
-------
- pathlib.Path
+ one.alf.path.ALFPath
A filepath as a pathlib object.
"""
if isinstance(fr, list):
return [path_from_filerecord(f) for f in fr]
repo_path = (p := fr['data_repository_path'])[p[0] == '/':] # Remove slash at start, if any
- file_path = PurePosixPath(repo_path, fr['relative_path'])
+ file_path = PurePosixALFPath(repo_path, fr['relative_path'])
if root_path:
- # NB: By checking for string we won't cast any PurePaths
- if isinstance(root_path, str):
- root_path = Path(root_path)
+ # NB: this function won't cast any PurePaths
+ root_path = ensure_alf_path(root_path)
file_path = root_path / file_path
- if uuid:
- file_path = add_uuid_string(file_path, uuid)
- return file_path
+ return file_path.with_uuid(uuid) if uuid else file_path
def session_record2path(session, root_dir=None):
@@ -717,7 +713,7 @@ def session_record2path(session, root_dir=None):
Returns
-------
- pathlib.Path, Pathlib.PurePath
+ one.alf.path.ALFPath, one.alf.path.PureALFPath
A constructed path of the session.
Examples
@@ -730,16 +726,14 @@ def session_record2path(session, root_dir=None):
>>> session_record2path(record, Path('/home/user'))
Path('/home/user/foo/Subjects/ALK01/2020-01-01/001')
"""
- rel_path = PurePosixPath(
+ rel_path = PurePosixALFPath(
session.get('lab') if session.get('lab') else '',
'Subjects' if session.get('lab') else '',
session['subject'], str(session['date']), str(session['number']).zfill(3)
)
if not root_dir:
return rel_path
- elif isinstance(root_dir, str):
- root_dir = Path(root_dir)
- return Path(root_dir).joinpath(rel_path)
+ return ensure_alf_path(root_dir).joinpath(rel_path)
def ses2records(ses: dict):
diff --git a/one/registration.py b/one/registration.py
index 58588da4..ea955349 100644
--- a/one/registration.py
+++ b/one/registration.py
@@ -26,7 +26,7 @@
from iblutil.util import Bunch, ensure_list
import one.alf.io as alfio
-from one.alf.path import session_path_parts, get_session_path, folder_parts, filename_parts
+from one.alf.path import ALFPath, session_path_parts, ensure_alf_path, folder_parts
from one.alf.spec import is_valid
import one.alf.exceptions as alferr
from one.api import ONE
@@ -61,14 +61,13 @@ def get_dataset_type(filename, dtypes):
filename matches multiple dataset types
"""
dataset_types = []
- if isinstance(filename, str):
- filename = PurePosixPath(filename)
+ filename = ensure_alf_path(filename)
for dt in dtypes:
if not dt.filename_pattern.strip():
# If the filename pattern is null, check whether the filename object.attribute matches
# the dataset type name.
if is_valid(filename.name):
- obj_attr = '.'.join(filename_parts(filename.name)[1:3])
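+ # dataset_name_parts -> (namespace, object, attribute, timescale, extra, extension)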
+ obj_attr = '.'.join(filename.dataset_name_parts[1:3])
else: # will match name against filename sans extension
obj_attr = filename.stem
if dt.name == obj_attr:
@@ -131,12 +130,13 @@ def create_sessions(self, root_data_folder, glob_pattern='**/create_me.flag',
if dry:
records.append(print(flag_file))
continue
- _logger.info('creating session for ' + str(flag_file.parent))
+ session_path = ALFPath(flag_file.parent)
+ _logger.info('creating session for ' + str(session_path))
# providing a false flag stops the registration after session creation
- session_info, _ = self.register_session(flag_file.parent, file_list=register_files)
+ session_info, _ = self.register_session(session_path, file_list=register_files)
records.append(session_info)
flag_file.unlink()
- return [ff.parent for ff in flag_files], records
+ return [ALFPath(ff.parent) for ff in flag_files], records
def create_new_session(self, subject, session_root=None, date=None, register=True, **kwargs):
"""Create a new local session folder and optionally create session record on Alyx.
@@ -204,9 +204,9 @@ def find_files(self, session_path):
pathlib.Path
File paths that match the dataset type patterns in Alyx.
"""
- session_path = Path(session_path)
- for p in session_path.rglob('*.*.*'):
- if p.is_file() and any(p.name.endswith(ext) for ext in self.file_extensions):
+ session_path = ALFPath(session_path)
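+ # iter_datasets yields only valid ALF dataset files, replacing the manual '*.*.*' glob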
+ for p in session_path.iter_datasets(recursive=True):
+ if any(p.name.endswith(ext) for ext in self.file_extensions):
try:
get_dataset_type(p, self.dtypes)
yield p
@@ -341,8 +341,7 @@ def register_session(self, ses_path, users=None, file_list=True, **kwargs):
ConnectionError
Failed to connect to Alyx, most likely due to a bad internet connection.
"""
- if isinstance(ses_path, str):
- ses_path = Path(ses_path)
+ ses_path = ALFPath(ses_path)
details = session_path_parts(ses_path.as_posix(), as_dict=True, assert_valid=True)
# query alyx endpoints for subject, error if not found
self.assert_exists(details['subject'], 'subjects')
@@ -423,7 +422,7 @@ def prepare_files(self, file_list, versions=None):
if single_file := isinstance(file_list, (str, pathlib.Path)):
file_list = [file_list]
- file_list = list(map(pathlib.Path, file_list)) # Ensure list of path objects
+ file_list = list(map(ALFPath, file_list)) # Ensure list of path objects
if versions is None or isinstance(versions, str):
versions = itertools.repeat(versions)
@@ -432,7 +431,7 @@ def prepare_files(self, file_list, versions=None):
# Filter valid files and sort by session
for fn, ver in zip(file_list, versions):
- session_path = get_session_path(fn)
+ session_path = fn.session_path()
if not session_path:
_logger.debug(f'{fn}: Invalid session path')
continue
diff --git a/one/tests/alf/test_alf_files.py b/one/tests/alf/test_alf_files.py
deleted file mode 100644
index 05e2ed55..00000000
--- a/one/tests/alf/test_alf_files.py
+++ /dev/null
@@ -1,248 +0,0 @@
-"""Unit tests for the one.alf.files module."""
-import unittest
-from pathlib import Path, PureWindowsPath
-import uuid
-
-import one.alf.path as files
-
-
-class TestAlfParse(unittest.TestCase):
- """Tests for ALF parsing methods"""
- def test_filename_parts(self):
- """Test for one.alf.files.filename_parts"""
- verifiable = files.filename_parts('_namespace_obj.times_timescale.extra.foo.ext')
- expected = ('namespace', 'obj', 'times', 'timescale', 'extra.foo', 'ext')
- self.assertEqual(expected, verifiable)
-
- verifiable = files.filename_parts('spikes.clusters.npy', as_dict=True)
- expected = {
- 'namespace': None,
- 'object': 'spikes',
- 'attribute': 'clusters',
- 'timescale': None,
- 'extra': None,
- 'extension': 'npy'}
- self.assertEqual(expected, verifiable)
-
- verifiable = files.filename_parts('spikes.times_ephysClock.npy')
- expected = (None, 'spikes', 'times', 'ephysClock', None, 'npy')
- self.assertEqual(expected, verifiable)
-
- verifiable = files.filename_parts('_iblmic_audioSpectrogram.frequencies.npy')
- expected = ('iblmic', 'audioSpectrogram', 'frequencies', None, None, 'npy')
- self.assertEqual(expected, verifiable)
-
- verifiable = files.filename_parts('_spikeglx_ephysData_g0_t0.imec.wiring.json')
- expected = ('spikeglx', 'ephysData_g0_t0', 'imec', None, 'wiring', 'json')
- self.assertEqual(expected, verifiable)
-
- verifiable = files.filename_parts('_spikeglx_ephysData_g0_t0.imec0.lf.bin')
- expected = ('spikeglx', 'ephysData_g0_t0', 'imec0', None, 'lf', 'bin')
- self.assertEqual(expected, verifiable)
-
- verifiable = files.filename_parts('_ibl_trials.goCue_times_bpod.csv')
- expected = ('ibl', 'trials', 'goCue_times', 'bpod', None, 'csv')
- self.assertEqual(expected, verifiable)
-
- with self.assertRaises(ValueError):
- files.filename_parts('badfile')
- verifiable = files.filename_parts('badfile', assert_valid=False)
- self.assertFalse(any(verifiable))
-
- def test_rel_path_parts(self):
- """Test for one.alf.files.rel_path_parts"""
- alf_str = Path('collection/#revision#/_namespace_obj.times_timescale.extra.foo.ext')
- verifiable = files.rel_path_parts(alf_str)
- expected = ('collection', 'revision', 'namespace', 'obj', 'times',
- 'timescale', 'extra.foo', 'ext')
- self.assertEqual(expected, verifiable)
-
- # Check as_dict
- verifiable = files.rel_path_parts('spikes.clusters.npy', as_dict=True)
- expected = {
- 'collection': None,
- 'revision': None,
- 'namespace': None,
- 'object': 'spikes',
- 'attribute': 'clusters',
- 'timescale': None,
- 'extra': None,
- 'extension': 'npy'}
- self.assertEqual(expected, verifiable)
-
- # Check assert valid
- with self.assertRaises(ValueError):
- files.rel_path_parts('bad/badfile')
- verifiable = files.rel_path_parts('bad/badfile', assert_valid=False)
- self.assertFalse(any(verifiable))
-
- def test_session_path_parts(self):
- """Test for one.alf.files.session_path_parts"""
- session_path = '/home/user/Data/labname/Subjects/subject/2020-01-01/001/alf'
- parsed = files.session_path_parts(session_path, as_dict=True)
- expected = {
- 'lab': 'labname',
- 'subject': 'subject',
- 'date': '2020-01-01',
- 'number': '001'}
- self.assertEqual(expected, parsed)
- parsed = files.session_path_parts(session_path, as_dict=False)
- self.assertEqual(tuple(expected.values()), parsed)
- # Check Path as input
- self.assertTrue(any(files.session_path_parts(Path(session_path))))
- # Check parse fails
- session_path = '/home/user/Data/labname/2020-01-01/alf/001/'
- with self.assertRaises(ValueError):
- files.session_path_parts(session_path, assert_valid=True)
- parsed = files.session_path_parts(session_path, assert_valid=False, as_dict=True)
- expected = dict.fromkeys(expected.keys())
- self.assertEqual(expected, parsed)
- parsed = files.session_path_parts(session_path, assert_valid=False, as_dict=False)
- self.assertEqual(tuple([None] * 4), parsed)
-
- def test_folder_parts(self):
- """Test for one.alf.files.folder_parts"""
- path = Path('/home/user/Data/labname/Subjects/subject/2020-01-01/001/'
- 'collection/#revision#/')
- out = files.folder_parts(path)
- expected_values = ('labname', 'subject', '2020-01-01', '001', 'collection', 'revision')
- self.assertEqual(expected_values, out)
-
- path = '/home/user/Data/labname/Subjects/subject/2020-01-01/001'
- expected_values = ('labname', 'subject', '2020-01-01', '001', None, None)
- self.assertEqual(expected_values, files.folder_parts(path))
-
- def test_full_path_parts(self):
- """Test for one.alf.files.full_path_parts"""
- fullpath = Path(
- '/home/user/Data/labname/Subjects/subject/2020-01-01/001/'
- 'collection/#revision#/_namespace_obj.times_timescale.extra.foo.ext'
- )
- # As dict
- out = files.full_path_parts(fullpath, as_dict=True)
- expected_keys = (
- 'lab', 'subject', 'date', 'number', 'collection', 'revision',
- 'namespace', 'object', 'attribute', 'timescale', 'extra', 'extension'
- )
- self.assertIsInstance(out, dict)
- self.assertEqual(expected_keys, tuple(out.keys()))
-
- # As tuple
- out = files.full_path_parts(fullpath, as_dict=False)
- self.assertIsInstance(out, tuple)
- self.assertEqual(len(expected_keys), len(out))
- self.assertTrue(all(out))
-
- # Folders only
- out = files.full_path_parts(fullpath.parent, as_dict=False)
- self.assertTrue(all(out[:6]) and not any(out[6:]))
-
- # Filename only
- out = files.full_path_parts(fullpath.name, as_dict=False)
- self.assertTrue(not any(out[:6]) and all(out[6:]))
-
- def test_isdatetime(self):
- """Test for one.alf.files._isdatetime"""
- inp = ['açsldfkça', '12312', '2020-01-01', '01-01-2020', '2020-12-32']
- out = [False, False, True, False, False]
- for i, o in zip(inp, out):
- self.assertEqual(o, files._isdatetime(i))
-
- def test_add_uuid(self):
- """Test for one.alf.files.add_uuid_string."""
- _uuid = uuid.uuid4()
-
- file_with_uuid = f'/titi/tutu.part1.part1.{_uuid}.json'
- inout = [(file_with_uuid, Path(file_with_uuid)),
- ('/tutu/tata.json', Path(f'/tutu/tata.{_uuid}.json')),
- ('/tutu/tata.part1.json', Path(f'/tutu/tata.part1.{_uuid}.json'))]
- for tup in inout:
- self.assertEqual(tup[1], files.add_uuid_string(tup[0], _uuid))
- self.assertEqual(tup[1], files.add_uuid_string(tup[0], str(_uuid)))
-
- _uuid2 = uuid.uuid4()
- with self.assertLogs(files.__name__, level=10) as cm:
- expected = Path(f'/titi/tutu.part1.part1.{_uuid2}.json')
- self.assertEqual(expected, files.add_uuid_string(file_with_uuid, _uuid2))
- self.assertRegex(cm.output[0], 'Replacing [a-f0-9-]+ with [a-f0-9-]+')
-
- with self.assertRaises(ValueError):
- files.add_uuid_string('/foo/bar.npy', 'fake')
-
- def test_remove_uuid(self):
- """Test for one.alf.files.remove_uuid_string."""
- # First test with full file
- file_path = '/tmp/Subjects/CSHL063/2020-09-12/001/raw_ephys_data/probe00/' \
- '_spikeglx_sync.channels.probe00.89c861ea-66aa-4729-a808-e79f84d08b81.npy'
- desired_output = Path(file_path).with_name('_spikeglx_sync.channels.probe00.npy')
- files.remove_uuid_string(file_path)
- self.assertEqual(desired_output, files.remove_uuid_string(file_path))
- self.assertEqual(desired_output, files.remove_uuid_string(desired_output))
-
- # Test with just file name
- file_path = 'toto.89c861ea-66aa-4729-a808-e79f84d08b81.npy'
- desired_output = Path('toto.npy')
- self.assertEqual(desired_output, files.remove_uuid_string(file_path))
-
- def test_padded_sequence(self):
- """Test for one.alf.files.padded_sequence."""
- # Test with pure path file input
- filepath = PureWindowsPath(r'F:\ScanImageAcquisitions\subject\2023-01-01\1\foo\bar.baz')
- expected = PureWindowsPath(r'F:\ScanImageAcquisitions\subject\2023-01-01\001\foo\bar.baz')
- self.assertEqual(files.padded_sequence(filepath), expected)
-
- # Test with str input session path
- session_path = '/mnt/s0/Data/Subjects/subject/2023-01-01/001'
- expected = Path('/mnt/s0/Data/Subjects/subject/2023-01-01/001')
- self.assertEqual(files.padded_sequence(session_path), expected)
-
- # Test invalid ALF session path
- self.assertRaises(ValueError, files.padded_sequence, '/foo/bar/baz')
-
-
-class TestALFGet(unittest.TestCase):
- """Tests for path extraction functions"""
- def test_get_session_folder(self):
- """Test for one.alf.files.get_session_folder"""
- inp = (Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/raw_behavior_data/'
- '_iblrig_micData.raw.wav'),
- Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
- '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/raw_behavior_data'
- '/_iblrig_micData.raw.wav',
- '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001',)
- out = (Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
- Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
- Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
- Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),)
- for i, o in zip(inp, out):
- self.assertEqual(o, files.get_session_path(i))
- # Test if None is passed
- no_out = files.get_session_path(None)
- self.assertTrue(no_out is None)
-
- def test_get_alf_path(self):
- """Test for one.alf.files.get_alf_path"""
- path = Path('/mnt/s0/Data/Subjects/'
- 'ZM_1368/2019-04-19/001/raw_behavior_data/_iblrig_micData.raw.wav')
- out = files.get_alf_path(path)
- self.assertEqual(out, '/'.join(path.parts[-7:]))
- path = 'collection/trials.intervals_bpod.npy'
- self.assertEqual(files.get_alf_path(path), path)
- path = '/trials.intervals_bpod.npy'
- self.assertEqual(files.get_alf_path(path), 'trials.intervals_bpod.npy')
-
- def test_without_revision(self):
- """Test for one.alf.files.without_revision function."""
- path = '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/alf/#2020-01-01#/obj.attr.ext'
- out = files.without_revision(path)
- expected = Path(path.replace('/#2020-01-01#', ''))
- self.assertIsInstance(out, Path)
- self.assertEqual(expected, out, 'failed to remove revision folder')
- self.assertEqual(expected, files.without_revision(out)) # should do nothing to path
- with self.assertRaises(ValueError) as cm:
- files.without_revision('foo/bar/baz.npy')
- self.assertRegex(str(cm.exception), 'Invalid ALF')
-
-
-if __name__ == '__main__':
- unittest.main(exit=False, verbosity=2)
diff --git a/one/tests/alf/test_alf_io.py b/one/tests/alf/test_alf_io.py
index cfe05b33..e8f89d6f 100644
--- a/one/tests/alf/test_alf_io.py
+++ b/one/tests/alf/test_alf_io.py
@@ -18,6 +18,7 @@
import one.alf.io as alfio
from one.alf.exceptions import ALFObjectNotFound
from one.alf.spec import FILE_SPEC, regex
+from one.alf.path import ALFPath
try:
import sparse
@@ -181,27 +182,29 @@ def test_filter_by(self):
'wheel.timestamps.npy',
'wheelMoves.intervals.npy',
'_ibl_trials.intervals.npy']
- self.assertCountEqual(alf_files, expected, 'failed to filter with None attribute')
+ self.assertTrue(all(isinstance(x, ALFPath) for x in alf_files))
+ self.assertCountEqual(
+ alf_files, map(ALFPath, expected), 'failed to filter with None attribute')
# Test filtering by object; should return only 'wheel' ALF objects
alf_files, parts = alfio.filter_by(self.tmpdir, object='wheel')
expected = ['wheel.position.npy', 'wheel.timestamps.npy']
- self.assertCountEqual(alf_files, expected, 'failed to filter by object')
+ self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter by object')
self.assertEqual(len(alf_files), len(parts))
# Test wildcards; should return 'wheel' and 'wheelMoves' ALF objects
alf_files, _ = alfio.filter_by(self.tmpdir, object='wh*')
expected = ['wheel.position.npy', 'wheel.timestamps.npy', 'wheelMoves.intervals.npy']
- self.assertCountEqual(alf_files, expected, 'failed to filter with wildcard')
+ self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter with wildcard')
# Test wildcard arrays
alf_files, _ = alfio.filter_by(self.tmpdir, object='wh*', attribute=['time*', 'pos*'])
expected = ['wheel.position.npy', 'wheel.timestamps.npy']
- self.assertCountEqual(alf_files, expected, 'failed to filter with wildcard')
+ self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter with wildcard')
# Test filtering by specific timescale; test parts returned
alf_files, parts = alfio.filter_by(self.tmpdir, timescale='bpod')
- expected = ['_ibl_trials.intervals_bpod.csv']
+ expected = [ALFPath('_ibl_trials.intervals_bpod.csv')]
self.assertEqual(alf_files, expected, 'failed to filter by timescale')
expected = ('ibl', 'trials', 'intervals', 'bpod', None, 'csv')
self.assertTupleEqual(parts[0], expected)
@@ -211,11 +214,13 @@ def test_filter_by(self):
# Test filtering multiple attributes; should return only trials intervals
alf_files, _ = alfio.filter_by(self.tmpdir, attribute='intervals', object='trials')
expected = ['_ibl_trials.intervals.npy', '_ibl_trials.intervals_bpod.csv']
- self.assertCountEqual(alf_files, expected, 'failed to filter by multiple attribute')
+ self.assertCountEqual(
+ alf_files, map(ALFPath, expected), 'failed to filter by multiple attribute')
# Test returning only ALF files
alf_files, _ = alfio.filter_by(self.tmpdir)
- self.assertCountEqual(alf_files, file_names[1:], 'failed to return ALF files')
+ self.assertCountEqual(
+ alf_files, map(ALFPath, file_names[1:]), 'failed to return ALF files')
# Test return empty
out = alfio.filter_by(self.tmpdir, object=None)
@@ -223,11 +228,11 @@ def test_filter_by(self):
# Test extras
alf_files, _ = alfio.filter_by(self.tmpdir, extra='v12')
- expected = ['_namespace_obj.attr_timescale.raw.v12.ext']
+ expected = [ALFPath('_namespace_obj.attr_timescale.raw.v12.ext')]
self.assertEqual(alf_files, expected, 'failed to filter extra attributes')
alf_files, _ = alfio.filter_by(self.tmpdir, extra=['v12', 'raw'])
- expected = ['_namespace_obj.attr_timescale.raw.v12.ext']
+ expected = [ALFPath('_namespace_obj.attr_timescale.raw.v12.ext')]
self.assertEqual(alf_files, expected, 'failed to filter extra attributes as list')
alf_files, _ = alfio.filter_by(self.tmpdir, extra=['foo', 'v12'])
@@ -240,10 +245,10 @@ def test_filter_by(self):
# Check regular expression search
alf_files, _ = alfio.filter_by(self.tmpdir, object='^wheel.*', wildcards=False)
expected = ['wheel.position.npy', 'wheel.timestamps.npy', 'wheelMoves.intervals.npy']
- self.assertCountEqual(alf_files, expected, 'failed to filter by regex')
+ self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter by regex')
# Should work with lists
alf_files, _ = alfio.filter_by(self.tmpdir, object=['^wheel$', '.*Moves'], wildcards=False)
- self.assertCountEqual(alf_files, expected, 'failed to filter by regex')
+ self.assertCountEqual(alf_files, map(ALFPath, expected), 'failed to filter by regex')
def tearDown(self) -> None:
shutil.rmtree(self.tmpdir)
@@ -273,7 +278,7 @@ def setUp(self) -> None:
pd.DataFrame(np.random.rand(10, 5), columns=cols).to_parquet(self.object_files[-1])
def test_exists(self):
- """Test for one.alf.io.exists"""
+ """Test for one.alf.io.exists."""
self.assertFalse(alfio.exists(self.tmpdir, 'asodiujfas'))
self.assertTrue(alfio.exists(self.tmpdir, 'neuveu'))
# test with attribute string only
@@ -297,7 +302,9 @@ def test_metadata_columns(self):
cols = ['titi', 'tutu', 'toto', 'tata']
np.save(file_alf, data)
np.save(self.tmpdir / '_ns_object.gnagna.npy', data[:, -1])
- alfio.save_metadata(file_alf, {'columns': cols})
+ path = alfio.save_metadata(file_alf, {'columns': cols})
+ self.assertIsInstance(path, ALFPath)
+ self.assertEqual('_ns_object.attribute.metadata.json', path.name)
dread = alfio.load_object(self.tmpdir, 'object', namespace='ns', short_keys=False)
self.assertTrue(np.all(dread['titi'] == data[:, 0]))
self.assertTrue(np.all(dread['gnagna'] == data[:, -1]))
@@ -401,7 +408,7 @@ def test_ls(self):
"""Test for one.alf.io._ls"""
# Test listing all ALF files in a directory
alf_files, _ = alfio._ls(self.tmpdir)
- self.assertIsInstance(alf_files[0], Path)
+ self.assertIsInstance(alf_files[0], ALFPath)
self.assertEqual(8, len(alf_files))
# Test with filepath
@@ -547,9 +554,10 @@ def test_load_file_content(self):
pqt = next(Path(__file__).parents[1].joinpath('fixtures').glob('*.pqt'))
loaded = alfio.load_file_content(pqt)
self.assertIsInstance(loaded, pd.DataFrame)
- # Unknown file should return Path
+ # Unknown file should return ALFPath
file = alfio.load_file_content(str(self.xyz))
self.assertEqual(file, self.xyz)
+ self.assertIsInstance(file, ALFPath)
self.assertIsNone(alfio.load_file_content(None))
# Load YAML file
loaded = alfio.load_file_content(str(self.yaml))
@@ -588,23 +596,6 @@ def test_load_sparse_npz(self):
class TestUUID_Files(unittest.TestCase):
- def test_remove_uuid(self):
- with tempfile.TemporaryDirectory() as dir:
- f1 = Path(dir).joinpath('tutu.part1.part1.30c09473-4d3d-4f51-9910-c89a6840096e.json')
- f2 = Path(dir).joinpath('tata.part1.part1.json')
- f3 = Path(dir).joinpath('toto.json')
- f1.touch()
- f2.touch()
- f2.touch()
- self.assertTrue(alfio.remove_uuid_file(f1) ==
- Path(dir).joinpath('tutu.part1.part1.json'))
- self.assertTrue(alfio.remove_uuid_file(f2) ==
- Path(dir).joinpath('tata.part1.part1.json'))
- self.assertTrue(alfio.remove_uuid_file(f3) ==
- Path(dir).joinpath('toto.json'))
- self.assertTrue(alfio.remove_uuid_file(str(f3)) ==
- Path(dir).joinpath('toto.json'))
-
def test_remove_uuid_recusive(self):
uuid = '30c09473-4d3d-4f51-9910-c89a6840096e'
with tempfile.TemporaryDirectory() as dir:
@@ -687,11 +678,13 @@ def test_iter_sessions(self):
self.assertEqual(next(valid_sessions), self.session_path)
self.assertFalse(next(valid_sessions, False))
# makes sure that the session path returns itself on the iterator
- self.assertEqual(self.session_path, next(alfio.iter_sessions(self.session_path)))
+ path = next(alfio.iter_sessions(self.session_path))
+ self.assertEqual(self.session_path, path)
+ self.assertIsInstance(path, ALFPath)
# test pattern arg
- valid_sessions = alfio.iter_sessions(
- self.tempdir.name, pattern='*/Subjects/*/????-??-??/*')
- self.assertEqual(self.session_path, next(valid_sessions))
+ valid_sessions = list(alfio.iter_sessions(
+ self.tempdir.name, pattern='*/Subjects/*/????-??-??/*'))
+ self.assertEqual([ALFPath(self.session_path)], valid_sessions)
subjects_path = Path(self.tempdir.name, 'fakelab', 'Subjects')
valid_sessions = alfio.iter_sessions(subjects_path, pattern='*/????-??-??/*')
self.assertEqual(self.session_path, next(valid_sessions))
@@ -709,6 +702,7 @@ def test_iter_datasets(self):
ses_files = list(alfio.iter_datasets(self.session_path))
self.assertEqual([Path(*dset.parts[-2:])], ses_files)
+ self.assertIsInstance(ses_files[0], ALFPath)
class TestFindVariants(unittest.TestCase):
@@ -738,6 +732,8 @@ def test_unique(self):
dupes = alfio.find_variants(self.dsets)
self.assertCountEqual(self.dsets, dupes.keys(), 'expected keys to match input files')
self.assertFalse(any(map(any, dupes.values())), 'expected no duplicates')
+ paths = filter(None, (*dupes.keys(), *dupes.values()))
+ self.assertTrue(all(isinstance(x, ALFPath) for x in paths))
# With extra=False should treat files with extra parts as a variant
dupes = alfio.find_variants(self.dsets, extra=False)
diff --git a/one/tests/alf/test_alf_path.py b/one/tests/alf/test_alf_path.py
new file mode 100644
index 00000000..ef7562c0
--- /dev/null
+++ b/one/tests/alf/test_alf_path.py
@@ -0,0 +1,621 @@
+"""Unit tests for the one.alf.path module."""
+import unittest
+from unittest import mock
+import tempfile
+from types import GeneratorType
+from uuid import uuid4
+from pathlib import (
+ Path, PurePath, PureWindowsPath, PurePosixPath, _windows_flavour, _posix_flavour)
+
+from one.alf import path
+from one.alf.path import ALFPath, PureALFPath, ensure_alf_path
+from one.alf.exceptions import ALFInvalid
+
+
+class TestALFParse(unittest.TestCase):
+ """Tests for ALF parsing methods"""
+ def test_filename_parts(self):
+ """Test for one.alf.path.filename_parts"""
+ verifiable = path.filename_parts('_namespace_obj.times_timescale.extra.foo.ext')
+ expected = ('namespace', 'obj', 'times', 'timescale', 'extra.foo', 'ext')
+ self.assertEqual(expected, verifiable)
+
+ verifiable = path.filename_parts('spikes.clusters.npy', as_dict=True)
+ expected = {
+ 'namespace': None,
+ 'object': 'spikes',
+ 'attribute': 'clusters',
+ 'timescale': None,
+ 'extra': None,
+ 'extension': 'npy'}
+ self.assertEqual(expected, verifiable)
+
+ verifiable = path.filename_parts('spikes.times_ephysClock.npy')
+ expected = (None, 'spikes', 'times', 'ephysClock', None, 'npy')
+ self.assertEqual(expected, verifiable)
+
+ verifiable = path.filename_parts('_iblmic_audioSpectrogram.frequencies.npy')
+ expected = ('iblmic', 'audioSpectrogram', 'frequencies', None, None, 'npy')
+ self.assertEqual(expected, verifiable)
+
+ verifiable = path.filename_parts('_spikeglx_ephysData_g0_t0.imec.wiring.json')
+ expected = ('spikeglx', 'ephysData_g0_t0', 'imec', None, 'wiring', 'json')
+ self.assertEqual(expected, verifiable)
+
+ verifiable = path.filename_parts('_spikeglx_ephysData_g0_t0.imec0.lf.bin')
+ expected = ('spikeglx', 'ephysData_g0_t0', 'imec0', None, 'lf', 'bin')
+ self.assertEqual(expected, verifiable)
+
+ verifiable = path.filename_parts('_ibl_trials.goCue_times_bpod.csv')
+ expected = ('ibl', 'trials', 'goCue_times', 'bpod', None, 'csv')
+ self.assertEqual(expected, verifiable)
+
+ with self.assertRaises(ValueError):
+ path.filename_parts('badfile')
+ verifiable = path.filename_parts('badfile', assert_valid=False)
+ self.assertFalse(any(verifiable))
+
+ def test_rel_path_parts(self):
+ """Test for one.alf.path.rel_path_parts"""
+ alf_str = Path('collection/#revision#/_namespace_obj.times_timescale.extra.foo.ext')
+ verifiable = path.rel_path_parts(alf_str)
+ expected = ('collection', 'revision', 'namespace', 'obj', 'times',
+ 'timescale', 'extra.foo', 'ext')
+ self.assertEqual(expected, verifiable)
+
+ # Check as_dict
+ verifiable = path.rel_path_parts('spikes.clusters.npy', as_dict=True)
+ expected = {
+ 'collection': None,
+ 'revision': None,
+ 'namespace': None,
+ 'object': 'spikes',
+ 'attribute': 'clusters',
+ 'timescale': None,
+ 'extra': None,
+ 'extension': 'npy'}
+ self.assertEqual(expected, verifiable)
+
+ # Check assert valid
+ with self.assertRaises(ValueError):
+ path.rel_path_parts('bad/badfile')
+ verifiable = path.rel_path_parts('bad/badfile', assert_valid=False)
+ self.assertFalse(any(verifiable))
+
+ def test_session_path_parts(self):
+ """Test for one.alf.path.session_path_parts"""
+ session_path = '/home/user/Data/labname/Subjects/subject/2020-01-01/001/alf'
+ parsed = path.session_path_parts(session_path, as_dict=True)
+ expected = {
+ 'lab': 'labname',
+ 'subject': 'subject',
+ 'date': '2020-01-01',
+ 'number': '001'}
+ self.assertEqual(expected, parsed)
+ parsed = path.session_path_parts(session_path, as_dict=False)
+ self.assertEqual(tuple(expected.values()), parsed)
+ # Check Path as input
+ self.assertTrue(any(path.session_path_parts(Path(session_path))))
+ # Check parse fails
+ session_path = '/home/user/Data/labname/2020-01-01/alf/001/'
+ with self.assertRaises(ValueError):
+ path.session_path_parts(session_path, assert_valid=True)
+ parsed = path.session_path_parts(session_path, assert_valid=False, as_dict=True)
+ expected = dict.fromkeys(expected.keys())
+ self.assertEqual(expected, parsed)
+ parsed = path.session_path_parts(session_path, assert_valid=False, as_dict=False)
+ self.assertEqual(tuple([None] * 4), parsed)
+
+ def test_folder_parts(self):
+ """Test for one.alf.path.folder_parts"""
+ alfpath = Path(
+ '/home/user/Data/labname/Subjects/subject/2020-01-01/001/collection/#revision#/')
+ out = path.folder_parts(alfpath)
+ expected_values = ('labname', 'subject', '2020-01-01', '001', 'collection', 'revision')
+ self.assertEqual(expected_values, out)
+
+ alfpath = '/home/user/Data/labname/Subjects/subject/2020-01-01/001'
+ expected_values = ('labname', 'subject', '2020-01-01', '001', None, None)
+ self.assertEqual(expected_values, path.folder_parts(alfpath))
+
+ def test_full_path_parts(self):
+ """Test for one.alf.path.full_path_parts"""
+ fullpath = Path(
+ '/home/user/Data/labname/Subjects/subject/2020-01-01/001/'
+ 'collection/#revision#/_namespace_obj.times_timescale.extra.foo.ext'
+ )
+ # As dict
+ out = path.full_path_parts(fullpath, as_dict=True)
+ expected_keys = (
+ 'lab', 'subject', 'date', 'number', 'collection', 'revision',
+ 'namespace', 'object', 'attribute', 'timescale', 'extra', 'extension'
+ )
+ self.assertIsInstance(out, dict)
+ self.assertEqual(expected_keys, tuple(out.keys()))
+
+ # As tuple
+ out = path.full_path_parts(fullpath, as_dict=False)
+ self.assertIsInstance(out, tuple)
+ self.assertEqual(len(expected_keys), len(out))
+ self.assertTrue(all(out))
+
+ # Folders only
+ out = path.full_path_parts(fullpath.parent, as_dict=False)
+ self.assertTrue(all(out[:6]) and not any(out[6:]))
+
+ # Filename only
+ out = path.full_path_parts(fullpath.name, as_dict=False)
+ self.assertTrue(not any(out[:6]) and all(out[6:]))
+
+ def test_isdatetime(self):
+ """Test for one.alf.path._isdatetime"""
+ inp = ['açsldfkça', '12312', '2020-01-01', '01-01-2020', '2020-12-32']
+ out = [False, False, True, False, False]
+ for i, o in zip(inp, out):
+ self.assertEqual(o, path._isdatetime(i))
+
+ def test_add_uuid(self):
+ """Test for one.alf.path.add_uuid_string."""
+ _uuid = uuid4()
+
+ file_with_uuid = f'/titi/tutu.part1.part1.{_uuid}.json'
+ inout = [(file_with_uuid, Path(file_with_uuid)),
+ ('/tutu/tata.json', Path(f'/tutu/tata.{_uuid}.json')),
+ ('/tutu/tata.part1.json', Path(f'/tutu/tata.part1.{_uuid}.json'))]
+ for tup in inout:
+ self.assertEqual(tup[1], path.add_uuid_string(tup[0], _uuid))
+ self.assertEqual(tup[1], path.add_uuid_string(tup[0], str(_uuid)))
+
+ _uuid2 = uuid4()
+ with self.assertLogs(path.__name__, level=10) as cm:
+ expected = Path(f'/titi/tutu.part1.part1.{_uuid2}.json')
+ self.assertEqual(expected, path.add_uuid_string(file_with_uuid, _uuid2))
+ self.assertRegex(cm.output[0], 'Replacing [a-f0-9-]+ with [a-f0-9-]+')
+
+ with self.assertRaises(ValueError):
+ path.add_uuid_string('/foo/bar.npy', 'fake')
+
+ def test_remove_uuid(self):
+ """Test for one.alf.path.remove_uuid_string."""
+ # First test with full file
+ file_path = '/tmp/Subjects/CSHL063/2020-09-12/001/raw_ephys_data/probe00/' \
+ '_spikeglx_sync.channels.probe00.89c861ea-66aa-4729-a808-e79f84d08b81.npy'
+ desired_output = Path(file_path).with_name('_spikeglx_sync.channels.probe00.npy')
+ path.remove_uuid_string(file_path)
+ self.assertEqual(desired_output, path.remove_uuid_string(file_path))
+ self.assertEqual(desired_output, path.remove_uuid_string(desired_output))
+
+ # Test with just file name
+ file_path = 'toto.89c861ea-66aa-4729-a808-e79f84d08b81.npy'
+ desired_output = Path('toto.npy')
+ self.assertEqual(desired_output, path.remove_uuid_string(file_path))
+
+ def test_padded_sequence(self):
+ """Test for one.alf.path.padded_sequence."""
+ # Test with pure path file input
+ filepath = PureWindowsPath(r'F:\ScanImageAcquisitions\subject\2023-01-01\1\foo\bar.baz')
+ expected = PureWindowsPath(r'F:\ScanImageAcquisitions\subject\2023-01-01\001\foo\bar.baz')
+ self.assertEqual(path.padded_sequence(filepath), expected)
+
+ # Test with str input session path
+ session_path = '/mnt/s0/Data/Subjects/subject/2023-01-01/001'
+ expected = Path('/mnt/s0/Data/Subjects/subject/2023-01-01/001')
+ self.assertEqual(path.padded_sequence(session_path), expected)
+
+ # Test invalid ALF session path
+ self.assertRaises(ValueError, path.padded_sequence, '/foo/bar/baz')
+
+
+class TestALFGet(unittest.TestCase):
+ """Tests for path extraction functions"""
+ def test_get_session_folder(self):
+ """Test for one.alf.path.get_session_folder"""
+ inp = (Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/raw_behavior_data/'
+ '_iblrig_micData.raw.wav'),
+ Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
+ '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/raw_behavior_data'
+ '/_iblrig_micData.raw.wav',
+ '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001',)
+ out = (Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
+ Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
+ Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),
+ Path('/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001'),)
+ for i, o in zip(inp, out):
+ self.assertEqual(o, path.get_session_path(i))
+ # Test if None is passed
+ no_out = path.get_session_path(None)
+ self.assertTrue(no_out is None)
+
+ def test_get_alf_path(self):
+ """Test for one.alf.path.get_alf_path"""
+ alfpath = Path(
+ '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/'
+ 'raw_behavior_data/_iblrig_micData.raw.wav')
+ out = path.get_alf_path(alfpath)
+ self.assertEqual(out, '/'.join(alfpath.parts[-7:]))
+ alfpath = 'collection/trials.intervals_bpod.npy'
+ self.assertEqual(path.get_alf_path(alfpath), alfpath)
+ alfpath = '/trials.intervals_bpod.npy'
+ self.assertEqual(path.get_alf_path(alfpath), 'trials.intervals_bpod.npy')
+
+ def test_without_revision(self):
+ """Test for one.alf.path.without_revision function."""
+ alfpath = '/mnt/s0/Data/Subjects/ZM_1368/2019-04-19/001/alf/#2020-01-01#/obj.attr.ext'
+ out = path.without_revision(alfpath)
+ expected = Path(alfpath.replace('/#2020-01-01#', ''))
+ self.assertIsInstance(out, Path)
+ self.assertEqual(expected, out, 'failed to remove revision folder')
+ self.assertEqual(expected, path.without_revision(out)) # should do nothing to path
+ with self.assertRaises(ALFInvalid):
+ path.without_revision('foo/bar/baz.npy')
+
+
+class TestALFPath(unittest.TestCase):
+ """Tests for ALFPath class methods."""
+
+ def setUp(self):
+ self.alfpath = ALFPath(Path.home().joinpath(
+ 'foo', 'labname', 'Subjects', 'subject', '1900-01-01', '001',
+ 'alf', '#2020-01-01#', 'obj.attr.ext'
+ ))
+
+ def test_factory(self):
+ """Test PureALFPath and ALFPath new class methods."""
+ with mock.patch.object(_windows_flavour, 'is_supported', False), \
+ mock.patch.object(_posix_flavour, 'is_supported', False):
+ self.assertRaises(NotImplementedError, ALFPath, 'foo.bar.ext')
+
+ def test_is_valid_alf(self):
+ """Test for PureALFPath.is_valid_alf and ALFPath.is_valid_alf methods."""
+ self.assertTrue(self.alfpath.is_valid_alf())
+ self.assertTrue(PureALFPath.is_valid_alf(str(self.alfpath)))
+ self.assertFalse(PureALFPath.is_valid_alf(self.alfpath.with_name('foo.npy')))
+ self.assertFalse(ALFPath.is_valid_alf(self.alfpath.with_name('foo.npy')))
+ # A session path with invalid subject name should return False
+ self.assertFalse(PureALFPath.is_valid_alf('abc-@/2020-01-01/001'))
+ with tempfile.TemporaryDirectory() as tmp:
+ tmp_session = ALFPath(
+ tmp, 'foo', 'labname', 'Subjects', 'subject', '1900-01-01', '001')
+ # An ostensibly valid file that is actually a folder should be invalid
+ (fake_file := tmp_session.joinpath('obj.attr.ext')).mkdir(parents=True)
+ self.assertFalse(fake_file.is_valid_alf())
+ self.assertTrue(PureALFPath.is_valid_alf(str(fake_file)))
+ # An ostensibly valid folder that is actually a file should be invalid
+ (fake_folder := tmp_session.joinpath('#2020-01-01#')).touch()
+ self.assertFalse(ALFPath.is_valid_alf(str(fake_folder)))
+ self.assertTrue(PureALFPath(fake_folder).is_valid_alf())
+ # If it doesn't exist it should still be considered valid
+ self.assertTrue(tmp_session.is_valid_alf())
+
+ def test_is_dataset(self):
+ """Test for PureALFPath.is_dataset method."""
+ self.assertTrue(self.alfpath.is_dataset())
+ self.assertFalse(self.alfpath.parent.is_dataset())
+
+ def test_session_path(self):
+ """Test for PureALFPath.session_path method."""
+ expected = self.alfpath.parents[2]
+ self.assertEqual(expected, self.alfpath.session_path())
+
+ def test_without_revision(self):
+ """Test for PureALFPath.without_revision method."""
+ # Test with dataset
+ expected = self.alfpath.parents[1] / self.alfpath.name
+ self.assertEqual(expected, self.alfpath.without_revision())
+ # Test with revision folder
+ expected = self.alfpath.parents[1]
+ self.assertEqual(expected, self.alfpath.parent.without_revision())
+ # Test with other folder
+ expected = self.alfpath.parents[2]
+ self.assertEqual(expected, self.alfpath.parents[2].without_revision())
+ # Test with invalid path
+ alfpath = self.alfpath.parent.joinpath('foo.npy')
+ self.assertRaises(ALFInvalid, alfpath.without_revision)
+
+ def test_with_revision(self):
+ """Test for PureALFPath.with_revision method."""
+ # Test dataset with revision
+ expected = self.alfpath.parents[1] / '#bar#' / self.alfpath.name
+ self.assertEqual(expected, self.alfpath.with_revision('bar'))
+ # Test dataset without revision
+ expected = self.alfpath.parents[1] / '#baz#' / self.alfpath.name
+ alfpath = self.alfpath.parents[1] / self.alfpath.name
+ self.assertEqual(expected, alfpath.with_revision('baz'))
+ # Test revision folder
+ expected = self.alfpath.parents[1] / '#bar#'
+ self.assertEqual(expected, self.alfpath.parent.with_revision('bar'))
+ # Test non-revision folder
+ expected = self.alfpath.parents[1] / '#bar#'
+ self.assertEqual(expected, self.alfpath.parents[1].with_revision('bar'))
+ # Test path relative to session (currently not supported due to spec ambiguity)
+ alfpath = self.alfpath.relative_to_session()
+ self.assertRaises(ALFInvalid, alfpath.with_revision, 'bar')
+
+ def test_with_padded_sequence(self):
+ """Test for PureALFPath.with_padded_sequence method."""
+ # Test already padded
+ self.assertEqual(self.alfpath, self.alfpath.with_padded_sequence())
+ # Test not padded
+ alfpath = self.alfpath.parents[3].joinpath('1', *self.alfpath.parts[-3:])
+ self.assertEqual(self.alfpath, alfpath.with_padded_sequence())
+
+ def test_relative_to_session(self):
+ """Test for PureALFPath.relative_to_session method."""
+ expected = ALFPath(*self.alfpath.parts[-3:])
+ self.assertEqual(expected, self.alfpath.relative_to_session())
+ self.assertRaises(ValueError, expected.relative_to_session)
+
+ def test_session_path_short(self):
+ """Test for PureALFPath.session_path_short method."""
+ expected = 'subject/1900-01-01/001'
+ self.assertEqual(expected, self.alfpath.session_path_short())
+ expected = 'labname/subject/1900-01-01/001'
+ self.assertEqual(expected, self.alfpath.session_path_short(include_lab=True))
+
+ def test_without_lab(self):
+ """Test for PureALFPath.without_lab method."""
+ # Test with lab
+ expected = ALFPath(self.alfpath.as_posix().replace('labname/Subjects/', ''))
+ self.assertEqual(expected, self.alfpath.without_lab())
+ # Test without lab
+ self.assertEqual(expected, expected.without_lab())
+
+ def test_relative_to_lab(self):
+ """Test ALFPath.relative_to_lab method."""
+ # Test with lab
+ expected = ALFPath(*self.alfpath.parts[-6:])
+ self.assertEqual(expected, self.alfpath.relative_to_lab())
+ # Test without lab
+ self.assertRaises(ValueError, expected.relative_to_lab)
+
+ def test_without_uuid(self):
+ """Test for PureALFPath.without_uuid method."""
+ # Test file without uuid
+ self.assertEqual(self.alfpath, self.alfpath.without_uuid())
+ # Test file with uuid
+ alfpath = self.alfpath.parent / f'obj.attr.{uuid4()}.ext'
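+ # A UUID appears as an extra name part immediately before the extension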
+ self.assertEqual(self.alfpath, alfpath.without_uuid())
+ # Test folder
+ self.assertEqual(self.alfpath.parent, alfpath.parent.without_uuid())
+
+ def test_with_uuid(self):
+ """Test for PureALFPath.with_uuid method."""
+ # Test file without uuid
+ uuid = uuid4()
+ expected = self.alfpath.parent / f'obj.attr.{uuid}.ext'
+ self.assertEqual(expected, self.alfpath.with_uuid(uuid))
+ # Test file with uuid
+ uuid = uuid4()
+ alfpath = expected.with_uuid(uuid)
+ expected = self.alfpath.parent / f'obj.attr.{uuid}.ext'
+ self.assertEqual(expected, alfpath)
+ # Test folder
+ self.assertRaises(ALFInvalid, alfpath.parent.with_uuid, uuid)
+
+ def test_is_session_path(self):
+ """Test PureALFPath and ALFPath.is_session_path methods."""
+ # Check PureALFPath w/o system calls
+ self.assertFalse(self.alfpath.is_session_path())
+ self.assertTrue(self.alfpath.parents[2].is_session_path())
+ self.assertTrue(PureALFPath(self.alfpath.parents[2]).is_session_path())
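+ # Pure paths never touch the filesystem, so a non-existent path may still be a session path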
+ # Check ALFPath method with system call
+ with tempfile.TemporaryDirectory() as tmp:
+ tmp_session = ALFPath(
+ tmp, 'foo', 'labname', 'Subjects', 'subject', '1900-01-01', '001')
+ self.assertTrue(tmp_session.is_session_path())
+ # An ostensibly valid session path that is actually a file should be invalid
+ tmp_session.parent.mkdir(parents=True)
+ tmp_session.touch()
+ self.assertFalse(tmp_session.is_session_path())
+
+ def test_iter_datasets(self):
+ """Test ALFPath.iter_datasets method."""
+ with tempfile.TemporaryDirectory() as tmp:
+ tmp_session = ALFPath(
+ tmp, 'foo', 'labname', 'Subjects', 'subject', '1900-01-01', '001')
+ tmp_session.mkdir(parents=True)
+ for file in ('foo.bar', 'obj.attr.ext', 'bar.baz.foo', 'alf/foo.baz.bar'):
+ if file.startswith('alf'):
+ tmp_session.joinpath(file).parent.mkdir()
+ tmp_session.joinpath(file).touch()
+ dsets = tmp_session.iter_datasets()
+ self.assertIsInstance(dsets, GeneratorType)
+ dsets = list(dsets)
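+ # NB: 'foo.bar' is excluded: a dataset file needs at least object.attribute.extension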
+ expected = [tmp_session / f for f in ('bar.baz.foo', 'obj.attr.ext')]
+ self.assertEqual(expected, dsets) # NB: Order important here
+ # Check recursive
+ dsets = list(tmp_session.iter_datasets(recursive=True))
+ self.assertEqual(3, len(dsets))
+ self.assertEqual(tmp_session / 'alf/foo.baz.bar', dsets[0])
+
+ def test_with_object(self):
+ """Test for PureALFPath.with_object method."""
+ # Test without namespace
+ expected = self.alfpath.with_name('foo.attr.ext')
+ self.assertEqual(expected, self.alfpath.with_object('foo'))
+ # Test with namespace
+ alfpath = self.alfpath.with_name('_ns_obj.attr.ext')
+ expected = self.alfpath.with_name('_ns_bar.attr.ext')
+ self.assertEqual(expected, alfpath.with_object('bar'))
+ self.assertRaises(ALFInvalid, alfpath.with_stem('foo').with_object, 'obj')
+
+ def test_with_namespace(self):
+ """Test for PureALFPath.with_namespace method."""
+ # Test without namespace
+ expected = self.alfpath.with_name('_ns_obj.attr.ext')
+ self.assertEqual(expected, self.alfpath.with_namespace('ns'))
+ # Test with namespace
+ alfpath = self.alfpath.with_name('_foo_obj.attr.ext')
+ self.assertEqual(expected, alfpath.with_namespace('ns'))
+ # Test removing namespace
+ self.assertEqual(self.alfpath, alfpath.with_namespace(''))
+ self.assertRaises(ALFInvalid, alfpath.with_stem('foo').with_namespace, 'ns')
+
+ def test_with_attribute(self):
+ """Test for PureALFPath.with_attribute method."""
+ # Test without timescale
+ expected = self.alfpath.with_name('obj.foo.ext')
+ self.assertEqual(expected, self.alfpath.with_attribute('foo'))
+ # Test with timescale
+ alfpath = self.alfpath.with_name('obj.attr_times_barClock.ext')
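+ # A timescale follows the attribute, i.e. obj.attr_times_<timescale>.ext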
+ expected = self.alfpath.with_name('obj.foo_barClock.ext')
+ self.assertEqual(expected, alfpath.with_attribute('foo'))
+ self.assertRaises(ALFInvalid, alfpath.with_stem('foo').with_attribute, 'attr')
+
+ def test_with_timescale(self):
+ """Test for PureALFPath.with_timescale method."""
+ # Test without timescale
+ expected = self.alfpath.with_name('obj.attr_foo.ext')
+ self.assertEqual(expected, self.alfpath.with_timescale('foo'))
+ # Test with timescale
+ alfpath = self.alfpath.with_name('obj.attr_times_barClock.ext')
+ expected = self.alfpath.with_name('obj.attr_times_foo.ext')
+ self.assertEqual(expected, alfpath.with_timescale('foo'))
+ # Test removing timescale
+ expected = self.alfpath.with_name('obj.attr_times.ext')
+ self.assertEqual(expected, alfpath.with_timescale(''))
+ self.assertRaises(ALFInvalid, alfpath.with_stem('foo').with_timescale, 'bpod')
+
+ def test_with_extra(self):
+ """Test for PureALFPath.with_extra method."""
+ # Test without extra
+ expected = self.alfpath.with_name('obj.attr.extra.ext')
+ self.assertEqual(expected, self.alfpath.with_extra('extra'))
+ # Test with extra
+ alfpath = expected
+ expected = self.alfpath.with_name('obj.attr.foo.ext')
+ self.assertEqual(expected, alfpath.with_extra('foo'))
+ # Test append
+ alfpath = expected
+ expected = self.alfpath.with_name('obj.attr.foo.extra.ext')
+ self.assertEqual(expected, alfpath.with_extra('extra', append=True))
+ # Test list
+ self.assertEqual(expected, alfpath.with_extra(['foo', 'extra']))
+ # Test removing extra
+ self.assertEqual(self.alfpath, alfpath.with_extra(''))
+ self.assertRaises(ALFInvalid, alfpath.with_stem('foo').with_extra, 'extra')
+
+ def test_with_extension(self):
+ """Test for PureALFPath.with_extension method."""
+ expected = self.alfpath.with_suffix('.npy')
+ self.assertEqual(expected, self.alfpath.with_extension('npy'))
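+ # Unlike timescale and extra, the extension part cannot be removed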
+ self.assertRaises(ValueError, self.alfpath.with_extension, '')
+ self.assertRaises(ALFInvalid, self.alfpath.with_stem('foo').with_extension, 'ext')
+
+ def test_parts_properties(self):
+ """Test the PureALFPath ALF dataset part properties."""
+ # Namespace
+ self.assertEqual('', self.alfpath.namespace)
+ self.assertEqual('ns', self.alfpath.with_stem('_ns_obj.attr').namespace)
+ self.assertEqual('', self.alfpath.with_stem('_ns_foo').namespace)
+ # Object
+ self.assertEqual('obj', self.alfpath.object)
+ self.assertEqual('', self.alfpath.with_stem('foo').object)
+ # Attribute
+ self.assertEqual('attr', self.alfpath.attribute)
+ self.assertEqual('', self.alfpath.with_stem('foo').attribute)
+ # Timescale
+ self.assertEqual('', self.alfpath.timescale)
+ self.assertEqual('bpod', self.alfpath.with_stem('obj.attr_times_bpod').timescale)
+ self.assertEqual('', self.alfpath.with_stem('foo').timescale)
+ # Extra
+ self.assertEqual('', self.alfpath.extra)
+ self.assertEqual('foo.bar', self.alfpath.with_stem('obj.attr.foo.bar').extra)
+ self.assertEqual('', self.alfpath.with_stem('foo').extra)
+ # dataset_name_parts
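+ # The tuple order is (namespace, object, attribute, timescale, extra, extension)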
+ self.assertEqual(('', 'obj', 'attr', '', '', 'ext'), self.alfpath.dataset_name_parts)
+ alfpath = self.alfpath.with_name('_ns_obj.attr_times_bpod.foo.bar.ext')
+ expected = ('ns', 'obj', 'attr_times', 'bpod', 'foo.bar', 'ext')
+ self.assertEqual(expected, alfpath.dataset_name_parts)
+ # session_parts
+ self.assertEqual(('labname', 'subject', '1900-01-01', '001'), self.alfpath.session_parts)
+ alfpath = ALFPath(*self.alfpath.parts[5:])
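+ # Without the lab/Subjects levels the lab part should be empty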
+ self.assertEqual(('', 'subject', '1900-01-01', '001'), alfpath.session_parts)
+ # alf_parts
+ alfpath = self.alfpath.with_name('_ns_obj.attr_times_bpod.foo.bar.ext')
+ expected = ('labname', 'subject', '1900-01-01', '001', 'alf', '2020-01-01',
+ 'ns', 'obj', 'attr_times', 'bpod', 'foo.bar', 'ext')
+ self.assertEqual(expected, alfpath.alf_parts)
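+ # A path with no parseable ALF parts should yield all twelve parts as empty strings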
+ expected = ('', '', '', '', '', '', '', '', '', '', '', '')
+ self.assertEqual(expected, ALFPath('foo').alf_parts)
+
+ def test_parse_alf_path(self):
+ """Test PureALFPath.parse_alf_path method."""
+ parsed = self.alfpath.parse_alf_path()
+ self.assertIsInstance(parsed, dict)
+ expected = dict(
+ lab='labname', subject='subject', date='1900-01-01', number='001', collection='alf',
+ revision='2020-01-01', namespace=None, object='obj', attribute='attr', timescale=None,
+ extra=None, extension='ext')
+ # NB: We use assertEqual rather than assertDictEqual because the key order must also be correct
+ self.assertEqual(expected, parsed)
+ # With session path
+ parsed = self.alfpath.session_path().parse_alf_path()
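+ # For a bare session path, the dataset-level fields (collection onwards) should be None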
+ _expected = {**expected, **{k: None for k in list(expected.keys())[4:]}}
+ self.assertEqual(_expected, parsed)
+ # With dataset name
+ parsed = PureALFPath(self.alfpath.name).parse_alf_path()
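+ # For a bare filename, the session-level fields (lab through revision) should be None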
+ _expected = {**expected, **{k: None for k in list(expected.keys())[:6]}}
+ self.assertEqual(_expected, parsed)
+ # With invalid path
+ parsed = PureALFPath('foo/bar/Subjects/baz.pie').parse_alf_path()
+ _expected = dict.fromkeys(expected)
+ self.assertEqual(_expected, parsed)
+
+ def test_parse_alf_name(self):
+ """Test PureALFPath.parse_alf_name method."""
+ # With dataset name
+ parsed = self.alfpath.parse_alf_name()
+ self.assertIsInstance(parsed, dict)
+ expected = dict(
+ namespace=None, object='obj', attribute='attr',
+ timescale=None, extra=None, extension='ext')
+ # NB: We use assertEqual rather than assertDictEqual because the key order must also be correct
+ self.assertEqual(expected, parsed)
+ # With invalid dataset path
+ parsed = PureALFPath('foo/bar/Subjects/baz.pie').parse_alf_name()
+ _expected = dict.fromkeys(expected)
+ self.assertEqual(_expected, parsed)
+
+ def test_ensure_alf_path(self):
+ """Test for one.alf.path.ensure_alf_path function."""
+ # Check str -> ALFPath
+ alfpath = ensure_alf_path(str(self.alfpath))
+ self.assertIsInstance(alfpath, ALFPath, 'failed to cast str to ALFPath')
+ # Check ALFPath -> ALFPath
+ alfpath = ensure_alf_path(self.alfpath)
+ self.assertIs(alfpath, self.alfpath, 'expected identity behaviour')
+ # Check PureALFPath -> PureALFPath
+ alfpath = ensure_alf_path(PureALFPath(self.alfpath))
+ self.assertIsInstance(alfpath, PureALFPath)
+ self.assertNotIsInstance(alfpath, ALFPath)
+ # Check PureWindowsPath -> PureWindowsALFPath
+ alfpath = ensure_alf_path(PureWindowsPath(self.alfpath))
+ self.assertIsInstance(alfpath, PureALFPath)
+ self.assertIsInstance(alfpath, PureWindowsPath)
+ self.assertNotIsInstance(alfpath, ALFPath)
+ # Check PurePosixPath -> PurePosixALFPath
+ alfpath = ensure_alf_path(PurePosixPath(self.alfpath))
+ self.assertIsInstance(alfpath, PureALFPath)
+ self.assertIsInstance(alfpath, PurePosixPath)
+ self.assertNotIsInstance(alfpath, ALFPath)
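+ # NB: The input path flavour (Windows/Posix, pure/concrete) is always preserved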
+ # Check arbitrary PurePath -> PureALFPath
+
+ class ArbitraryPurePath(PurePath):
+ @classmethod
+ def _parse_args(cls, args):
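+ # A bare PurePath subclass has no _flavour; borrow the fixture path's parser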
+ return self.alfpath._flavour.parse_parts(args[0].parts)
+ alfpath = ensure_alf_path(ArbitraryPurePath(self.alfpath))
+ self.assertIsInstance(alfpath, PureALFPath)
+ # Check Path -> ALFPath
+ alfpath = ensure_alf_path(Path(self.alfpath))
+ self.assertIsInstance(alfpath, ALFPath)
+ # Check operation on list
+ alfpaths = ensure_alf_path([str(self.alfpath)])
+ self.assertEqual([self.alfpath], alfpaths)
+ # Check assertions
+ self.assertRaises(TypeError, ensure_alf_path, 20)
+
+
+if __name__ == '__main__':
+ unittest.main(exit=False, verbosity=2)
diff --git a/one/tests/test_converters.py b/one/tests/test_converters.py
index f984e0a5..1c998fe8 100644
--- a/one/tests/test_converters.py
+++ b/one/tests/test_converters.py
@@ -11,6 +11,7 @@
from one import converters
from one.alf.path import add_uuid_string
from one.alf.cache import EMPTY_DATASETS_FRAME
+from one.alf.path import ALFPath, PurePosixALFPath, PureWindowsALFPath
from . import util, OFFLINE_ONLY, TEST_DB_2
@@ -78,8 +79,9 @@ def test_eid2path(self):
"""Test for ConversionMixin.eid2path"""
eid = 'd3372b15-f696-4279-9be5-98f15783b5bb'
verifiable = self.one.eid2path(eid)
- expected = Path(self.tempdir.name).joinpath(
+ expected = ALFPath(self.tempdir.name).joinpath(
'mainenlab', 'Subjects', 'ZFM-01935', '2021-02-05', '001')
+ self.assertIsInstance(verifiable, ALFPath)
self.assertEqual(expected, verifiable)
with self.assertRaises(ValueError):
@@ -276,10 +278,10 @@ def test_record2path(self):
# As pd.Series
alf_path = ('hoferlab/Subjects/SWC_043/2020-09-21/001/'
'alf/probe00/_phy_spikes_subset.channels.npy')
- expected = Path(self.one.alyx.cache_dir).joinpath(*alf_path.split('/'))
+ expected = ALFPath(self.one.alyx.cache_dir).joinpath(*alf_path.split('/'))
data_id = '00c234a3-a4ff-4f97-a522-939d15528a45'
path = self.one.record2path(rec.loc[(self.eid, data_id)])
- self.assertIsInstance(path, Path)
+ self.assertIsInstance(path, ALFPath)
self.assertEqual(expected, path)
# As pd.DataFrame
idx = rec.rel_path == 'alf/probe00/_phy_spikes_subset.channels.npy'
@@ -295,15 +297,18 @@ def test_record2path(self):
self.one.uuid_filenames = True
expected = expected.with_suffix(f'.{data_id}.npy')
self.assertEqual([expected], self.one.record2path(rec[idx])) # as pd.DataFrame
- self.assertEqual(expected, self.one.record2path(rec[idx].squeeze())) # as pd.Series
+ verifiable = self.one.record2path(rec[idx].squeeze())
+ self.assertEqual(expected, verifiable) # as pd.Series
+ self.assertIsInstance(verifiable, ALFPath)
finally:
self.one.uuid_filenames = False
def test_eid2path(self):
"""Test for OneAlyx.eid2path"""
verifiable = self.one.eid2path(self.eid, query_type='remote')
- expected = Path(self.one.cache_dir).joinpath('hoferlab', 'Subjects', 'SWC_043',
- '2020-09-21', '001',)
+ expected = ALFPath(self.one.cache_dir).joinpath(
+ 'hoferlab', 'Subjects', 'SWC_043', '2020-09-21', '001')
+ self.assertIsInstance(verifiable, ALFPath)
self.assertEqual(expected, verifiable)
with self.assertRaises(ValueError):
@@ -428,6 +433,7 @@ def test_dsets_2_path(self):
# Test one_path_from_dataset
root = PurePosixPath('/one_root')
testable = converters.one_path_from_dataset(self.dset, one_cache=root)
+ self.assertIsInstance(testable, PurePosixALFPath)
self.assertEqual(str(testable), one_path)
# Check list input
testable = converters.one_path_from_dataset([self.dset], one_cache=root)
@@ -435,12 +441,13 @@ def test_dsets_2_path(self):
# Check handles string inputs
testable = converters.one_path_from_dataset(self.dset, one_cache='/one_root')
self.assertTrue(hasattr(testable, 'is_absolute'), 'Failed to return Path object')
+ self.assertIsInstance(testable, ALFPath)
self.assertEqual(str(testable).replace('\\', '/'), one_path)
# Test one_path_from_dataset using Windows path
one_path = PureWindowsPath(r'C:/Users/User/')
testable = converters.one_path_from_dataset(self.dset, one_cache=one_path)
- self.assertIsInstance(testable, PureWindowsPath)
+ self.assertIsInstance(testable, PureWindowsALFPath)
self.assertTrue(str(testable).startswith(str(one_path)))
self.assertTrue('hoferlab/Subjects' in testable.as_posix())
# Check repository arg
@@ -451,7 +458,7 @@ def test_dsets_2_path(self):
# Tests path_from_filerecord: when given a string, a system path object should be returned
fr = self.dset['file_records'][0]
testable = converters.path_from_filerecord(fr, root_path='C:\\')
- self.assertIsInstance(testable, Path)
+ self.assertIsInstance(testable, ALFPath)
# Check list
testable = converters.path_from_filerecord([fr], root_path='C:\\')
self.assertIsInstance(testable, list)
@@ -464,11 +471,13 @@ def test_session_record2path(self):
"""Test for one.converters.session_record2path"""
rec = {'subject': 'ALK01', 'date': '2020-01-01', 'number': 1}
path = converters.session_record2path(rec)
- self.assertEqual(path, PurePosixPath('ALK01/2020-01-01/001'))
+ self.assertIsInstance(path, PurePosixALFPath)
+ self.assertEqual(path, PurePosixALFPath('ALK01/2020-01-01/001'))
rec = {'date': datetime.datetime.fromisoformat('2020-01-01').date(),
'number': '001', 'lab': 'foo', 'subject': 'ALK01'}
path = converters.session_record2path(rec, str(Path.home()))
+ self.assertIsInstance(path, ALFPath)
self.assertEqual(path, Path.home() / 'foo/Subjects/ALK01/2020-01-01/001')
diff --git a/one/tests/test_one.py b/one/tests/test_one.py
index 3b6eb43f..f741af62 100644
--- a/one/tests/test_one.py
+++ b/one/tests/test_one.py
@@ -54,7 +54,7 @@
import one.alf.exceptions as alferr
from one.converters import datasets2records
from one.alf import spec
-from one.alf.files import get_alf_path
+from one.alf.path import get_alf_path
from one.alf.cache import EMPTY_DATASETS_FRAME, EMPTY_SESSIONS_FRAME
from . import util
from . import OFFLINE_ONLY, TEST_DB_1, TEST_DB_2 # 1 = TestAlyx; 2 = OpenAlyx
diff --git a/one/tests/test_registration.py b/one/tests/test_registration.py
index 9fd37cde..eb5426f6 100644
--- a/one/tests/test_registration.py
+++ b/one/tests/test_registration.py
@@ -223,7 +223,7 @@ def test_create_sessions(self):
self.assertEqual(session_paths[0], session_path)
def test_prepare_files(self):
- """Test for RegistrationClient.prepare_files"""
+ """Test for RegistrationClient.prepare_files method."""
session_path = self.session_path.parent / next_num_folder(self.session_path.parent)
session_path_2 = session_path.parent / next_num_folder(session_path)
@@ -247,7 +247,7 @@ def test_prepare_files(self):
self.assertListEqual(V[session_path_2], [versions[-1]])
def test_check_protected(self):
- """Test for RegistrationClient.check_protected_files"""
+ """Test for RegistrationClient.check_protected_files method."""
session_path, eid = self.client.create_new_session(self.subject)
file_name = session_path.joinpath('wheel.timestamps.npy')