From 03784dff0329a37470033355321b38851a90e0b3 Mon Sep 17 00:00:00 2001 From: Ambrus Simon Date: Mon, 19 Mar 2018 17:54:53 +0100 Subject: [PATCH] Fix ticket-based job state transition for analysis jobs --- api/placer.py | 75 +++++----- tests/integration_tests/python/test_jobs.py | 150 ++++++++++++++++---- 2 files changed, 163 insertions(+), 62 deletions(-) diff --git a/api/placer.py b/api/placer.py index 9be774485..0f2d7732f 100644 --- a/api/placer.py +++ b/api/placer.py @@ -331,6 +331,8 @@ def finalize(self): job_ticket = JobTicket.get(self.context.get('job_ticket_id')) job = Job.get(job_ticket['job']) success = job_ticket['success'] + elif self.context.get('job_id'): + job = Job.get(self.context.get('job_id')) if self.metadata is not None: bid = bson.ObjectId(self.id_) @@ -350,23 +352,11 @@ def finalize(self): hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) if job_ticket is not None: - if success: - Queue.mutate(job, { - 'state': 'complete', - 'profile': { - 'elapsed': job_ticket['elapsed'] - } - }) - else: - Queue.mutate(job, { - 'state': 'failed', - 'profile': { - 'elapsed': job_ticket['elapsed'] - } - }) + Queue.mutate(job, {'state': 'complete' if success else 'failed', + 'profile': {'elapsed': job_ticket['elapsed']}}) + job = Job.get(job.id_) - if self.context.get('job_id'): - job = Job.get(self.context.get('job_id')) + if job is not None: job.saved_files = [f['name'] for f in self.saved] job.produced_metadata = self.metadata job.save() @@ -725,8 +715,7 @@ def finalize(self): class AnalysisJobPlacer(Placer): def check(self): - if self.id_ is None: - raise Exception('Must specify a target analysis') + self.requireTarget() # Check that required state exists if self.context.get('job_id'): @@ -743,27 +732,45 @@ def process_file_field(self, field, file_attrs): file_attrs.update(file_md) break + if self.context.get('job_ticket_id'): + job_ticket = JobTicket.get(self.context.get('job_ticket_id')) + + if not job_ticket['success']: + file_attrs['from_failed_job'] = True + file_attrs['created'] = file_attrs['modified'] self.save_file(field) self.saved.append(file_attrs) def finalize(self): - # Search the sessions table for analysis, replace file field - if self.saved: - q = {'_id': self.id_} - u = {'$push': {'files': {'$each': self.saved}}} - job_id = self.context.get('job_id') - if job_id: - # If the original job failed, update the analysis with the job that succeeded - u['$set'] = {'job': job_id} - - # Update the job with saved files list - job = Job.get(job_id) - job.saved_files = [f['name'] for f in self.saved] - job.save() - - config.db.analyses.update_one(q, u) - return self.saved + job = None + job_ticket = None + success = True + + if self.context.get('job_ticket_id'): + job_ticket = JobTicket.get(self.context.get('job_ticket_id')) + job = Job.get(job_ticket['job']) + success = job_ticket['success'] + elif self.context.get('job_id'): + job = Job.get(self.context.get('job_id')) + + # Replace analysis files (and job in case it's re-run) + query = {'_id': self.id_} + update = {'$set': {'files': self.saved}} + if job is not None: + update['$set']['job'] = job.id_ + config.db.analyses.update_one(query, update) + + if job_ticket is not None: + Queue.mutate(job, {'state': 'complete' if success else 'failed', + 'profile': {'elapsed': job_ticket['elapsed']}}) + job = Job.get(job.id_) + + if job is not None: + job.saved_files = [f['name'] for f in self.saved] + job.save() + + return self.saved class GearPlacer(Placer): diff --git a/tests/integration_tests/python/test_jobs.py b/tests/integration_tests/python/test_jobs.py index ee020eda9..3fc606306 100644 --- a/tests/integration_tests/python/test_jobs.py +++ b/tests/integration_tests/python/test_jobs.py @@ -362,7 +362,8 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_ project = data_builder.create_project() session = data_builder.create_session() acquisition = data_builder.create_acquisition() - assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok + r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')) + assert r.ok # create rule for text files r = as_admin.post('/projects/' + project + '/rules', json={ @@ -391,8 +392,7 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_ }) assert r.ok job = r.json()['_id'] - - api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}}) + api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}}) # prepare completion (send success status before engine upload) r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': False, 'elapsed': -1}) @@ -403,36 +403,36 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_ assert job_ticket['success'] == False # engine upload - metadata = { - 'project':{ - 'label': 'engine project', - 'info': {'test': 'p'} - }, - 'session':{ - 'label': 'engine session', - 'subject': {'code': 'engine subject'}, - 'info': {'test': 's'} - }, - 'acquisition':{ - 'label': 'engine acquisition', - 'timestamp': '2016-06-20T21:57:36+00:00', - 'info': {'test': 'a'}, - 'files': [{ - 'name': 'result.txt', - 'type': 'text', - 'info': {'test': 'f0'} - }] - } - } - - api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}}) - r = as_drone.post('/engine', params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket['_id']}, - files=file_form('result.txt', meta=metadata) + files=file_form('result.txt', meta={ + 'project': { + 'label': 'engine project', + 'info': {'test': 'p'} + }, + 'session': { + 'label': 'engine session', + 'subject': {'code': 'engine subject'}, + 'info': {'test': 's'} + }, + 'acquisition': { + 'label': 'engine acquisition', + 'timestamp': '2016-06-20T21:57:36+00:00', + 'info': {'test': 'a'}, + 'files': [{ + 'name': 'result.txt', + 'type': 'text', + 'info': {'test': 'f0'} + }] + } + }) ) assert r.ok + # verify job was transitioned to failed state + job_doc = as_admin.get('/jobs/' + job).json() + assert job_doc['state'] == 'failed' + # verify metadata wasn't applied acq = as_admin.get('/acquisitions/' + acquisition).json() assert 'test' not in acq.get('info', {}) @@ -471,6 +471,100 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_ jobs = [j for j in api_db.jobs.find({'gear_id': gear2})] assert len(jobs) == 1 + +def test_job_state_transition_from_ticket(data_builder, default_payload, as_admin, as_drone, api_db, file_form): + # create gear + gear_doc = default_payload['gear']['gear'] + gear_doc['inputs'] = {'dicom': {'base': 'file'}} + gear = data_builder.create_gear(gear=gear_doc) + + # create acq with file (for input) + acquisition = data_builder.create_acquisition() + r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')) + assert r.ok + + # create job + r = as_admin.post('/jobs/add', json={ + 'gear_id': gear, + 'config': {}, + 'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}}, + 'destination': {'type': 'acquisition', 'id': acquisition} + }) + assert r.ok + job = r.json()['_id'] + api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}}) + + # prepare completion (send success status before engine upload) + r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3}) + assert r.ok + job_ticket = r.json()['ticket'] + + # engine upload (should trigger state transition based on ticket) + r = as_drone.post('/engine', + params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket}, + files=file_form('result.txt', meta={ + 'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]} + }) + ) + assert r.ok + + # verify job was transitioned to complete state + job_doc = as_admin.get('/jobs/' + job).json() + assert job_doc['state'] == 'complete' + + # test with success: False + api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}}) + api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}}) + r = as_drone.post('/engine', + params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket}, + files=file_form('result.txt', meta={ + 'acquisition': {'files': [{'name': 'result.txt', 'type': 'text'}]} + }) + ) + assert r.ok + job_doc = as_admin.get('/jobs/' + job).json() + assert job_doc['state'] == 'failed' + + # create session, analysis and job + session = data_builder.create_session() + r = as_admin.post('/sessions/' + session + '/analyses', json={ + 'label': 'online', + 'job': {'gear_id': gear, + 'inputs': {'dicom': {'type': 'acquisition', 'id': acquisition, 'name': 'test.zip'}}} + }) + assert r.ok + analysis = r.json()['_id'] + + r = as_admin.get('/analyses/' + analysis) + assert r.ok + job = r.json().get('job') + api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}}) + + # prepare completion (send success status before engine upload) + r = as_drone.post('/jobs/' + job + '/prepare-complete', json={'success': True, 'elapsed': 3}) + assert r.ok + job_ticket = r.json()['ticket'] + + r = as_drone.post('/engine', + params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket}, + files=file_form('result.txt', meta={'type': 'text'})) + assert r.ok + + # verify job was transitioned to complete state + job_doc = as_admin.get('/jobs/' + job).json() + assert job_doc['state'] == 'complete' + + # test with success: False + api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set': {'state': 'running'}}) + api_db.job_tickets.update_one({'_id': bson.ObjectId(job_ticket)}, {'$set': {'success': False}}) + r = as_drone.post('/engine', + params={'level': 'analysis', 'id': analysis, 'job': job, 'job_ticket': job_ticket}, + files=file_form('result.txt', meta={'type': 'text'})) + assert r.ok + job_doc = as_admin.get('/jobs/' + job).json() + assert job_doc['state'] == 'failed' + + def test_analysis_job_creation_errors(data_builder, default_payload, as_admin, file_form): project = data_builder.create_project() session = data_builder.create_session()