From a56f116aecf42cfaec037fc56fe8f30ed8d9211b Mon Sep 17 00:00:00 2001
From: David Farkas
Date: Thu, 1 Mar 2018 15:14:45 +0100
Subject: [PATCH] Handle analyses specially during storage migration to avoid
 duplicating input files

---
 bin/oneoffs/migrate_storage.py                | 166 +++++++++++-------
 .../python/test_migrate_storage.py            |  10 +-
 2 files changed, 103 insertions(+), 73 deletions(-)

diff --git a/bin/oneoffs/migrate_storage.py b/bin/oneoffs/migrate_storage.py
index 278fc48f4..0df7fed36 100644
--- a/bin/oneoffs/migrate_storage.py
+++ b/bin/oneoffs/migrate_storage.py
@@ -13,13 +13,6 @@
 log = logging.getLogger('migrate_storage')
 log.setLevel(logging.INFO)
 
-COLLECTIONS_PREFIXES = [('projects', 'files'),
-                        ('acquisitions', 'files'),
-                        ('analyses', 'files'),
-                        ('sessions', 'files'),
-                        ('sessions', 'subject.files'),
-                        ('collections', 'files')]
-
 
 class MigrationError(Exception):
     pass
@@ -44,16 +37,16 @@ def cleanup_empty_folders():
                 os.rmdir(_dirpath)
 
 
-def get_collections_files():
+def get_containers_files(containers_prefixes):
     _files = []
 
-    for collection, prefix in COLLECTIONS_PREFIXES:
-        cursor = config.db.get_collection(collection).find({})
+    for container, prefix in containers_prefixes:
+        cursor = config.db.get_collection(container).find({})
         for document in cursor:
             for f in get_files_by_prefix(document, prefix):
                 f_dict = {
-                    'collection_id': document.get('_id'),
-                    'collection': collection,
+                    'container_id': document.get('_id'),
+                    'container': container,
                     'fileinfo': f,
                     'prefix': prefix
                 }
@@ -78,60 +71,97 @@ def get_gears_files():
     return _files
 
 
-def migrate_collections():
-    log.info('Migrate collection files...')
-
-    _files = get_collections_files()
-
-    for i, f in enumerate(_files):
-        try:
-            file_id = f['fileinfo'].get('_id', '')
-            if file_id:
-                file_path = util.path_from_uuid(file_id)
-                if not config.fs.isfile(file_path):
-                    """Copy file from legacy to new storage"""
-
-                    log.debug('copy file between the legacy and new storage: %s' % file_path)
-
-                    dst_dir = fs.path.dirname(file_path)
-                    config.fs.makedirs(dst_dir, recreate=True)
-                    fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path)
-            else:
-                """generate uuuid, set the id field and copy the file"""
-                file_id = str(uuid.uuid4())
-                f_old_path = util.path_from_hash(f['fileinfo']['hash'])
-                f_new_path = util.path_from_uuid(file_id)
-
-                log.debug('copy file %s to %s' % (f_old_path, f_new_path))
-
-                dst_dir = fs.path.dirname(f_new_path)
-                config.fs.makedirs(dst_dir, recreate=True)
-                fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path)
-
-                update_set = {
-                    f['prefix'] + '.$.modified': datetime.datetime.utcnow(),
-                    f['prefix'] + '.$._id': file_id
-                }
-                log.debug('update file in mongo: %s' % update_set)
-                # Update the file with the newly generated UUID
-                updated_doc = config.db[f['collection']].find_one_and_update(
-                    {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name'],
-                     f['prefix'] + '.hash': f['fileinfo']['hash']},
-                    {'$set': update_set}
-                )
-
-                if not updated_doc:
-                    log.info('Probably the following file has been updated during the migration '
-                             'and its hash is changed, cleaning up from the new filesystem')
-                    config.fs.remove(f_new_path)
-
-            show_progress(i + 1, len(_files))
-
-        except Exception as e:
-            log.exception(e)
-            raise MigrationError('Wasn\'t able to migrate the \'%s\' '
-                                 'file in the \'%s\' collection (collection id: %s)' %
-                                 (f['fileinfo']['name'], f['collection'], str(f['collection_id'])), e)
+def migrate_file(f):
+    try:
+        file_id = f['fileinfo'].get('_id', '')
+        if file_id:
+            file_path = util.path_from_uuid(file_id)
+            if not config.fs.isfile(file_path):
+                # Copy the file from legacy to new storage
+                log.debug('copy file between the legacy and new storage: %s' % file_path)
+
+                dst_dir = fs.path.dirname(file_path)
+                config.fs.makedirs(dst_dir, recreate=True)
+                fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path)
+        else:
+            # Generate a uuid, set the _id field and copy the file
+            file_id = str(uuid.uuid4())
+            f_old_path = util.path_from_hash(f['fileinfo']['hash'])
+            f_new_path = util.path_from_uuid(file_id)
+
+            log.debug('copy file %s to %s' % (f_old_path, f_new_path))
+
+            dst_dir = fs.path.dirname(f_new_path)
+            config.fs.makedirs(dst_dir, recreate=True)
+            fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path)
+
+            update_set = {
+                f['prefix'] + '.$.modified': datetime.datetime.utcnow(),
+                f['prefix'] + '.$._id': file_id
+            }
+            log.debug('update file in mongo: %s' % update_set)
+            # Update the file with the newly generated UUID
+            updated_doc = config.db[f['container']].find_one_and_update(
+                {'_id': f['container_id'],
+                 f['prefix'] + '.name': f['fileinfo']['name'],
+                 f['prefix'] + '.hash': f['fileinfo']['hash']},
+                {'$set': update_set}
+            )
+
+            if not updated_doc:
+                log.info('The file has probably been updated during the migration '
+                         'and its hash changed, cleaning it up from the new filesystem')
+                config.fs.remove(f_new_path)
+
+    except Exception as e:
+        log.exception(e)
+        raise MigrationError('Wasn\'t able to migrate the \'%s\' '
+                             'file in the \'%s\' container (container id: %s)' %
+                             (f['fileinfo']['name'], f['container'], str(f['container_id'])), e)
+
+
+def migrate_containers():
+    log.info('Migrate container (projects, acquisitions, sessions, subject, collections) files...')
+
+    container_files = get_containers_files([('projects', 'files'),
+                                            ('acquisitions', 'files'),
+                                            ('sessions', 'files'),
+                                            ('sessions', 'subject.files'),
+                                            ('collections', 'files')])
+
+    for i, f in enumerate(container_files):
+        migrate_file(f)
+        show_progress(i + 1, len(container_files))
+
+    log.info('Migrate analysis files...')
+    # Refresh the list of container files
+    container_files = get_containers_files([('projects', 'files'),
+                                            ('acquisitions', 'files'),
+                                            ('sessions', 'files'),
+                                            ('sessions', 'subject.files'),
+                                            ('collections', 'files')])
+    analysis_files = get_containers_files([('analyses', 'files')])
+
+    for i, f in enumerate(analysis_files):
+        match = [cf for cf in container_files if cf['fileinfo']['hash'] == f['fileinfo']['hash'] and cf['fileinfo'].get('_id')]
+        # The file is already migrated
+        if len(match) > 0 and not f['fileinfo'].get('_id'):
+            update_set = {
+                f['prefix'] + '.$.modified': match[0]['fileinfo']['modified'],
+                f['prefix'] + '.$._id': match[0]['fileinfo']['_id']
+            }
+            log.debug('update file in mongo: %s' % update_set)
+            # Reuse the UUID of the already migrated container file
+            config.db[f['container']].find_one_and_update(
+                {'_id': f['container_id'],
+                 f['prefix'] + '.name': f['fileinfo']['name'],
+                 f['prefix'] + '.hash': f['fileinfo']['hash']},
+                {'$set': update_set}
+            )
+        else:
+            migrate_file(f)
+        show_progress(i + 1, len(analysis_files))
 
 
 def migrate_gears():
@@ -189,7 +219,7 @@ def migrate_storage():
     """
 
     parser = argparse.ArgumentParser(prog='Migrate storage')
-    parser.add_argument('--collections', action='store_true', help='Migrate collections')
+    parser.add_argument('--containers', action='store_true', help='Migrate containers')
     parser.add_argument('--gears', action='store_true', help='Migrate gears')
     parser.add_argument('--delete-files', action='store_true', help='Delete files from legacy storage')
 
@@ -197,12 +227,12 @@ def migrate_storage():
     args = parser.parse_args()
 
     try:
-        if not (args.collections or args.gears):
-            migrate_collections()
+        if not (args.containers or args.gears):
+            migrate_containers()
             migrate_gears()
 
-        if args.collections:
-            migrate_collections()
+        if args.containers:
+            migrate_containers()
 
         if args.gears:
             migrate_gears()
diff --git a/tests/integration_tests/python/test_migrate_storage.py b/tests/integration_tests/python/test_migrate_storage.py
index 764d932bf..5cc179984 100644
--- a/tests/integration_tests/python/test_migrate_storage.py
+++ b/tests/integration_tests/python/test_migrate_storage.py
@@ -170,7 +170,7 @@ def files_to_migrate(data_builder, api_db, as_admin, randstr, file_form):
     except:
         pass
 
-def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
+def test_migrate_containers(files_to_migrate, as_admin, migrate_storage):
     """Testing collection migration"""
 
     # get file storing by hash in legacy storage
@@ -194,7 +194,7 @@ def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
     # download the file
     assert as_admin.get(url_2, params={'ticket': ticket}).ok
     # run the migration
-    migrate_storage.migrate_collections()
+    migrate_storage.migrate_containers()
 
     # delete files from the legacy storage
     config.legacy_fs.remove(file_path_1)
@@ -217,7 +217,7 @@ def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
     # download the file
     assert as_admin.get(url_2, params={'ticket': ticket}).ok
 
-def test_migrate_collections_error(files_to_migrate, migrate_storage):
+def test_migrate_containers_error(files_to_migrate, migrate_storage):
     """Testing that the migration script throws an exception if it couldn't migrate a file"""
 
     # get file storing by hash in legacy storage
@@ -229,7 +229,7 @@ def test_migrate_collections_error(files_to_migrate, migrate_storage):
     config.legacy_fs.remove(file_path_1)
 
     with pytest.raises(migrate_storage.MigrationError):
-        migrate_storage.migrate_collections()
+        migrate_storage.migrate_containers()
 
     # clean up
     config.legacy_fs.remove(file_path_2)
@@ -296,7 +296,7 @@ def mocked(*args, **kwargs):
     (_, file_name_2, url_2, file_path_2) = files_to_migrate[1]
 
     # run the migration
-    migrate_storage.migrate_collections()
+    migrate_storage.migrate_containers()
 
     file_1_id = api_db['projects'].find_one(
         {'files.name': file_name_1}
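
A note on the deduplication step in migrate_containers() above: an analysis file
whose content hash matches an already-migrated container file reuses that file's
existing UUID instead of copying the blob a second time. Below is a minimal,
standalone sketch of just that matching step; find_migrated_copy and the sample
records are illustrative names assuming dicts shaped like the output of
get_containers_files(), not code from the patch itself.

    # Sketch: find an already-migrated container file with the same hash, if any.
    # Records mirror the shape produced by get_containers_files():
    #   {'fileinfo': {'hash': ..., '_id': ..., 'modified': ...}, ...}
    def find_migrated_copy(analysis_file, container_files):
        for cf in container_files:
            if cf['fileinfo']['hash'] == analysis_file['fileinfo']['hash'] and cf['fileinfo'].get('_id'):
                return cf
        return None

    # Example: this analysis file would reuse UUID 'aaaa-bbbb' instead of getting
    # a fresh UUID and a duplicate copy of the blob.
    containers = [{'fileinfo': {'hash': 'v0-sha384-deadbeef', '_id': 'aaaa-bbbb', 'modified': 'yesterday'}}]
    analysis = {'fileinfo': {'hash': 'v0-sha384-deadbeef', 'name': 'input.nii.gz'}}
    assert find_migrated_copy(analysis, containers) is containers[0]

The linear scan over container_files makes the analysis pass roughly quadratic;
for a one-off migration script that is a reasonable trade-off, though a dict
keyed by hash would make each lookup constant-time.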