Fix ticket-based job state transition for analysis jobs #1105

Open · wants to merge 1 commit into base: master
75 changes: 41 additions & 34 deletions api/placer.py
@@ -331,6 +331,8 @@ def finalize(self):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
job = Job.get(job_ticket['job'])
success = job_ticket['success']
elif self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))

if self.metadata is not None:
bid = bson.ObjectId(self.id_)
@@ -350,23 +352,11 @@ def finalize(self):
hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)

if job_ticket is not None:
if success:
Queue.mutate(job, {
'state': 'complete',
'profile': {
'elapsed': job_ticket['elapsed']
}
})
else:
Queue.mutate(job, {
'state': 'failed',
'profile': {
'elapsed': job_ticket['elapsed']
}
})
Queue.mutate(job, {'state': 'complete' if success else 'failed',
'profile': {'elapsed': job_ticket['elapsed']}})
job = Job.get(job.id_)
Contributor:
This will raise an APINotFoundException, not return None.

It looks like we call it above too, so I'm curious why you're checking for None below. Is there a race condition where the job gets deleted between the Queue.mutate call and save()?
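
For reference, a minimal sketch of the guard shape this question implies, assuming (per the comment above) that Job.get raises APINotFoundException for missing ids; the import path is a guess:

```python
# Sketch only: turn the raising lookup into an Optional so that an
# 'is not None' check is meaningful. Import path is an assumption.
from api.web.errors import APINotFoundException

def get_job_or_none(job_id):
    try:
        return Job.get(job_id)
    except APINotFoundException:
        return None
```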

Contributor (Author):

Queue.mutate does not update the loaded job's attributes (except partially in this special case: https://github.com/scitran/core/blob/master/api/jobs/queue.py#L82).
Therefore calling job.save() once more (when setting saved_files below) would try to revert the state transition.
I was thinking of updating mutate instead, but wanted to limit the scope of the change and the possible errors introduced.
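
A minimal sketch of the edge case described here, with illustrative state values and file names:

```python
# Queue.mutate updates the database document, not the loaded object
# (except partially, per the linked queue.py special case).
job = Job.get(job_id)                     # in-memory copy: job.state == 'running'
Queue.mutate(job, {'state': 'complete'})  # database now says 'complete'

# Buggy flow: the loaded object still carries state == 'running',
# so this save() would write the stale state back:
job.saved_files = ['result.txt']
job.save()

# Fix in this PR: re-fetch after the mutate, then save:
job = Job.get(job.id_)
job.saved_files = ['result.txt']
job.save()
```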

Contributor:
+1 on that, I ran into that edge case myself earlier.


if self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))
if job is not None:
job.saved_files = [f['name'] for f in self.saved]
job.produced_metadata = self.metadata
job.save()
@@ -725,8 +715,7 @@ def finalize(self):

class AnalysisJobPlacer(Placer):
def check(self):
if self.id_ is None:
raise Exception('Must specify a target analysis')
self.requireTarget()

# Check that required state exists
if self.context.get('job_id'):
@@ -743,27 +732,45 @@ def process_file_field(self, field, file_attrs):
file_attrs.update(file_md)
break

if self.context.get('job_ticket_id'):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))

if not job_ticket['success']:
file_attrs['from_failed_job'] = True

file_attrs['created'] = file_attrs['modified']
self.save_file(field)
self.saved.append(file_attrs)

def finalize(self):
# Search the sessions table for analysis, replace file field
if self.saved:
Contributor:
Sanity check: are you sure we don't need the self.saved conditional? I'm not super familiar here.
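
Worth noting for this question: with an empty self.saved, the old $push with $each appends nothing, while the new code's $set in finalize would overwrite files with an empty list. A sketch of the MongoDB semantics, with illustrative ids:

```python
saved = []

# Old shape: $push with an empty $each appends no elements.
config.db.analyses.update_one({'_id': analysis_id},
                              {'$push': {'files': {'$each': saved}}})

# New shape: $set replaces the files array, even with an empty list.
config.db.analyses.update_one({'_id': analysis_id},
                              {'$set': {'files': saved}})
```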

q = {'_id': self.id_}
u = {'$push': {'files': {'$each': self.saved}}}
job_id = self.context.get('job_id')
if job_id:
# If the original job failed, update the analysis with the job that succeeded
u['$set'] = {'job': job_id}

# Update the job with saved files list
job = Job.get(job_id)
job.saved_files = [f['name'] for f in self.saved]
job.save()

config.db.analyses.update_one(q, u)
return self.saved
job = None
job_ticket = None
success = True

if self.context.get('job_ticket_id'):
job_ticket = JobTicket.get(self.context.get('job_ticket_id'))
job = Job.get(job_ticket['job'])
success = job_ticket['success']
elif self.context.get('job_id'):
job = Job.get(self.context.get('job_id'))

# Replace analysis files (and job in case it's re-run)
query = {'_id': self.id_}
update = {'$set': {'files': self.saved}}
if job is not None:
update['$set']['job'] = job.id_
config.db.analyses.update_one(query, update)

if job_ticket is not None:
Queue.mutate(job, {'state': 'complete' if success else 'failed',
'profile': {'elapsed': job_ticket['elapsed']}})
job = Job.get(job.id_)

if job is not None:
job.saved_files = [f['name'] for f in self.saved]
job.save()

return self.saved


class GearPlacer(Placer):
150 changes: 122 additions & 28 deletions tests/integration_tests/python/test_jobs.py
@@ -362,7 +362,8 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
project = data_builder.create_project()
session = data_builder.create_session()
acquisition = data_builder.create_acquisition()
assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok
r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip'))
assert r.ok

# create rule for text files
r = as_admin.post('/projects/' + project + '/rules', json={
@@ -391,8 +392,7 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
})
assert r.ok
job = r.json()['_id']

api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

# prepare completion (send success status before engine upload)
r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': False, 'elapsed': -1})
@@ -403,36 +403,36 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
assert job_ticket['success'] == False

# engine upload
metadata = {
'project':{
'label': 'engine project',
'info': {'test': 'p'}
},
'session':{
'label': 'engine session',
'subject': {'code': 'engine subject'},
'info': {'test': 's'}
},
'acquisition':{
'label': 'engine acquisition',
'timestamp': '2016-06-20T21:57:36+00:00',
'info': {'test': 'a'},
'files': [{
'name': 'result.txt',
'type': 'text',
'info': {'test': 'f0'}
}]
}
}

api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}})

r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket['_id']},
files=file_form('result.txt', meta=metadata)
files=file_form('result.txt', meta={
'project': {
'label': 'engine project',
'info': {'test': 'p'}
},
'session': {
'label': 'engine session',
'subject': {'code': 'engine subject'},
'info': {'test': 's'}
},
'acquisition': {
'label': 'engine acquisition',
'timestamp': '2016-06-20T21:57:36+00:00',
'info': {'test': 'a'},
'files': [{
'name': 'result.txt',
'type': 'text',
'info': {'test': 'f0'}
}]
}
})
)
assert r.ok

# verify job was transitioned to failed state
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'failed'

# verify metadata wasn't applied
acq = as_admin.get('/acquisitions/' + acquisition).json()
assert 'test' not in acq.get('info', {})
@@ -471,6 +471,100 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
jobs = [j for j in api_db.jobs.find({'gear_id': gear2})]
assert len(jobs) == 1


def test_job_state_transition_from_ticket(data_builder, default_payload, as_admin, as_drone, api_db, file_form):
# create gear
gear_doc = default_payload['gear']['gear']
gear_doc['inputs'] = {'dicom': {'base': 'file'}}
gear = data_builder.create_gear(gear=gear_doc)

# create acq with file (for input)
acquisition = data_builder.create_acquisition()
r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip'))
assert r.ok

# create job
r = as_admin.post('/jobs/add', json={
'gear_id': gear,
'config': {},
'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}},
'destination': {'type': 'acquisition', 'id': acquisition}
})
assert r.ok
job = r.json()['_id']
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

# prepare completion (send success status before engine upload)
r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3})
assert r.ok
job_ticket = r.json()['ticket']

# engine upload (should trigger state transition based on ticket)
r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={
'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]}
})
)
assert r.ok

# verify job was transitioned to complete state
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'complete'

# test with success: False
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}})
r = as_drone.post('/engine',
params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={
'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]}
})
)
assert r.ok
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'failed'

# create session, analysis and job
session = data_builder.create_session()
r = as_admin.post('/sessions/' + session + '/analyses', json={
'label': 'online',
'job': {'gear_id': gear,
'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}}}
})
assert r.ok
analysis = r.json()['_id']

r = as_admin.get('/analyses/' + analysis)
assert r.ok
job = r.json().get('job')
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})

# prepare completion (send success status before engine upload)
r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3})
assert r.ok
job_ticket = r.json()['ticket']

r = as_drone.post('/engine',
params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={'type': 'text'}))
assert r.ok

# verify job was transitioned to complete state
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'complete'

# test with success: False
api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}})
api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}})
r = as_drone.post('/engine',
params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket},
files=file_form('result.txt', meta={'type': 'text'}))
assert r.ok
job_doc = as_admin.get('/jobs/' + job).json()
assert job_doc['state'] == 'failed'


def test_analysis_job_creation_errors(data_builder, default_payload, as_admin, file_form):
project = data_builder.create_project()
session = data_builder.create_session()