From 270b8b1b917bceeb9d86e99da8784cd4ff34fa8b Mon Sep 17 00:00:00 2001
From: David Farkas
Date: Wed, 13 Dec 2017 16:37:13 +0100
Subject: [PATCH] Create placeholders for gear files and migrate them

---
 api/jobs/handlers.py               |   1 -
 bin/oneoffs/create_placeholders.py |  53 +++++--
 bin/oneoffs/remove_cas.py          | 238 ++++++++++++++++++++---
 3 files changed, 209 insertions(+), 83 deletions(-)

diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py
index 315af99a9..48c761afa 100644
--- a/api/jobs/handlers.py
+++ b/api/jobs/handlers.py
@@ -3,7 +3,6 @@
 """
 import bson
 import copy
-import os
 import StringIO
 from jsonschema import ValidationError
 from urlparse import urlparse
diff --git a/bin/oneoffs/create_placeholders.py b/bin/oneoffs/create_placeholders.py
index d4d9723ed..534ea88c3 100755
--- a/bin/oneoffs/create_placeholders.py
+++ b/bin/oneoffs/create_placeholders.py
@@ -15,10 +15,16 @@ def get_files_by_prefix(document, prefix):
     return document
 
 
-def create_placeholders():
-    """
-    Create placeholder files to help testing a system using sanitized customer DBs without the corresponding data files.
-    """
+def create_placeholder_file(f_path, extra_content):
+    target_dir = os.path.dirname(f_path)
+    if not os.path.exists(target_dir):
+        os.makedirs(target_dir)
+    with open(f_path, 'w') as placeholder:
+        placeholder.write('%s %s' % (f_path, extra_content))
+
+
+def placeholders_for_collections():
+    log.info('Create placeholders for collections')
     COLLECTIONS_PREFIXES = [('projects', 'files'),
                             ('acquisitions', 'files'),
                             ('analyses', 'files'),
@@ -43,19 +49,40 @@ def create_placeholders():
     base = config.get_item('persistent', 'data_path')
     for i, f in enumerate(_files):
         f_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash']))
-
-        target_dir = os.path.dirname(f_path)
-        if not os.path.exists(target_dir):
-            os.makedirs(target_dir)
-        with open(f_path, 'w') as placeholder:
-            placeholder.write('%s %s' % (f_path, f['fileinfo']['size']))
+        create_placeholder_file(f_path, f['fileinfo']['size'])
 
         # Show progress
-        if i % (len(_files) / 10) == 0:
+        if i % (len(_files) / 10 + 1) == 0:
             log.info('Processed %s files of total %s files ...' % (i, len(_files)))
 
 
-if __name__ == '__main__':
-    create_placeholders()
+def placeholders_for_gears():
+    log.info('Create placeholders for gears')
+    cursor = config.db.get_collection('gears').find({})
+    _files = []
+    for document in cursor:
+        if document['exchange']['git-commit'] == 'local':
+            f_dict = {
+                'gear_id': document['_id'],
+                'gear_name': document['gear']['name'],
+                'exchange': document['exchange']
+            }
+            _files.append(f_dict)
+
+    base = config.get_item('persistent', 'data_path')
+    for i, f in enumerate(_files):
+        f_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-')
+        f_path = os.path.join(base, util.path_from_hash(f_hash))
+        create_placeholder_file(f_path, f['gear_name'])
+
+        # Show progress
+        if i % (len(_files) / 10 + 1) == 0:
+            log.info('Processed %s gear files of total %s files ...' % (i, len(_files)))
+
+
+if __name__ == '__main__':
+    """
+    Create placeholder files to help test a system using sanitized customer DBs without the corresponding data files.
+ """ + placeholders_for_collections() + placeholders_for_gears() diff --git a/bin/oneoffs/remove_cas.py b/bin/oneoffs/remove_cas.py index a8906bdb4..2cbb852d4 100755 --- a/bin/oneoffs/remove_cas.py +++ b/bin/oneoffs/remove_cas.py @@ -12,9 +12,11 @@ log = logging.getLogger('remove_cas') log.setLevel(logging.INFO) + class MigrationError(Exception): pass + def get_files_by_prefix(document, prefix): for key in prefix.split('.'): document = document.get(key, {}) @@ -23,31 +25,32 @@ def get_files_by_prefix(document, prefix): def copy_file(path, target_path): target_dir = os.path.dirname(target_path) + if not os.path.exists(target_dir): os.makedirs(target_dir) shutil.copy(path, target_path) +def show_progress(current_index, total_files): + if current_index % (total_files / 10 + 1) == 0: + log.info('Processed %s files of total %s files ...' % (current_index, total_files)) + + def cleanup_empty_folders(): - # Cleanup the empty folders log.info('Cleanup empty folders') + for _dirpath, _, _ in os.walk(config.get_item('persistent', 'data_path'), topdown=False): if not (os.listdir(_dirpath) or config.get_item('persistent', 'data_path') == _dirpath): os.rmdir(_dirpath) -def remove_cas(): - """ - Remove CAS logic, generate UUID for the files and rename them on the filesystem, make a copy of the file if more - than one container using the same hash. - """ +def get_collections_files_hashes(): COLLECTIONS_PREFIXES = [('projects', 'files'), ('acquisitions', 'files'), ('analyses', 'files'), ('sessions', 'files'), ('sessions', 'subject.files'), ('collections', 'files')] - _hashes = [] _files = [] @@ -55,10 +58,6 @@ def remove_cas(): cursor = config.db.get_collection(collection).find({}) for document in cursor: for f in get_files_by_prefix(document, prefix): - u = f.get('_id', '') - if u: - continue - _hashes.append(f.get('hash', '')) f_dict = { 'collection_id': document.get('_id'), @@ -68,71 +67,172 @@ def remove_cas(): } _files.append(f_dict) + return _files, _hashes + + +def get_gears_files(): + cursor = config.db.get_collection('gears').find({}) + _files = [] + + for document in cursor: + if document['exchange']['git-commit'] == 'local': + f_dict = { + 'gear_id': document['_id'], + 'gear_name': document['gear']['name'], + 'exchange': document['exchange'] + } + _files.append(f_dict) + + return _files + + +def migrate_collections(): + log.info('Migrate collections...') + + _files, _hashes = get_collections_files_hashes() counter = Counter(_hashes) + base = config.get_item('persistent', 'data_path') + + for i, f in enumerate(_files): + if f['fileinfo'].get('_id', ''): + counter[f['fileinfo']['hash']] -= 1 + continue + try: + f_uuid = str(uuid.uuid4()) + f_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash'])) + log.debug('copy file %s to %s' % (f_path, util.path_from_uuid(f_uuid))) + copy_file(f_path, files.get_file_abs_path(f_uuid)) + + update_set = { + f['prefix'] + '.$.modified': datetime.datetime.utcnow(), + f['prefix'] + '.$._id': f_uuid + } + log.debug('update file in mongo: %s' % update_set) + # Update the file with the newly generated UUID + config.db[f['collection']].find_one_and_update( + {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']}, + {'$set': update_set} + ) + + # Decrease the count of the current hash, so we will know when we can remove the original file + counter[f['fileinfo']['hash']] -= 1 + + if counter[f['fileinfo']['hash']] == 0: + log.debug('remove old file: %s' % f_path) + os.remove(f_path) + + show_progress(i+1, len(_files)) + except Exception 
+            log.exception(e)
+            raise MigrationError('Wasn\'t able to migrate the \'%s\' '
+                                 'file in the \'%s\' collection (collection id: %s)' %
+                                 (f['fileinfo']['name'], f['collection'], str(f['collection_id'])), e)
+
+
+def rollback_collections():
+    log.info('Rollback collections...')
+
+    _files, _ = get_collections_files_hashes()
+    base = config.get_item('persistent', 'data_path')
+
+    for i, f in enumerate(_files):
+        if f['fileinfo'].get('_id', ''):
+            hash_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash']))
+            uuid_path = files.get_file_abs_path(f['fileinfo']['_id'])
+            if os.path.exists(hash_path) and os.path.exists(uuid_path):
+                os.remove(uuid_path)
+            elif os.path.exists(uuid_path):
+                copy_file(uuid_path, hash_path)
+                os.remove(uuid_path)
+
+            config.db[f['collection']].find_one_and_update(
+                {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']},
+                {'$unset': {f['prefix'] + '.$._id': ''}}
+            )
+
+        show_progress(i + 1, len(_files))
+
+
+def migrate_gears():
+    log.info('Migrate gears...')
+
+    _files = get_gears_files()
+    base = config.get_item('persistent', 'data_path')
+
+    for i, f in enumerate(_files):
+        if f['exchange'].get('rootfs-id', ''):
+            continue
+        try:
+            f_uuid = str(uuid.uuid4())
+            f_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-')
+            f_path = os.path.join(base, util.path_from_hash(f_hash))
+            log.debug('copy file %s to %s' % (f_path, util.path_from_uuid(f_uuid)))
+            copy_file(f_path, files.get_file_abs_path(f_uuid))
+
+            update_set = {
+                'modified': datetime.datetime.utcnow(),
+                'exchange.rootfs-id': f_uuid
+            }
+
+            log.debug('update file in mongo: %s' % update_set)
+            # Update the gear with the newly generated UUID
+            config.db['gears'].find_one_and_update(
+                {'_id': f['gear_id']},
+                {'$set': update_set}
+            )
+
+            log.debug('remove old file: %s' % f_path)
+            os.remove(f_path)
+
+            show_progress(i + 1, len(_files))
+        except Exception as e:
+            log.exception(e)
+            raise MigrationError('Wasn\'t able to migrate the \'%s\' gear (gear id: %s)' %
+                                 (f['gear_name'], f['gear_id']), e)
+
+
+def rollback_gears():
+    log.info('Rollback gears...')
+
+    _files = get_gears_files()
+    base = config.get_item('persistent', 'data_path')
+
+    for i, f in enumerate(_files):
+        if f['exchange'].get('rootfs-id', ''):
+            f_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-')
+            hash_path = os.path.join(base, util.path_from_hash(f_hash))
+            uuid_path = files.get_file_abs_path(f['exchange']['rootfs-id'])
+            if os.path.exists(hash_path) and os.path.exists(uuid_path):
+                os.remove(uuid_path)
+            elif os.path.exists(uuid_path):
+                copy_file(uuid_path, hash_path)
+                os.remove(uuid_path)
+
+            config.db['gears'].find_one_and_update(
+                {'_id': f['gear_id']},
+                {'$unset': {'exchange.rootfs-id': ''}}
+            )
+
+        show_progress(i + 1, len(_files))
+
+
+def remove_cas():
+    """
+    Remove CAS logic, generate UUID for the files and rename them on the filesystem, make a copy of the file if more
+    than one container uses the same hash.
+ """ try: - base = config.get_item('persistent', 'data_path') - for i, f in enumerate(_files): - try: - f_uuid = str(uuid.uuid4()) - f['_id'] = f_uuid - f_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash'])) - log.debug('copy file %s to %s' % (f_path, util.path_from_uuid(f_uuid))) - copy_file(f_path, files.get_file_abs_path(f_uuid)) - - update_set = { - f['prefix'] + '.$.modified': datetime.datetime.utcnow(), - f['prefix'] + '.$._id': f_uuid - } - log.debug('update file in mongo: %s' % update_set) - # Update the file with the newly generated UUID - config.db[f['collection']].find_one_and_update( - {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']}, - {'$set': update_set} - ) - - # Decrease the count of the current hash, so we will know when we can remove the original file - counter[f['fileinfo']['hash']] -= 1 - - if counter[f['fileinfo']['hash']] == 0: - log.debug('remove old file: %s' % f_path) - os.remove(f_path) - - # Show progress - if i % (len(_files) / 10) == 0: - log.info('Processed %s files of total %s files ...' % (i, len(_files))) - - except Exception as e: - log.exception(e) - raise MigrationError('Wasn\'t able to migrate the \'%s\' ' - 'file in the \'%s\' collection (collection id: %s)' % - (f['fileinfo']['name'], f['collection'], str(f['collection_id'])), e) + migrate_collections() + migrate_gears() except MigrationError as e: log.exception(e) - log.info('Rollback...') - base = config.get_item('persistent', 'data_path') - for i, f in enumerate(_files): - if f.get('_id', ''): - hash_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash'])) - uuid_path = files.get_file_abs_path(f['_id']) - if os.path.exists(hash_path) and os.path.exists(uuid_path): - os.remove(uuid_path) - elif os.path.exists(uuid_path): - copy_file(uuid_path, hash_path) - os.remove(uuid_path) - config.db[f['collection']].find_one_and_update( - {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']}, - {'$unset': {f['prefix'] + '.$._id': ''}} - ) - # Show progress - if i % (len(_files) / 10) == 0: - log.info('Processed %s files of total %s files ...' % (i, len(_files))) - cleanup_empty_folders() + rollback_collections() + rollback_gears() exit(1) - - cleanup_empty_folders() + finally: + cleanup_empty_folders() if __name__ == '__main__': remove_cas() -