From b884bc3745f77d6262125245cd29e203a7c0e588 Mon Sep 17 00:00:00 2001
From: Megan Henning
Date: Tue, 20 Feb 2018 15:52:46 -0600
Subject: [PATCH 1/5] Add handling of replaced files

---
 api/jobs/rules.py | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/api/jobs/rules.py b/api/jobs/rules.py
index b111baf59..4fa4335e3 100644
--- a/api/jobs/rules.py
+++ b/api/jobs/rules.py
@@ -208,13 +208,18 @@ def create_potential_jobs(db, container, container_type, file_):
 
     return potential_jobs
 
-def create_jobs(db, container_before, container_after, container_type):
+def create_jobs(db, container_before, container_after, container_type, replaced_files=None):
     """
     Given a before and after set of file attributes, enqueue a list of jobs that would only be possible
     after the changes.
     Returns the algorithm names that were queued.
     """
 
+    # A list of FileContainerReferences that have been completely replaced.
+    # Jobs with these as inputs should be enqueued even if they appear in the jobs_before list.
+    if not replaced_files:
+        replaced_files = []
+
     jobs_before, jobs_after, potential_jobs = [], [], []
 
     files_before = container_before.get('files', [])
@@ -229,13 +234,18 @@ def create_jobs(db, container_before, container_after, container_type):
     # Using a uniqueness constraint, create a list of the set difference of jobs_after \ jobs_before
     # (members of jobs_after that are not in jobs_before)
     for ja in jobs_after:
-        new_job = True
-        for jb in jobs_before:
-            if ja['job'].intention_equals(jb['job']):
-                new_job = False
-                break # this job matched in both before and after, ignore
-        if new_job:
+
+        if set(replaced_files).intersection(set(ja['job'].inputs.itervalues())):
+            # one of the replaced files is an input
             potential_jobs.append(ja)
+        else:
+            should_enqueue_job = True
+            for jb in jobs_before:
+                if ja['job'].intention_equals(jb['job']):
+                    should_enqueue_job = False
+                    break # this job matched in both before and after, ignore
+            if should_enqueue_job:
+                potential_jobs.append(ja)
 
     spawned_jobs = []
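
The intersection test above relies on the replaced-file references hashing and comparing equal to the references stored in each job's inputs map. A minimal sketch of the intended set logic, using tuples as hypothetical stand-ins for the real FileContainerReference objects (illustration only, not code from this patch):

    # Hypothetical stand-ins for FileContainerReference objects, which
    # identify a file by container type, container id, and file name.
    job_inputs = {'dicom': ('acquisition', 'a1', 'scan.dcm'),
                  'template': ('project', 'p1', 'template.nii')}
    replaced_files = [('acquisition', 'a1', 'scan.dcm')]

    # Mirrors the patch: a job whose inputs include a replaced file is
    # enqueued even if an identical job existed before the upload.
    if set(replaced_files).intersection(job_inputs.values()):
        print('enqueue: one of the replaced files is an input')
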
From 4e3b3a6efd83268b1c3e021cbfabf1f57084d59f Mon Sep 17 00:00:00 2001
From: Megan Henning
Date: Wed, 21 Feb 2018 16:19:25 -0600
Subject: [PATCH 2/5] Save, Ignore and Replace files accurately

---
 api/dao/hierarchy.py | 65 ++++++++++++++++++++++++--------------------
 api/placer.py        | 50 +++++++++++++++++-----------------
 2 files changed, 60 insertions(+), 55 deletions(-)

diff --git a/api/dao/hierarchy.py b/api/dao/hierarchy.py
index e3bce623a..cc5183383 100644
--- a/api/dao/hierarchy.py
+++ b/api/dao/hierarchy.py
@@ -229,47 +229,45 @@ def upsert_fileinfo(cont_name, _id, fileinfo):
     _id = bson.ObjectId(_id)
 
     container_before = config.db[cont_name].find_one({'_id': _id})
-    container_after, file_before = None, None
+    container_after = None
+    saved_state = 'saved'
+
+    # Look to see if a file with the same name already exists in the container
     for f in container_before.get('files',[]):
-        # Fine file in result and set to file_after
+
+        # File already exists, respond accordingly
        if f['name'] == fileinfo['name']:
+
+            # If the existing file is deleted, always replace (but this is not considered a "replaced" saved state)
+            # This creates a gap in the delete functionality, i.e. this file cannot be restored from this point on.
             if 'deleted' in f:
-                # Ugly hack: remove already existing file that has the 'deleted' tag
-                # This creates a gap in the delete functionality, ie. this file cannot be restored from this point on.
-                # Note that the previous file in storage will be unreferenced from the DB (unless CAS edge case...)
-                config.db[cont_name].find_one_and_update(
-                    {'_id': _id, 'files.name': fileinfo['name']},
-                    {'$pull': {'files': {'name': fileinfo['name']}}}
-                )
+                remove_file(cont_name, _id, fileinfo['name'])
+                saved_state = 'saved'
+
+            # Files from a failed job should never replace existing files that are "accepted" (unless they are deleted)
+            elif fileinfo.get('from_failed_job') and not f.get('from_failed_job'):
+                saved_state = 'ignored'
+
+            # The file object is different from the existing one, remove the existing file and replace it
+            elif f.get('hash') != fileinfo['hash']:
+                remove_file(cont_name, _id, fileinfo['name'])
+                saved_state = 'replaced'
+
+            # The hash was the same, ignore
             else:
-                file_before = f
+                saved_state = 'ignored'
+
             break
 
-    if file_before is None:
+    if saved_state != 'ignored':
         fileinfo['created'] = fileinfo['modified']
         container_after = add_fileinfo(cont_name, _id, fileinfo)
-    else:
-        container_after = update_fileinfo(cont_name, _id, fileinfo)
 
-    return container_before, container_after
+    return container_before, container_after, saved_state
 
-def update_fileinfo(cont_name, _id, fileinfo):
-    if fileinfo.get('size') is not None:
-        if type(fileinfo['size']) != int:
-            log.warn('Fileinfo passed with non-integer size')
-            fileinfo['size'] = int(fileinfo['size'])
-
-    update_set = {'files.$.modified': datetime.datetime.utcnow()}
-    # in this method, we are overriding an existing file.
-    # update_set allows to update all the fileinfo like size, hash, etc.
-    for k,v in fileinfo.iteritems():
-        update_set['files.$.' + k] = v
-    return config.db[cont_name].find_one_and_update(
-        {'_id': _id, 'files.name': fileinfo['name']},
-        {'$set': update_set},
-        return_document=pymongo.collection.ReturnDocument.AFTER
-    )
 
 def add_fileinfo(cont_name, _id, fileinfo):
     if fileinfo.get('size') is not None:
@@ -283,6 +281,13 @@ def add_fileinfo(cont_name, _id, fileinfo):
         return_document=pymongo.collection.ReturnDocument.AFTER
     )
 
+def remove_file(cont_name, _id, filename):
+    config.db[cont_name].find_one_and_update(
+        {'_id': _id, 'files.name': filename},
+        {'$pull': {'files': {'name': filename}}}
+    )
+
+
 def _group_id_fuzzy_match(group_id, project_label):
     existing_group_ids = [g['_id'] for g in config.db.groups.find(None, ['_id'])]
     if group_id.lower() in existing_group_ids:
diff --git a/api/placer.py b/api/placer.py
index 0c642d9fc..235282033 100644
--- a/api/placer.py
+++ b/api/placer.py
@@ -44,6 +44,9 @@ def __init__(self, container_type, container, id_, metadata, timestamp, origin,
         # A list of files that have been saved via save_file() usually returned by finalize()
         self.saved = []
 
+        # A list of files that have been ignored by save_file() because a file with the same name and hash already existed
+        self.ignored = []
+
 
     def check(self):
         """
@@ -91,11 +94,24 @@ def save_file(self, field=None, file_attrs=None):
 
         # Update the DB
         if file_attrs is not None:
-            container_before, self.container = hierarchy.upsert_fileinfo(self.container_type, self.id_, file_attrs)
+            container_before, self.container, saved_state = hierarchy.upsert_fileinfo(self.container_type, self.id_, file_attrs)
+
+            # If this file was ignored because an existing file with the same name and hash existed on this container,
+            # add the file to the ignored list and move on
+            if saved_state == 'ignored':
+                self.ignored.append(file_attrs)
+
+            else:
+                self.saved.append(file_attrs)
+
+                # create_jobs handles files that have been replaced differently
+                replaced_files = []
+                if saved_state == 'replaced':
+                    replaced_files.append(containerutil.FileReference(self.container_type, self.id_, file_attrs['name']))
+
+                rules.create_jobs(config.db, container_before, self.container, self.container_type, replaced_files=replaced_files)
 
-            # Queue any jobs as a result of this upload, uploading to a gear will not make jobs though
-            if self.container_type != 'gear':
-                rules.create_jobs(config.db, container_before, self.container, self.container_type)
 
     def recalc_session_compliance(self):
         if self.container_type in ['session', 'acquisition'] and self.id_:
@@ -121,7 +137,7 @@ def process_file_field(self, field, file_attrs):
             if self.metadata:
                 file_attrs.update(self.metadata)
             self.save_file(field, file_attrs)
-            self.saved.append(file_attrs)
+
 
     def finalize(self):
         self.recalc_session_compliance()
@@ -181,21 +197,11 @@ def process_file_field(self, field, file_attrs):
                 r_metadata = target['metadata']
                 file_attrs.update(r_metadata)
 
-        if container.level != 'subject':
-            self.container_type = container.level
-            self.id_ = container.id_
-            self.container = container.container
-            self.save_file(field, file_attrs)
-        else:
-            if field is not None:
-                files.move_form_file_field_into_cas(field)
-            if file_attrs is not None:
-                container.upsert_file(file_attrs)
-
-            # # Queue any jobs as a result of this upload
-            # rules.create_jobs(config.db, self.container, self.container_type, info)
-
-        self.saved.append(file_attrs)
+        self.container_type = container.level
+        self.id_ = container.id_
+        self.container = container.container
+        self.save_file(field, file_attrs)
 
     def finalize(self):
         # Check that there is at least one file being uploaded
@@ -294,7 +300,6 @@ def process_file_field(self, field, file_attrs):
             file_attrs['from_failed_job'] = True
 
         self.save_file(field, file_attrs)
-        self.saved.append(file_attrs)
 
     def finalize(self):
         if self.metadata is not None:
@@ -305,7 +310,6 @@ def finalize(self):
             for file_md in file_mds:
                 if file_md['name'] not in saved_file_names:
                     self.save_file(None, file_md) # save file_attrs update only
-                    self.saved.append(file_md)
 
             # Remove file metadata as it was already updated in process_file_field
             for k in self.metadata.keys():
@@ -653,7 +657,6 @@ def check(self):
 
     def process_file_field(self, field, file_attrs):
         self.save_file(field)
-        self.saved.append(file_attrs)
 
     def finalize(self):
         # we are going to merge the "hard" infos from the processed upload
@@ -689,7 +692,6 @@ def process_file_field(self, field, file_attrs):
         file_attrs['output'] = True
         file_attrs['created'] = file_attrs['modified']
         self.save_file(field)
-        self.saved.append(file_attrs)
 
     def finalize(self):
         # Search the sessions table for analysis, replace file field
@@ -720,9 +722,7 @@ def process_file_field(self, field, file_attrs):
         self.metadata.update({'exchange': {'rootfs-hash': proper_hash,
                                            'git-commit': 'local',
                                            'rootfs-url': 'INVALID'}})
-        # self.metadata['hash'] = file_attrs.get('hash')
         self.save_file(field)
-        self.saved.append(file_attrs)
         self.saved.append(self.metadata)
 
     def finalize(self):
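
With this patch, upsert_fileinfo reports one of three outcomes ('saved', 'ignored', 'replaced') and the placer branches on it. A condensed sketch of the flow, assuming the imports shown below (a paraphrase of Placer.save_file(), not the method itself):

    from api import config
    from api.dao import containerutil, hierarchy
    from api.jobs import rules

    def save_one_file(placer, cont_type, cont_id, file_attrs):
        # Paraphrase of Placer.save_file() after this patch; the harness is hypothetical.
        before, after, saved_state = hierarchy.upsert_fileinfo(cont_type, cont_id, file_attrs)
        if saved_state == 'ignored':
            placer.ignored.append(file_attrs)  # same name and hash, or a file from a failed job
        else:
            placer.saved.append(file_attrs)    # 'saved' (new file) or 'replaced'
            replaced_files = []
            if saved_state == 'replaced':
                replaced_files.append(containerutil.FileReference(cont_type, cont_id, file_attrs['name']))
            rules.create_jobs(config.db, before, after, cont_type, replaced_files=replaced_files)
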
From f75c39b4f3d936e78ec98747164b2a3587b32d54 Mon Sep 17 00:00:00 2001
From: Megan Henning
Date: Tue, 27 Feb 2018 16:20:13 -0600
Subject: [PATCH 3/5] Add file versioning support

---
 api/dao/hierarchy.py | 69 ++++++++++++++++++++++++++++++++------------
 api/upload.py        |  3 +-
 bin/database.py      | 22 +++++++++++++-
 3 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/api/dao/hierarchy.py b/api/dao/hierarchy.py
index cc5183383..aa040ed5c 100644
--- a/api/dao/hierarchy.py
+++ b/api/dao/hierarchy.py
@@ -223,7 +223,7 @@ def check_cont(cont, reqs):
                 return False
     return True
 
-def upsert_fileinfo(cont_name, _id, fileinfo):
+def upsert_fileinfo(cont_name, _id, fileinfo, ignore_hash_replace=False):
     cont_name = containerutil.pluralize(cont_name)
     _id = bson.ObjectId(_id)
 
@@ -240,9 +240,9 @@ def upsert_fileinfo(cont_name, _id, fileinfo):
         if f['name'] == fileinfo['name']:
 
             # If the existing file is deleted, always replace (but this is not considered a "replaced" saved state)
-            # This creates a gap in the delete functionality, i.e. this file cannot be restored from this point on.
             if 'deleted' in f:
                 remove_file(cont_name, _id, fileinfo['name'])
+                container_after = add_file(cont_name, _id, fileinfo)
                 saved_state = 'saved'
 
 
@@ -250,41 +250,72 @@ def upsert_fileinfo(cont_name, _id, fileinfo):
             elif fileinfo.get('from_failed_job') and not f.get('from_failed_job'):
                 saved_state = 'ignored'
 
-            # The file object is different from the existing one, remove the existing file and replace it
-            elif f.get('hash') != fileinfo['hash']:
-                remove_file(cont_name, _id, fileinfo['name'])
-                saved_state = 'replaced'
+            # The file object is the same as an existing file and the caller has chosen to ignore in this situation
+            elif f.get('hash') == fileinfo['hash'] and ignore_hash_replace:
+                saved_state = 'ignored'
 
-            # The hash was the same, ignore
+            # No special circumstances, proceed with a replace
             else:
-                saved_state = 'ignored'
+                container_after = replace_file(cont_name, _id, f, fileinfo)
+                saved_state = 'replaced'
 
             break
 
-    if saved_state != 'ignored':
-        fileinfo['created'] = fileinfo['modified']
-        container_after = add_fileinfo(cont_name, _id, fileinfo)
+    else:
+
+        # File was not already in the container, add as normal
+        container_after = add_file(cont_name, _id, fileinfo)
 
     return container_before, container_after, saved_state
 
 
-def add_fileinfo(cont_name, _id, fileinfo):
-    if fileinfo.get('size') is not None:
-        if type(fileinfo['size']) != int:
-            log.warn('Fileinfo passed with non-integer size')
-            fileinfo['size'] = int(fileinfo['size'])
 
+def add_file(cont_name, _id, fileinfo):
     return config.db[cont_name].find_one_and_update(
         {'_id': _id},
         {'$push': {'files': fileinfo}},
         return_document=pymongo.collection.ReturnDocument.AFTER
     )
 
+def replace_file(cont_name, _id, existing_fileinfo, fileinfo):
+
+    while True:
+
+        # Update the version and add the existing file to the "other_versions" list
+        fileinfo['version'] = existing_fileinfo['version'] + 1
+        fileinfo['other_versions'] = existing_fileinfo.pop('other_versions', [])
+        fileinfo['other_versions'].append(existing_fileinfo)
+
+        # Only update the matched array element, and only if the existing file
+        # has not changed since last load
+        result = config.db[cont_name].update_one(
+            {'_id': _id, 'files': {'$elemMatch': {
+                'name': fileinfo['name'],
+                'modified': existing_fileinfo['modified'],
+                'version': existing_fileinfo['version']}}},
+            {'$set': {'files.$': fileinfo}}
+        )
+
+        if result.modified_count:
+            return fileinfo
+
+        # The existing file must have changed, grab the update from the db and retry
+        c = config.db[cont_name].find_one({'_id': _id, 'files.name': fileinfo['name']})
+        for f in c.get('files'):
+            if f['name'] == fileinfo['name']:
+                existing_fileinfo = f
+                break
+        else:
+            # The file wasn't found, must have been removed
+            fileinfo['version'] = 1
+            return add_file(cont_name, _id, fileinfo)
+
+
 def remove_file(cont_name, _id, filename):
-    config.db[cont_name].find_one_and_update(
+    return config.db[cont_name].find_one_and_update(
         {'_id': _id, 'files.name': filename},
-        {'$pull': {'files': {'name': filename}}}
+        {'$pull': {'files': {'name': filename}}},
+        return_document=pymongo.collection.ReturnDocument.AFTER
     )
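
replace_file is a small compare-and-swap loop: the update only matches while name, modified, and version are unchanged since the container was read, and on a miss it re-reads the file and retries (or falls back to add_file if the file vanished). A sketch of the resulting bookkeeping when 'scan.nii' is replaced twice, using hypothetical file documents with placeholder values:

    v1 = {'name': 'scan.nii', 'hash': 'v0-aaa', 'version': 1}

    # First replace: v1 becomes the new file's history.
    v2 = {'name': 'scan.nii', 'hash': 'v0-bbb', 'version': 2, 'other_versions': [v1]}

    # Second replace: the incoming doc takes over v2's history and then appends
    # v2 itself, whose own 'other_versions' was popped off first, keeping history flat.
    v2_flat = {'name': 'scan.nii', 'hash': 'v0-bbb', 'version': 2}
    v3 = {'name': 'scan.nii', 'hash': 'v0-ccc', 'version': 3, 'other_versions': [v1, v2_flat]}

    assert v3['version'] == len(v3['other_versions']) + 1
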
diff --git a/api/upload.py b/api/upload.py
index 617a30355..476943380 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -122,7 +122,8 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
         # Stands in for a dedicated object... for now.
         file_attrs = {
             'name': field.filename,
-            'modified': field.modified, #
+            'created': timestamp,
+            'modified': timestamp,
             'size': field.size,
             'mimetype': field.mimetype,
             'hash': field.hash,
diff --git a/bin/database.py b/bin/database.py
index 375c4b76f..9ff6da52f 100755
--- a/bin/database.py
+++ b/bin/database.py
@@ -22,7 +22,7 @@ from api.types import Origin
 from api.jobs import batch
 
-CURRENT_DATABASE_VERSION = 42 # An int that is bumped when a new schema change is made
+CURRENT_DATABASE_VERSION = 43 # An int that is bumped when a new schema change is made
 
 def get_db_version():
 
@@ -1360,6 +1360,26 @@ def upgrade_to_42():
         process_cursor(cursor, upgrade_to_42_closure, context=cont_name)
 
 
+def upgrade_to_43_closure(cont, cont_name):
+    """
+    Backfill a version on every file in a single container that does not have one
+    """
+    files = cont.get('files', [])
+    for f in files:
+        if 'version' not in f:
+            f['version'] = 1
+    config.db[cont_name].update_one({'_id': cont['_id']}, {'$set': {'files': files}})
+    return True
+
+
+def upgrade_to_43():
+    """
+    Add initial file versioning to all files
+    """
+    for cont_name in ['projects', 'sessions', 'acquisitions', 'analyses', 'collections']:
+        config.db[cont_name].find({'files': { '$gt': [] }, 'files.version': {'$exists': False }})
+
+
 ###
 ### BEGIN RESERVED UPGRADE SECTION
 ###
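
A note on the migration query: {'files': { '$gt': [] }} is the MongoDB idiom for "non-empty array", so only containers that actually hold files are visited. The closure then backfills version 1 onto any file missing it, roughly:

    # Hypothetical container document before the migration runs...
    cont = {'_id': 'c1', 'files': [{'name': 'a.nii'}, {'name': 'b.nii', 'version': 1}]}

    # ...and the closure's effect on it: files without a 'version' get 1.
    for f in cont['files']:
        if 'version' not in f:
            f['version'] = 1

    assert all(f['version'] == 1 for f in cont['files'])
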
From ebcddc296dc720f6f3883f6ea6b763369bfeae9d Mon Sep 17 00:00:00 2001
From: Megan Henning
Date: Wed, 28 Feb 2018 13:18:46 -0600
Subject: [PATCH 4/5] Fix db update script

---
 bin/database.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bin/database.py b/bin/database.py
index 9ff6da52f..835a86eb0 100755
--- a/bin/database.py
+++ b/bin/database.py
@@ -1377,7 +1377,8 @@ def upgrade_to_43():
     Add initial file versioning to all files
     """
     for cont_name in ['projects', 'sessions', 'acquisitions', 'analyses', 'collections']:
-        config.db[cont_name].find({'files': { '$gt': [] }, 'files.version': {'$exists': False }})
+        cursor = config.db[cont_name].find({'files': { '$gt': [] }, 'files.version': {'$exists': False }})
+        process_cursor(cursor, upgrade_to_43_closure, context=cont_name)
 
 
 ###
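
The bug here was that PATCH 3/5 built the query but never iterated it, so no documents were migrated. With the fix, process_cursor drives the closure over every matched container. Its rough shape, inferred from this call site rather than taken from bin/database.py itself:

    def process_cursor_sketch(cursor, closure, context=None):
        # Inferred shape only: apply the per-document upgrade closure,
        # e.g. upgrade_to_43_closure(cont, cont_name), to each result.
        for doc in cursor:
            closure(doc, context)
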
From c80c9ce866c165ee8d6bf45b1a1b4dc905c711f0 Mon Sep 17 00:00:00 2001
From: Megan Henning
Date: Thu, 1 Mar 2018 15:38:12 -0600
Subject: [PATCH 5/5] Small bug fixes

---
 api/jobs/rules.py | 2 +-
 api/upload.py     | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/api/jobs/rules.py b/api/jobs/rules.py
index 4fa4335e3..697aed600 100644
--- a/api/jobs/rules.py
+++ b/api/jobs/rules.py
@@ -223,7 +223,7 @@ def create_jobs(db, container_before, container_after, container_type, replaced_
     jobs_before, jobs_after, potential_jobs = [], [], []
 
     files_before = container_before.get('files', [])
-    files_after = container_after['files'] # It should always have at least one file after
+    files_after = container_after.get('files', [])
 
     for f in files_before:
         jobs_before.extend(create_potential_jobs(db, container_before, container_type, f))
diff --git a/api/upload.py b/api/upload.py
index 476943380..67c996b67 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -128,6 +128,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
             'mimetype': field.mimetype,
             'hash': field.hash,
             'origin': origin,
+            'version': 1,
             'type': None,
             'modality': None,
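
With PATCH 5/5 applied, every newly uploaded file enters the database at version 1, which is what replace_file's version arithmetic depends on. A hypothetical upload-time file document after the whole series (placeholder values; only fields visible in these diffs are shown):

    import datetime

    timestamp = datetime.datetime.utcnow()
    file_attrs = {
        'name': 'scan.nii',
        'created': timestamp,                 # created == modified on first upload
        'modified': timestamp,
        'size': 1024,
        'mimetype': 'application/octet-stream',
        'hash': 'v0-sha384-...',              # placeholder
        'origin': {'type': 'user', 'id': 'user@example.com'},
        'version': 1,                         # bumped by replace_file on each replace
        'type': None,
        'modality': None,
    }
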