Skip to content

Commit

Permalink
Fix ticket-based job state transition for analysis jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ambrussimon committed Mar 20, 2018
1 parent 5da2763 commit 03784df
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 62 deletions.
75 changes: 41 additions & 34 deletions api/placer.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ def finalize(self):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
job = Job.get(job_ticket['job'])
success = job_ticket['success']
elif self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))

if self.metadata is not None:
bid = bson.ObjectId(self.id_)
Expand All @@ -350,23 +352,11 @@ def finalize(self):
hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)

if job_ticket is not None:
if success:
Queue.mutate(job, {
'state': 'complete',
'profile': {
'elapsed': job_ticket['elapsed']
}
})
else:
Queue.mutate(job, {
'state': 'failed',
'profile': {
'elapsed': job_ticket['elapsed']
}
})
Queue.mutate(job, {'state': 'complete' if success else 'failed',
'profile': {'elapsed': job_ticket['elapsed']}})
job = Job.get(job.id_)

if self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))
if job is not None:
job.saved_files = [f['name'] for f in self.saved]
job.produced_metadata = self.metadata
job.save()
Expand Down Expand Up @@ -725,8 +715,7 @@ def finalize(self):

class AnalysisJobPlacer(Placer):
def check(self):
if self.id_ is None:
raise Exception('Must specify a target analysis')
self.requireTarget()

# Check that required state exists
if self.context.get('job_id'):
Expand All @@ -743,27 +732,45 @@ def process_file_field(self, field, file_attrs):
file_attrs.update(file_md)
break

if self.context.get('job_ticket_id'):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))

if not job_ticket['success']:
file_attrs['from_failed_job'] = True

file_attrs['created'] = file_attrs['modified']
self.save_file(field)
self.saved.append(file_attrs)

def finalize(self):
# Search the sessions table for analysis, replace file field
if self.saved:
q = {'_id': self.id_}
u = {'$push': {'files': {'$each': self.saved}}}
job_id = self.context.get('job_id')
if job_id:
# If the original job failed, update the analysis with the job that succeeded
u['$set'] = {'job': job_id}

# Update the job with saved files list
job = Job.get(job_id)
job.saved_files = [f['name'] for f in self.saved]
job.save()

config.db.analyses.update_one(q, u)
return self.saved
job = None
job_ticket = None
success = True

if self.context.get('job_ticket_id'):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
job = Job.get(job_ticket['job'])
success = job_ticket['success']
elif self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))

# Replace analysis files (and job in case it's re-run)
query = {'_id': self.id_}
update = {'$set': {'files': self.saved}}
if job is not None:
update['$set']['job'] = job.id_
config.db.analyses.update_one(query, update)

if job_ticket is not None:
Queue.mutate(job, {'state': 'complete' if success else 'failed',
'profile': {'elapsed': job_ticket['elapsed']}})
job = Job.get(job.id_)

if job is not None:
job.saved_files = [f['name'] for f in self.saved]
job.save()

return self.saved


class GearPlacer(Placer):
Expand Down
150 changes: 122 additions & 28 deletions tests/integration_tests/python/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
project = data_builder.create_project()
session = data_builder.create_session()
acquisition = data_builder.create_acquisition()
assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok
r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip'))
assert r.ok

# create rule for text files
r = as_admin.post('/projects/' + project + '/rules', json={
Expand Down Expand Up @@ -391,8 +392,7 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
})
assert r.ok
job = r.json()['_id']

api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

# prepare completion (send success status before engine upload)
r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': False, 'elapsed': -1})
Expand All @@ -403,36 +403,36 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
assert job_ticket['success'] == False

# engine upload
metadata = {
'project':{
'label': 'engine project',
'info': {'test': 'p'}
},
'session':{
'label': 'engine session',
'subject': {'code': 'engine subject'},
'info': {'test': 's'}
},
'acquisition':{
'label': 'engine acquisition',
'timestamp': '2016-06-20T21:57:36+00:00',
'info': {'test': 'a'},
'files': [{
'name': 'result.txt',
'type': 'text',
'info': {'test': 'f0'}
}]
}
}

api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})

r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket['_id']},
files=file_form('result.txt', meta=metadata)
files=file_form('result.txt', meta={
'project': {
'label': 'engine project',
'info': {'test': 'p'}
},
'session': {
'label': 'engine session',
'subject': {'code': 'engine subject'},
'info': {'test': 's'}
},
'acquisition': {
'label': 'engine acquisition',
'timestamp': '2016-06-20T21:57:36+00:00',
'info': {'test': 'a'},
'files': [{
'name': 'result.txt',
'type': 'text',
'info': {'test': 'f0'}
}]
}
})
)
assert r.ok

# verify job was transitioned to failed state
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'failed'

# verify metadata wasn't applied
acq = as_admin.get('/acquisitions/' + acquisition).json()
assert 'test' not in acq.get('info', {})
Expand Down Expand Up @@ -471,6 +471,100 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
jobs = [j for j in api_db.jobs.find({'gear_id': gear2})]
assert len(jobs) == 1


def test_job_state_transition_from_ticket(data_builder, default_payload, as_admin, as_drone, api_db, file_form):
# create gear
gear_doc = default_payload['gear']['gear']
gear_doc['inputs'] = {'dicom': {'base': 'file'}}
gear = data_builder.create_gear(gear=gear_doc)

# create acq with file (for input)
acquisition = data_builder.create_acquisition()
r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip'))
assert r.ok

# create job
r = as_admin.post('/jobs/add', json={
'gear_id': gear,
'config': {},
'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}},
'destination': {'type': 'acquisition', 'id': acquisition}
})
assert r.ok
job = r.json()['_id']
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

# prepare completion (send success status before engine upload)
r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3})
assert r.ok
job_ticket = r.json()['ticket']

# engine upload (should trigger state transition based on ticket)
r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={
'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]}
})
)
assert r.ok

# verify job was transitioned to complete state
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'complete'

# test with success: False
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}})
r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={
'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]}
})
)
assert r.ok
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'failed'

# create session, analysis and job
session = data_builder.create_session()
r = as_admin.post('/sessions/' + session + '/analyses', json={
'label': 'online',
'job': {'gear_id': gear,
'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}}}
})
assert r.ok
analysis = r.json()['_id']

r = as_admin.get('/analyses/' + analysis)
assert r.ok
job = r.json().get('job')
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

# prepare completion (send success status before engine upload)
r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3})
assert r.ok
job_ticket = r.json()['ticket']

r = as_drone.post('/engine',
params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={'type': 'text'}))
assert r.ok

# verify job was transitioned to complete state
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'complete'

# test with success: False
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}})
r = as_drone.post('/engine',
params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={'type': 'text'}))
assert r.ok
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'failed'


def test_analysis_job_creation_errors(data_builder, default_payload, as_admin, file_form):
project = data_builder.create_project()
session = data_builder.create_session()
Expand Down

0 comments on commit 03784df

Please sign in to comment.