
Commit

UUID filenames (#100)
* Flag for uuid filenames on local filesystem
* record2path supports UUID in filename
* Minor fixes, tests, and docstrings
k1o0 authored Nov 1, 2023
1 parent 31bfe65 commit c94c75a
Showing 6 changed files with 119 additions and 25 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,15 @@
# Changelog
- ## [Latest](https://github.com/int-brain-lab/ONE/commits/main) [2.4.0]
+ ## [Latest](https://github.com/int-brain-lab/ONE/commits/main) [2.5.0]

### Added

- One.uuid_filenames property supports local files containing the dataset UUID

### Modified

- One._save_cache lock file timeout changed from 30 to 5 seconds

## [2.4.0]

### Added

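A note on the first changelog entry above: the snippet below is a minimal, hypothetical usage sketch of the new flag (the cache directory path is made up; the flag defaults to False, as the one/api.py changes below show).

from one.api import One

one = One(cache_dir='/data/ONE')  # hypothetical local cache directory
one.uuid_filenames = True  # files on disk carry the dataset UUID in their names

# With the flag set, a dataset saved as e.g. alf/probe00/spikes.times.npy is looked up as
# alf/probe00/spikes.times.<dataset-uuid>.npy when the local filesystem is checked.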
2 changes: 1 addition & 1 deletion one/__init__.py
@@ -1,2 +1,2 @@
"""The Open Neurophysiology Environment (ONE) API."""
- __version__ = '2.4.0'
+ __version__ = '2.5.0'
80 changes: 59 additions & 21 deletions one/api.py
@@ -44,6 +44,9 @@ class One(ConversionMixin):
'dataset', 'date_range', 'laboratory', 'number', 'projects', 'subject', 'task_protocol'
)

uuid_filenames = None
"""bool: whether datasets on disk have a UUID in their filename"""

def __init__(self, cache_dir=None, mode='auto', wildcards=True, tables_dir=None):
"""An API for searching and loading data on a local filesystem
@@ -72,6 +75,8 @@ def __init__(self, cache_dir=None, mode='auto', wildcards=True, tables_dir=None)
self.mode = mode
self.wildcards = wildcards # Flag indicating whether to use regex or wildcards
self.record_loaded = False
# assign property here as different instances may work on separate filesystems
self.uuid_filenames = False
# init the cache file
self._reset_cache()
self.load_cache()
@@ -177,7 +182,7 @@ def _save_cache(self, save_dir=None, force=False):
force : bool
If True, the cache is saved regardless of modification time.
"""
- TIMEOUT = 30  # Delete lock file this many seconds after creation/modification or waiting
+ TIMEOUT = 5  # Delete lock file this many seconds after creation/modification or waiting
lock_file = Path(self.cache_dir).joinpath('.cache.lock')
save_dir = Path(save_dir or self.cache_dir)
meta = self._cache['_meta']
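For readers unfamiliar with the lock file above: a rough sketch of the stale-lock pattern that the new 5-second TIMEOUT feeds into, reconstructed from this hunk and from test_save_cache below; it is not the library's actual implementation.

import time
from pathlib import Path

def _wait_or_remove_lock(lock_file: Path, timeout: float = 5):
    # Hypothetical helper name, for illustration only.
    # Sleep while another process holds the lock; once `timeout` seconds have
    # elapsed the lock is assumed stale and deleted so saving can proceed.
    start = time.time()
    while lock_file.exists():
        if time.time() - start > timeout:
            lock_file.unlink(missing_ok=True)
            break
        time.sleep(0.1)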
@@ -569,6 +574,8 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True):
# First go through datasets and check if file exists and hash matches
for i, rec in datasets.iterrows():
file = Path(self.cache_dir, *rec[['session_path', 'rel_path']])
if self.uuid_filenames:
file = alfiles.add_uuid_string(file, i[1] if isinstance(i, tuple) else i)
if file.exists():
# Check if there's a hash mismatch
# If so, add this index to list of datasets that need downloading
@@ -2138,31 +2145,61 @@ def _add_date(records):

def _download_datasets(self, dsets, **kwargs) -> List[Path]:
"""
Download a single or multitude of datasets if stored on AWS, otherwise calls
OneAlyx._download_dataset.

NB: This will not skip files that are already present. Use check_filesystem instead.

Parameters
----------
- dset : dict, str, pd.Series
-     A single or multitude of dataset dictionaries.
+ dset : dict, str, pandas.Series, pandas.DataFrame
+     A single or multitude of dataset dictionaries. For AWS downloads the input must be a
+     data frame.

Returns
-------
- pathlib.Path
-     A local file path or list of paths.
+ list of pathlib.Path
+     A list of local file paths.
"""
+ # determine whether to remove the UUID after download, this may be overridden by user
+ kwargs['keep_uuid'] = kwargs.get('keep_uuid', self.uuid_filenames)

# If all datasets exist on AWS, download from there.
try:
if not isinstance(dsets, pd.DataFrame):
raise TypeError('Input datasets must be a pandas data frame for AWS download.')
if 'exists_aws' in dsets and np.all(np.equal(dsets['exists_aws'].values, True)):
_logger.info('Downloading from AWS')
return self._download_aws(map(lambda x: x[1], dsets.iterrows()), **kwargs)
except Exception as ex:
_logger.debug(ex)
return self._download_dataset(dsets, **kwargs)

- def _download_aws(self, dsets, update_exists=True, **_) -> List[Path]:
+ def _download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[Path]:
"""
Download datasets from an AWS S3 instance using boto3.
Parameters
----------
dsets : list of pandas.Series
An iterable for datasets as a pandas Series.
update_exists : bool
If true, the 'exists_aws' field of the cache table is set to False for any missing
datasets.
keep_uuid : bool
If false, the dataset UUID is removed from the downloaded filename. If None, the
`uuid_filenames` attribute determines whether the UUID is kept (default is false).
Returns
-------
list of pathlib.Path
A list the length of `dsets` of downloaded dataset file paths. Missing datasets are
returned as None.
See Also
--------
one.remote.aws.s3_download_file - The AWS download function.
"""
# Download datasets from AWS
import one.remote.aws as aws
s3, bucket_name = aws.get_s3_from_alyx(self.alyx)
@@ -2191,8 +2228,9 @@ def _download_aws(self, dsets, update_exists=True, **_) -> List[Path]:
continue
source_path = PurePosixPath(record['data_repository_path'], record['relative_path'])
source_path = alfiles.add_uuid_string(source_path, uuid)
- local_path = alfiles.remove_uuid_string(
-     self.cache_dir.joinpath(dset['session_path'], dset['rel_path']))
+ local_path = self.cache_dir.joinpath(dset['session_path'], dset['rel_path'])
+ if keep_uuid is True or (keep_uuid is None and self.uuid_filenames is True):
+     local_path = alfiles.add_uuid_string(local_path, uuid)
local_path.parent.mkdir(exist_ok=True, parents=True)
out_files.append(aws.s3_download_file(
source_path, local_path, s3=s3, bucket_name=bucket_name, overwrite=update_exists))
@@ -2275,7 +2313,7 @@ def _download_dataset(self, dset, cache_dir=None, update_cache=True, **kwargs) -
Returns
-------
- pathlib.Path, list
+ list of pathlib.Path
A local file path or list of paths.
"""
cache_dir = cache_dir or self.cache_dir
@@ -2317,7 +2355,7 @@ def _tag_mismatched_file_record(self, url):
f'Failed to tag remote file record mismatch: {ex}\n'
'Please contact the database administrator.')

- def _download_file(self, url, target_dir, keep_uuid=False, file_size=None, hash=None):
+ def _download_file(self, url, target_dir, keep_uuid=None, file_size=None, hash=None):
"""
Downloads a single file or multitude of files from an HTTP webserver.
The webserver in question is set by the AlyxClient object.
@@ -2329,15 +2367,15 @@ def _download_file(self, url, target_dir, keep_uuid=False, file_size=None, hash=
target_dir : str, list
Absolute path of directory to download file to (including alf path).
keep_uuid : bool
- If true, the UUID is not removed from the file name (default is False).
+ If true, the UUID is not removed from the file name. See `uuid_filenames` property.
file_size : int, list
The expected file size or list of file sizes to compare with downloaded file.
hash : str, list
The expected file hash or list of file hashes to compare with downloaded file.
Returns
-------
- pathlib.Path
+ pathlib.Path or list of pathlib.Path
The file path of the downloaded file or files.
Example
@@ -2360,7 +2398,7 @@ def _download_file(self, url, target_dir, keep_uuid=False, file_size=None, hash=
self._check_hash_and_file_size_mismatch(*args)

# check if we are keeping the uuid on the list of file names
- if keep_uuid:
+ if keep_uuid is True or (keep_uuid is None and self.uuid_filenames):
return local_path

# remove uuids from list of file names
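To summarise the keep_uuid handling introduced across _download_datasets, _download_aws and _download_file above: an explicit keep_uuid argument wins, and None falls back to the instance's uuid_filenames flag. The helper below is purely illustrative (resolve_keep_uuid is not part of the library).

def resolve_keep_uuid(keep_uuid, uuid_filenames):
    # None means "no preference": defer to the One instance's uuid_filenames flag.
    return uuid_filenames if keep_uuid is None else keep_uuid

assert resolve_keep_uuid(None, True) is True    # uuid_filenames instances keep the UUID
assert resolve_keep_uuid(False, True) is False  # an explicit False still strips it
assert resolve_keep_uuid(True, False) is True   # an explicit True keeps it regardless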
3 changes: 3 additions & 0 deletions one/converters.py
@@ -340,6 +340,9 @@ def record2path(self, dataset) -> Optional[Path]:
assert isinstance(dataset, pd.Series) or len(dataset) == 1
session_path, rel_path = dataset[['session_path', 'rel_path']].to_numpy().flatten()
file = Path(self.cache_dir, session_path, rel_path)
if self.uuid_filenames:
i = dataset.name if isinstance(dataset, pd.Series) else dataset.index[0]
file = add_uuid_string(file, i[1] if isinstance(i, tuple) else i)
return file # files[0] if len(datasets) == 1 else files

@recurse
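The record2path change above relies on add_uuid_string to splice the dataset UUID into the filename. A small sketch of the expected naming, assuming add_uuid_string is importable from one.alf.files (as the alfiles alias in one/api.py suggests); the printed path mirrors the test_record2path assertion below.

from pathlib import PurePosixPath
from one.alf.files import add_uuid_string  # assumed module path

file = PurePosixPath('alf/probe00/_phy_spikes_subset.channels.npy')
uuid = '00c234a3-a4ff-4f97-a522-939d15528a45'
print(add_uuid_string(file, uuid))
# alf/probe00/_phy_spikes_subset.channels.00c234a3-a4ff-4f97-a522-939d15528a45.npy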
12 changes: 11 additions & 1 deletion one/tests/test_converters.py
@@ -264,13 +264,23 @@ def test_record2path(self):
alf_path = ('hoferlab/Subjects/SWC_043/2020-09-21/001/'
'alf/probe00/_phy_spikes_subset.channels.npy')
expected = Path(self.one.alyx.cache_dir).joinpath(*alf_path.split('/'))
- path = self.one.record2path(rec.loc[(self.eid, '00c234a3-a4ff-4f97-a522-939d15528a45')])
+ data_id = '00c234a3-a4ff-4f97-a522-939d15528a45'
+ path = self.one.record2path(rec.loc[(self.eid, data_id)])
self.assertIsInstance(path, Path)
self.assertEqual(expected, path)
# As pd.DataFrame
idx = rec.rel_path == 'alf/probe00/_phy_spikes_subset.channels.npy'
path = self.one.record2path(rec[idx])
self.assertEqual(expected, path)
# With UUID in file name
try:
self.one.uuid_filenames = True
expected = expected.with_suffix(f'.{data_id}.npy')
self.assertEqual(expected, self.one.record2path(rec[idx])) # as pd.DataFrame
self.assertEqual(expected, self.one.record2path(rec[idx].squeeze())) # as pd.Series
self.assertEqual(expected, self.one.record2path(rec[idx].droplevel(0))) # no eid
finally:
self.one.uuid_filenames = False

def test_eid2path(self):
"""Test for OneAlyx.eid2path"""
35 changes: 34 additions & 1 deletion one/tests/test_one.py
@@ -447,6 +447,16 @@ def test_check_filesystem(self):
# Attempt the same with the eid index missing
datasets = datasets.droplevel(0).drop('session_path', axis=1)
self.assertEqual(files, self.one._check_filesystem(datasets))
# Test with uuid_filenames as True
self.one.uuid_filenames = True
try:
for file, (uuid, _) in zip(files, datasets.iterrows()):
file.rename(file.with_suffix(f'.{uuid}{file.suffix}'))
files = self.one._check_filesystem(datasets)
self.assertTrue(all(files))
self.assertIn(datasets.index[0], files[0].name)
finally:
self.one.uuid_filenames = False

def test_load_dataset(self):
"""Test One.load_dataset"""
@@ -735,6 +745,17 @@ def test_save_cache(self):
raw_modified = One(cache_dir=tdir)._cache['_meta']['raw']['datasets']['date_modified']
expected = self.one._cache['_meta']['modified_time'].strftime('%Y-%m-%d %H:%M')
self.assertEqual(raw_modified, expected)
# Test file lock
t_now = time.time()
with mock.patch('one.api.time') as time_mock:
lock_file = Path(self.one.cache_dir).joinpath('.cache.lock')
lock_file.touch()
# We expect the process to sleep at first, then after skipping time,
# the stale lock file should be removed.
time_mock.time.side_effect = (t_now, t_now + 30)
self.one._save_cache(save_dir=tdir, force=True) # force flag ignores modified time
self.assertFalse(lock_file.exists(), 'failed to remove stale lock file')
time_mock.sleep.assert_called()

def test_update_cache_from_records(self):
"""Test One._update_cache_from_records"""
@@ -1525,6 +1546,14 @@ def test_download_aws(self):
mock.patch('one.remote.aws.s3_download_file', return_value=file) as method:
self.one._download_datasets(dsets)
self.assertEqual(len(dsets), method.call_count)
# Check output filename
_, local = method.call_args.args
self.assertTrue(local.as_posix().startswith(self.one.cache_dir.as_posix()))
self.assertTrue(local.as_posix().endswith(dsets.iloc[-1, -1]))
# Check keep_uuid = True
self.one._download_datasets(dsets, keep_uuid=True)
_, local = method.call_args.args
self.assertIn(dsets.iloc[-1].name, local.name)

# Test behaviour when dataset not remotely accessible
dsets = dsets[:1].copy()
@@ -1544,7 +1573,11 @@ def test_download_aws(self):
with mock.patch('one.remote.aws.get_s3_from_alyx', side_effect=RuntimeError), \
mock.patch.object(self.one, '_download_dataset') as mock_method:
self.one._download_datasets(dsets)
- mock_method.assert_called_with(dsets)
+ mock_method.assert_called_with(dsets, keep_uuid=False)
# Test type check (download_aws only works with data frames)
with mock.patch.object(self.one, '_download_dataset') as mock_method:
self.one._download_datasets(dsets.to_dict('records'))
mock_method.assert_called()

def test_tag_mismatched_file_record(self):
"""Test for OneAlyx._tag_mismatched_file_record.
