Properly handle replacing of files, rerun jobs when file is replaced #1082

Open: wants to merge 5 commits into master
106 changes: 71 additions & 35 deletions api/dao/hierarchy.py
@@ -223,66 +223,102 @@ def check_cont(cont, reqs):
return False
return True

def upsert_fileinfo(cont_name, _id, fileinfo):
def upsert_fileinfo(cont_name, _id, fileinfo, ignore_hash_replace=False):

cont_name = containerutil.pluralize(cont_name)
_id = bson.ObjectId(_id)

container_before = config.db[cont_name].find_one({'_id': _id})
container_after, file_before = None, None
container_after = None
saved_state = 'saved'


# Look to see if a file with the same name already exists in the container
for f in container_before.get('files',[]):
# Find file in result and set to file_after

# File already exists, respond accordingly
if f['name'] == fileinfo['name']:

# If the existing file is deleted, always replace (but this is not considered a "replaced" saved state)
if 'deleted' in f:
# Ugly hack: remove the already existing file that has the 'deleted' tag
# This creates a gap in the delete functionality, i.e. this file cannot be restored from this point on.
# Note that the previous file in storage will be unreferenced from the DB (unless CAS edge case...)
config.db[cont_name].find_one_and_update(
{'_id': _id, 'files.name': fileinfo['name']},
{'$pull': {'files': {'name': fileinfo['name']}}}
)
remove_file(cont_name, _id, fileinfo['name'])
container_after = add_file(cont_name, _id, fileinfo)
saved_state = 'saved'


# Files from a failed job should never replace existing files that are "accepted" (unless they are deleted)
elif fileinfo.get('from_failed_job') and not f.get('from_failed_job'):
saved_state = 'ignored'

# The file object is the same as an existing file and the caller has chosen to ignore it in this situation
elif f.get('hash') == fileinfo['hash'] and ignore_hash_replace:
saved_state = 'ignored'

# No special circumstances, proceed with a replace
else:
file_before = f
container_after = replace_file(cont_name, _id, f, fileinfo)
saved_state = 'replaced'

break

if file_before is None:
fileinfo['created'] = fileinfo['modified']
container_after = add_fileinfo(cont_name, _id, fileinfo)

else:
container_after = update_fileinfo(cont_name, _id, fileinfo)

return container_before, container_after
# File was not already in container, add as normal
container_after = add_file(cont_name, _id, fileinfo)

def update_fileinfo(cont_name, _id, fileinfo):
if fileinfo.get('size') is not None:
if type(fileinfo['size']) != int:
log.warn('Fileinfo passed with non-integer size')
fileinfo['size'] = int(fileinfo['size'])
return container_before, container_after, saved_state

update_set = {'files.$.modified': datetime.datetime.utcnow()}
# In this method, we are overriding an existing file.
# update_set allows updating all of the fileinfo fields (size, hash, etc.)
for k,v in fileinfo.iteritems():
update_set['files.$.' + k] = v

def add_file(cont_name, _id, fileinfo):
return config.db[cont_name].find_one_and_update(
{'_id': _id, 'files.name': fileinfo['name']},
{'$set': update_set},
{'_id': _id},
{'$push': {'files': fileinfo}},
return_document=pymongo.collection.ReturnDocument.AFTER
)

def add_fileinfo(cont_name, _id, fileinfo):
if fileinfo.get('size') is not None:
if type(fileinfo['size']) != int:
log.warn('Fileinfo passed with non-integer size')
fileinfo['size'] = int(fileinfo['size'])
def replace_file(cont_name, _id, existing_fileinfo, fileinfo):

while True:

# Update the version and add existing file to "other_versions" list
fileinfo['version'] = existing_fileinfo['version']+1
fileinfo['other_versions'] = existing_fileinfo.pop('other_versions', [])
fileinfo['other_versions'].append(existing_fileinfo)

# Only update if the existing file has not changed since last load
result = config.db[cont_name].replace_one(
{'_id': _id, 'files': {'$elemMatch': {
'name': fileinfo['name'],
'modified': existing_fileinfo['modified'],
'version': existing_fileinfo['version']}}},
fileinfo
)

if not result.modified_count:
# The existing file must have changed, grab update from db
c = config.db[cont_name].find_one({'_id': _id, 'files.name': fileinfo['name']})
for f in c.get('files'):
if f['name'] == fileinfo['name']:
existing_fileinfo = f
break
else:
# The file wasn't found, must have been removed
fileinfo['version'] = 1
return add_file(cont_name, _id, fileinfo)

return fileinfo



def remove_file(cont_name, _id, filename):
return config.db[cont_name].find_one_and_update(
{'_id': _id},
{'$push': {'files': fileinfo}},
{'_id': _id, 'files.name': filename},
{'$pull': {'files': {'name': filename}}},
return_document=pymongo.collection.ReturnDocument.AFTER
)


def _group_id_fuzzy_match(group_id, project_label):
existing_group_ids = [g['_id'] for g in config.db.groups.find(None, ['_id'])]
if group_id.lower() in existing_group_ids:
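Reviewer note: the heart of replace_file() above is an optimistic compare-and-swap on (name, modified, version), retried until it either wins or finds the file gone. A minimal, self-contained sketch of that pattern follows; the in-memory container and the helper names (atomic_replace, replace_with_retry) are invented for illustration and are not part of this PR.

```python
# Standalone sketch of the compare-and-swap retry in replace_file().
# Only the match-on-(name, modified, version) and the retry shape mirror the PR.
import copy

def atomic_replace(container, expected, new_entry):
    """Swap the file entry only if it still matches what was last read.
    Returns True on success, False if a concurrent writer changed it first."""
    for i, f in enumerate(container['files']):
        if (f['name'] == expected['name']
                and f['modified'] == expected['modified']
                and f['version'] == expected['version']):
            container['files'][i] = new_entry
            return True
    return False

def replace_with_retry(container, fileinfo):
    while True:
        existing = next((f for f in container['files'] if f['name'] == fileinfo['name']), None)
        if existing is None:
            # File vanished between reads: fall back to a plain add with version 1
            fileinfo['version'] = 1
            container['files'].append(fileinfo)
            return fileinfo
        new_entry = copy.deepcopy(fileinfo)
        new_entry['version'] = existing['version'] + 1
        # Keep the superseded entry around, as replace_file() does with other_versions
        previous = copy.deepcopy(existing)
        new_entry['other_versions'] = previous.pop('other_versions', []) + [previous]
        if atomic_replace(container, existing, new_entry):
            return new_entry
        # Lost the race: loop around, re-read the current entry and try again

container = {'files': [{'name': 'scan.nii.gz', 'modified': 1, 'version': 1}]}
print(replace_with_retry(container, {'name': 'scan.nii.gz', 'modified': 2}))
```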
26 changes: 18 additions & 8 deletions api/jobs/rules.py
@@ -208,17 +208,22 @@ def create_potential_jobs(db, container, container_type, file_):

return potential_jobs

def create_jobs(db, container_before, container_after, container_type):
def create_jobs(db, container_before, container_after, container_type, replaced_files=None):
"""
Given a before and after set of file attributes, enqueue a list of jobs that would only be possible
after the changes.
Returns the algorithm names that were queued.
"""

# A list of FileContainerReferences that have been completely replaced
# Jobs with these as inputs should get enqueued even if they are in the jobs_before list
if not replaced_files:
replaced_files = []

jobs_before, jobs_after, potential_jobs = [], [], []

files_before = container_before.get('files', [])
files_after = container_after['files'] # It should always have at least one file after
files_after = container_after.get('files', [])

for f in files_before:
jobs_before.extend(create_potential_jobs(db, container_before, container_type, f))
@@ -229,13 +234,18 @@ def create_jobs(db, container_before, container_after, container_type):
# Using a uniqueness constraint, create a list of the set difference of jobs_after \ jobs_before
# (members of jobs_after that are not in jobs_before)
for ja in jobs_after:
new_job = True
for jb in jobs_before:
if ja['job'].intention_equals(jb['job']):
new_job = False
break # this job matched in both before and after, ignore
if new_job:

if set(replaced_files).intersection(set(ja['job'].inputs.itervalues())):
# one of the replaced files is an input
potential_jobs.append(ja)
else:
should_enqueue_job = True
for jb in jobs_before:
if ja['job'].intention_equals(jb['job']):
should_enqueue_job = False
break # this job matched in both before and after, ignore
if should_enqueue_job:
potential_jobs.append(ja)


spawned_jobs = []
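Reviewer note: the decision create_jobs() now makes per candidate job boils down to "enqueue if one of its inputs was replaced, otherwise enqueue only if the job was not already possible before the upload". A toy sketch of that rule, using invented tuple-based file references (the real code compares Job objects via intention_equals):

```python
# Toy model: a file reference is a (container_type, container_id, filename) tuple
# and a "job" is just the set of its input references. Invented for illustration only.
def should_enqueue(candidate_inputs, jobs_before, replaced_files):
    if candidate_inputs & replaced_files:
        return True                              # a replaced input always re-triggers the job
    return candidate_inputs not in jobs_before   # otherwise only genuinely new jobs

jobs_before = [{('acquisition', 'a1', 'scan.dcm')}]
replaced = {('acquisition', 'a1', 'scan.dcm')}

print(should_enqueue({('acquisition', 'a1', 'scan.dcm')}, jobs_before, replaced))  # True: input replaced
print(should_enqueue({('acquisition', 'a1', 'scan.dcm')}, jobs_before, set()))     # False: job existed before
print(should_enqueue({('acquisition', 'a1', 'new.dcm')}, jobs_before, set()))      # True: new job
```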
50 changes: 25 additions & 25 deletions api/placer.py
@@ -44,6 +44,9 @@ def __init__(self, container_type, container, id_, metadata, timestamp, origin,
# A list of files that have been saved via save_file(); usually returned by finalize()
self.saved = []

# A list of files that have been ignored by save_file() because a file with the same name and hash already existed
self.ignored = []


def check(self):
"""
@@ -91,11 +94,24 @@ def save_file(self, field=None, file_attrs=None):

# Update the DB
if file_attrs is not None:
container_before, self.container = hierarchy.upsert_fileinfo(self.container_type, self.id_, file_attrs)
container_before, self.container, saved_state = hierarchy.upsert_fileinfo(self.container_type, self.id_, file_attrs)

# If this file was ignored because an existing file with the same name and hash existed on this project,
# add the file to the ignored list and move on
if saved_state == 'ignored':
self.ignored.append(file_attrs)

else:
self.saved.append(file_attrs)

# create_jobs handles replaced files differently
replaced_files = []
if saved_state == 'replaced':
replaced_files.append(containerutil.FileReference(self.container_type, self.id_, file_attrs['name']))

rules.create_jobs(config.db, container_before, self.container, self.container_type, replaced_files=replaced_files)


# Queue any jobs as a result of this upload; uploading to a gear will not create jobs, though
if self.container_type != 'gear':
rules.create_jobs(config.db, container_before, self.container, self.container_type)

def recalc_session_compliance(self):
if self.container_type in ['session', 'acquisition'] and self.id_:
@@ -121,7 +137,7 @@ def process_file_field(self, field, file_attrs):
if self.metadata:
file_attrs.update(self.metadata)
self.save_file(field, file_attrs)
self.saved.append(file_attrs)


def finalize(self):
self.recalc_session_compliance()
@@ -181,21 +197,11 @@ def process_file_field(self, field, file_attrs):
r_metadata = target['metadata']
file_attrs.update(r_metadata)

if container.level != 'subject':
self.container_type = container.level
self.id_ = container.id_
self.container = container.container
self.save_file(field, file_attrs)
else:
if field is not None:
files.move_form_file_field_into_cas(field)
if file_attrs is not None:
container.upsert_file(file_attrs)

# # Queue any jobs as a result of this upload
# rules.create_jobs(config.db, self.container, self.container_type, info)

self.saved.append(file_attrs)
self.container_type = container.level
self.id_ = container.id_
self.container = container.container
self.save_file(field, file_attrs)

def finalize(self):
# Check that there is at least one file being uploaded
@@ -294,7 +300,6 @@ def process_file_field(self, field, file_attrs):
file_attrs['from_failed_job'] = True

self.save_file(field, file_attrs)
self.saved.append(file_attrs)

def finalize(self):
if self.metadata is not None:
@@ -305,7 +310,6 @@ def finalize(self):
for file_md in file_mds:
if file_md['name'] not in saved_file_names:
self.save_file(None, file_md) # save file_attrs update only
self.saved.append(file_md)

# Remove file metadata as it was already updated in process_file_field
for k in self.metadata.keys():
@@ -653,7 +657,6 @@ def check(self):

def process_file_field(self, field, file_attrs):
self.save_file(field)
self.saved.append(file_attrs)

def finalize(self):
# we are going to merge the "hard" infos from the processed upload
@@ -689,7 +692,6 @@ def process_file_field(self, field, file_attrs):
file_attrs['output'] = True
file_attrs['created'] = file_attrs['modified']
self.save_file(field)
self.saved.append(file_attrs)

def finalize(self):
# Search the sessions table for analysis, replace file field
@@ -720,9 +722,7 @@ def process_file_field(self, field, file_attrs):
self.metadata.update({'exchange': {'rootfs-hash': proper_hash,
'git-commit': 'local',
'rootfs-url': 'INVALID'}})
# self.metadata['hash'] = file_attrs.get('hash')
self.save_file(field)
self.saved.append(file_attrs)
self.saved.append(self.metadata)

def finalize(self):
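Reviewer note: a condensed sketch of the control flow save_file() follows once upsert_fileinfo() reports a saved_state. The helper names (upsert, make_ref, enqueue) are placeholders rather than the real API; only the saved/ignored/replaced branching and the replaced_files hand-off mirror the diff above.

```python
# Placeholder callables stand in for hierarchy.upsert_fileinfo, containerutil.FileReference
# and rules.create_jobs; only the branching on saved_state reflects the PR.
def handle_upload(container_type, container_id, file_attrs, upsert, make_ref, enqueue):
    before, after, saved_state = upsert(container_type, container_id, file_attrs)

    if saved_state == 'ignored':
        # Same name and hash already present on the container: skip job creation
        # and let the caller record the file as ignored instead of saved.
        return saved_state

    replaced_files = []
    if saved_state == 'replaced':
        # A replaced input must re-trigger rules even if the same job existed before
        replaced_files.append(make_ref(container_type, container_id, file_attrs['name']))

    enqueue(before, after, container_type, replaced_files=replaced_files)
    return saved_state
```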
4 changes: 3 additions & 1 deletion api/upload.py
@@ -122,11 +122,13 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
# Stands in for a dedicated object... for now.
file_attrs = {
'name': field.filename,
'modified': field.modified, #
'created': timestamp,
'modified': timestamp,
'size': field.size,
'mimetype': field.mimetype,
'hash': field.hash,
'origin': origin,
'version': 1,

'type': None,
'modality': None,
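Reviewer note: with 'version': 1 seeded at upload time and replace_file() pushing superseded entries onto other_versions, a file entry that has been replaced once could look roughly like the document below. All values are invented; only the field names and nesting follow this PR.

```python
# Invented example values; field names and nesting follow the diff.
file_entry = {
    'name': 'scan.nii.gz',
    'hash': 'v0-sha384-bbb...',   # hash of the current upload
    'size': 1048576,
    'version': 2,
    'other_versions': [
        {'name': 'scan.nii.gz', 'hash': 'v0-sha384-aaa...', 'size': 917504, 'version': 1},
    ],
}
```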
23 changes: 22 additions & 1 deletion bin/database.py
@@ -22,7 +22,7 @@
from api.types import Origin
from api.jobs import batch

CURRENT_DATABASE_VERSION = 42 # An int that is bumped when a new schema change is made
CURRENT_DATABASE_VERSION = 43 # An int that is bumped when a new schema change is made

def get_db_version():

@@ -1360,6 +1360,27 @@ def upgrade_to_42():
process_cursor(cursor, upgrade_to_42_closure, context=cont_name)


def upgrade_to_43_closure(cont, cont_name):
"""
Update all files in a container document that do not have a version
"""
files = cont.get('files', [])
for f in files:
if 'version' not in f:
f['version'] = 1
config.db[cont_name].update_one({'_id': cont['_id']}, {'$set': {'files': files}})
return True


def upgrade_to_43():
"""
Add initial file versioning to all files
"""
for cont_name in ['projects', 'sessions', 'acquisitions', 'analyses', 'collections']:
cursor = config.db[cont_name].find({'files': { '$gt': [] }, 'files.version': {'$exists': False }})
process_cursor(cursor, upgrade_to_43_closure, context=cont_name)


###
### BEGIN RESERVED UPGRADE SECTION
###
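Reviewer note: the new migration only touches containers that have a non-empty files list still missing a version field, which is what the {'files': {'$gt': []}, 'files.version': {'$exists': False}} query selects. A quick in-memory check of the closure's defaulting rule, with the config.db write stubbed out (the function name below is a stand-in, not part of the PR):

```python
# Stand-in for upgrade_to_43_closure without the database write; same defaulting rule.
def add_default_versions(cont):
    files = cont.get('files', [])
    for f in files:
        if 'version' not in f:
            f['version'] = 1
    return files

cont = {'_id': 'x', 'files': [{'name': 'a.txt'}, {'name': 'b.txt', 'version': 3}]}
print(add_default_versions(cont))
# [{'name': 'a.txt', 'version': 1}, {'name': 'b.txt', 'version': 3}]
```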