Skip to content

Commit

Permalink
Handle analyses specially during storage migration to avoid duplicating input files
Browse files Browse the repository at this point in the history
  • Loading branch information
davidfarkas committed Mar 8, 2018
1 parent df6eeeb commit 48162ad
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 73 deletions.
166 changes: 98 additions & 68 deletions bin/oneoffs/migrate_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@
log = logging.getLogger('migrate_storage')
log.setLevel(logging.INFO)

COLLECTIONS_PREFIXES = [('projects', 'files'),
('acquisitions', 'files'),
('analyses', 'files'),
('sessions', 'files'),
('sessions', 'subject.files'),
('collections', 'files')]


class MigrationError(Exception):
    """Raised when a file cannot be migrated to the new storage backend."""
    pass
Expand All @@ -44,16 +37,16 @@ def cleanup_empty_folders():
os.rmdir(_dirpath)


def get_collections_files():
def get_containers_files(containers_prefixes):
_files = []

for collection, prefix in COLLECTIONS_PREFIXES:
cursor = config.db.get_collection(collection).find({})
for container, prefix in containers_prefixes:
cursor = config.db.get_collection(container).find({})
for document in cursor:
for f in get_files_by_prefix(document, prefix):
f_dict = {
'collection_id': document.get('_id'),
'collection': collection,
'container_id': document.get('_id'),
'container': container,
'fileinfo': f,
'prefix': prefix
}
Expand All @@ -78,60 +71,97 @@ def get_gears_files():
return _files


def migrate_collections():
log.info('Migrate collection files...')

_files = get_collections_files()

for i, f in enumerate(_files):
try:
file_id = f['fileinfo'].get('_id', '')
if file_id:
file_path = util.path_from_uuid(file_id)
if not config.fs.isfile(file_path):
"""Copy file from legacy to new storage"""

log.debug('copy file between the legacy and new storage: %s' % file_path)

dst_dir = fs.path.dirname(file_path)
config.fs.makedirs(dst_dir, recreate=True)
fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path)
else:
"""generate uuuid, set the id field and copy the file"""
file_id = str(uuid.uuid4())
f_old_path = util.path_from_hash(f['fileinfo']['hash'])
f_new_path = util.path_from_uuid(file_id)
def migrate_file(f):
    """Migrate a single file from the legacy (hash-addressed) storage to the
    new (UUID-addressed) storage.

    Two cases are handled:
      * The file already has an ``_id`` (UUID): copy it to the same UUID path
        on the new storage if it is not there yet.
      * The file has no ``_id``: generate a UUID, copy the file from its
        hash-based legacy path to the new UUID path, and persist the new
        ``_id`` (and ``modified`` timestamp) on the owning document in mongo.

    :param dict f: file descriptor as produced by ``get_containers_files``,
        with keys ``container``, ``container_id``, ``prefix`` and ``fileinfo``.
    :raises MigrationError: if any step fails; the original exception is
        attached as the second argument.
    """
    try:
        file_id = f['fileinfo'].get('_id', '')
        if file_id:
            # File already carries a UUID; ensure it exists on the new storage.
            file_path = util.path_from_uuid(file_id)
            if not config.fs.isfile(file_path):
                # Copy file from legacy to new storage
                log.debug('copy file between the legacy and new storage: %s' % file_path)

                dst_dir = fs.path.dirname(file_path)
                config.fs.makedirs(dst_dir, recreate=True)
                fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path)
        else:
            # Generate a uuid, set the id field and copy the file
            file_id = str(uuid.uuid4())
            f_old_path = util.path_from_hash(f['fileinfo']['hash'])
            f_new_path = util.path_from_uuid(file_id)

            log.debug('copy file %s to %s' % (f_old_path, f_new_path))

            dst_dir = fs.path.dirname(f_new_path)
            config.fs.makedirs(dst_dir, recreate=True)
            fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path)

            update_set = {
                f['prefix'] + '.$.modified': datetime.datetime.utcnow(),
                f['prefix'] + '.$._id': file_id
            }
            log.debug('update file in mongo: %s' % update_set)
            # Update the file with the newly generated UUID; the name+hash
            # filter guards against the file changing under us mid-migration.
            updated_doc = config.db[f['container']].find_one_and_update(
                {'_id': f['container_id'],
                 f['prefix'] + '.name': f['fileinfo']['name'],
                 f['prefix'] + '.hash': f['fileinfo']['hash']},
                {'$set': update_set}
            )

            if not updated_doc:
                # The mongo document no longer matches (hash changed while we
                # migrated): remove the now-orphaned copy from the new storage.
                log.info('Probably the following file has been updated during the migration '
                         'and its hash is changed, cleaning up from the new filesystem')
                config.fs.remove(f_new_path)

    except Exception as e:
        log.exception(e)
        raise MigrationError('Wasn\'t able to migrate the \'%s\' '
                             'file in the \'%s\' container (container id: %s)' %
                             (f['fileinfo']['name'], f['container'], str(f['container_id'])), e)


def migrate_containers():
    """Migrate every container file, then analysis files.

    Analyses are handled in a second pass so that an analysis input which is
    a duplicate of an already-migrated container file (same hash) can simply
    reuse that file's UUID instead of copying the bytes again — this is what
    avoids input-file duplication on the new storage.

    :raises MigrationError: propagated from ``migrate_file`` on any failure.
    """
    # Single source of truth for the non-analysis container/prefix pairs
    # (previously this literal was duplicated for the refresh below).
    container_prefixes = [('projects', 'files'),
                          ('acquisitions', 'files'),
                          ('sessions', 'files'),
                          ('sessions', 'subject.files'),
                          ('collections', 'files')]

    log.info('Migrate container (projects, acquisitions, sessions, subject, collections) files...')

    container_files = get_containers_files(container_prefixes)

    for i, f in enumerate(container_files):
        migrate_file(f)
        show_progress(i + 1, len(container_files))

    log.info('Migrate analysis files...')
    # Refresh the list of container files: their `_id` fields were just set
    # during the migration pass above.
    container_files = get_containers_files(container_prefixes)
    analysis_files = get_containers_files([('analyses', 'files')])

    # Index migrated container files by hash once (keep the first occurrence,
    # matching the old first-match list scan) instead of rescanning the whole
    # list for every analysis file.
    migrated_by_hash = {}
    for cf in container_files:
        if cf['fileinfo'].get('_id') and cf['fileinfo']['hash'] not in migrated_by_hash:
            migrated_by_hash[cf['fileinfo']['hash']] = cf

    for i, f in enumerate(analysis_files):
        match = migrated_by_hash.get(f['fileinfo']['hash'])
        # The file is already migrated as a container file: reuse its UUID
        if match and not f['fileinfo'].get('_id'):
            update_set = {
                f['prefix'] + '.$.modified': match['fileinfo']['modified'],
                f['prefix'] + '.$._id': match['fileinfo']['_id']
            }
            log.debug('update file in mongo: %s' % update_set)
            # Point the analysis file at the already-migrated file's UUID
            config.db[f['container']].find_one_and_update(
                {'_id': f['container_id'],
                 f['prefix'] + '.name': f['fileinfo']['name'],
                 f['prefix'] + '.hash': f['fileinfo']['hash']},
                {'$set': update_set}
            )
        else:
            migrate_file(f)
        show_progress(i + 1, len(analysis_files))


def migrate_gears():
Expand Down Expand Up @@ -189,20 +219,20 @@ def migrate_storage():
"""

parser = argparse.ArgumentParser(prog='Migrate storage')
parser.add_argument('--collections', action='store_true', help='Migrate collections')
parser.add_argument('--containers', action='store_true', help='Migrate containers')
parser.add_argument('--gears', action='store_true', help='Migrate gears')
parser.add_argument('--delete-files', action='store_true', help='Delete files from legacy storage')


args = parser.parse_args()

try:
if not (args.collections or args.gears):
migrate_collections()
if not (args.containers or args.gears):
migrate_containers()
migrate_gears()

if args.collections:
migrate_collections()
if args.containers:
migrate_containers()

if args.gears:
migrate_gears()
Expand Down
10 changes: 5 additions & 5 deletions tests/integration_tests/python/test_migrate_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def files_to_migrate(data_builder, api_db, as_admin, randstr, file_form):
except:
pass

def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
def test_migrate_containers(files_to_migrate, as_admin, migrate_storage):
"""Testing collection migration"""

# get file storing by hash in legacy storage
Expand All @@ -194,7 +194,7 @@ def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
# download the file
assert as_admin.get(url_2, params={'ticket': ticket}).ok
# run the migration
migrate_storage.migrate_collections()
migrate_storage.migrate_containers()

# delete files from the legacy storage
config.legacy_fs.remove(file_path_1)
Expand All @@ -217,7 +217,7 @@ def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
# download the file
assert as_admin.get(url_2, params={'ticket': ticket}).ok

def test_migrate_collections_error(files_to_migrate, migrate_storage):
def test_migrate_containers_error(files_to_migrate, migrate_storage):
"""Testing that the migration script throws an exception if it couldn't migrate a file"""

# get file storing by hash in legacy storage
Expand All @@ -229,7 +229,7 @@ def test_migrate_collections_error(files_to_migrate, migrate_storage):
config.legacy_fs.remove(file_path_1)

with pytest.raises(migrate_storage.MigrationError):
migrate_storage.migrate_collections()
migrate_storage.migrate_containers()

# clean up
config.legacy_fs.remove(file_path_2)
Expand Down Expand Up @@ -296,7 +296,7 @@ def mocked(*args, **kwargs):
(_, file_name_2, url_2, file_path_2) = files_to_migrate[1]

# run the migration
migrate_storage.migrate_collections()
migrate_storage.migrate_containers()

file_1_id = api_db['projects'].find_one(
{'files.name': file_name_1}
Expand Down

0 comments on commit 48162ad

Please sign in to comment.