From 6b48d3ba17c1a0938a6b6363820f216a0b17a6d5 Mon Sep 17 00:00:00 2001 From: Nathaniel Kofalt Date: Wed, 31 Jan 2018 17:43:41 -0600 Subject: [PATCH 1/5] Revamp --- api/config.py | 2 +- api/jobs/handlers.py | 182 +++++++++++++------------------------------ api/jobs/jobs.py | 31 ++++++++ api/jobs/queue.py | 30 +++++-- api/placer.py | 16 ++-- api/util.py | 29 ++++++- 6 files changed, 144 insertions(+), 146 deletions(-) diff --git a/api/config.py b/api/config.py index 8f8838642..c2bbaf696 100644 --- a/api/config.py +++ b/api/config.py @@ -246,7 +246,7 @@ def initialize_db(): create_or_recreate_ttl_index('authtokens', 'timestamp', 2592000) create_or_recreate_ttl_index('uploads', 'timestamp', 60) create_or_recreate_ttl_index('downloads', 'timestamp', 60) - create_or_recreate_ttl_index('job_tickets', 'timestamp', 300) + create_or_recreate_ttl_index('job_tickets', 'timestamp', 3600) # IMPORTANT: this controls job orphan logic. Ref queue.py now = datetime.datetime.utcnow() db.groups.update_one({'_id': 'unknown'}, {'$setOnInsert': { 'created': now, 'modified': now, 'label': 'Unknown', 'permissions': []}}, upsert=True) diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 1895ad018..4b6c69908 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -8,24 +8,24 @@ from jsonschema import ValidationError from urlparse import urlparse +from . import batch +from .. import config from .. import upload from .. import util -from ..auth import require_drone, require_login, has_access +from ..auth import require_drone, require_login, require_admin, has_access +from ..auth.apikeys import JobApiKey from ..dao import hierarchy from ..dao.containerstorage import ProjectStorage, SessionStorage, AcquisitionStorage from ..dao.containerutil import ContainerReference, pluralize +from ..util import humanize_validation_error, set_for_download +from ..validators import validate_data, verify_payload_exists from ..web import base from ..web.encoder import pseudo_consistent_json_encode from ..web.errors import APIPermissionException, APINotFoundException, InputValidationException from ..web.request import AccessType -from .. import config -from . import batch -from ..validators import validate_data, verify_payload_exists - -from ..auth.apikeys import JobApiKey from .gears import validate_gear_config, get_gears, get_gear, get_invocation_schema, remove_gear, upsert_gear, suggest_container, get_gear_by_name, check_for_gear_insertion -from .jobs import Job, Logs +from .jobs import Job, JobTicket, Logs from .batch import check_state, update from .queue import Queue from .rules import create_jobs, validate_regexes @@ -35,12 +35,10 @@ class GearsHandler(base.RequestHandler): """Provide /gears API routes.""" + @require_login def get(self): """List all gears.""" - if self.public_request: - self.abort(403, 'Request requires login') - gears = get_gears() filters = self.request.GET.getall('filter') @@ -49,13 +47,9 @@ def get(self): return gears + @require_login def check(self): - """ - Check if a gear upload is likely to succeed. - """ - - if self.public_request: - self.abort(403, 'Request requires login') + """Check if a gear upload is likely to succeed.""" check_for_gear_insertion(self.request.json) return None @@ -63,27 +57,16 @@ def check(self): class GearHandler(base.RequestHandler): """Provide /gears/x API routes.""" + @require_login def get(self, _id): - """Detail a gear.""" - - if self.public_request: - self.abort(403, 'Request requires login') - return get_gear(_id) + @require_login def get_invocation(self, _id): + return get_invocation_schema(get_gear(_id)) - if self.public_request: - self.abort(403, 'Request requires login') - - gear = get_gear(_id) - return get_invocation_schema(gear) - + @require_login def suggest(self, _id, cont_name, cid): - - if self.public_request: - self.abort(403, 'Request requires login') - cr = ContainerReference(cont_name, cid) if not self.superuser_request: cr.check_access(self.uid, 'ro') @@ -91,13 +74,11 @@ def suggest(self, _id, cont_name, cid): gear = get_gear(_id) return suggest_container(gear, cont_name+'s', cid) + @require_admin def upload(self): # pragma: no cover - """Upload new gear tarball file""" - if not self.user_is_admin: - self.abort(403, 'Request requires admin') - r = upload.process_upload(self.request, upload.Strategy.gear, container_type='gear', origin=self.origin, metadata=self.request.headers.get('metadata')) gear_id = upsert_gear(r[1]) + config.db.gears.update_one({'_id': gear_id}, {'$set': { 'exchange.rootfs-url': '/api/gears/temp/' + str(gear_id)} }) @@ -110,40 +91,26 @@ def download(self, **kwargs): # pragma: no cover gear = get_gear(dl_id) hash_ = gear['exchange']['rootfs-hash'] filepath = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash('v0-' + hash_.replace(':', '-'))) - self.response.app_iter = open(filepath, 'rb') - # self.response.headers['Content-Length'] = str(gear['size']) # must be set after setting app_iter - self.response.headers['Content-Type'] = 'application/octet-stream' - self.response.headers['Content-Disposition'] = 'attachment; filename="gear.tar"' + stream = open(filepath, 'rb') + set_for_download(self.response, stream=stream, filename='gear.tar') + @require_admin def post(self, _id): - """Upsert an entire gear document.""" - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - - doc = self.request.json + payload = self.request.json - if _id != doc.get('gear', {}).get('name', ''): + if _id != payload.get('gear', {}).get('name', ''): self.abort(400, 'Name key must be present and match URL') try: - result = upsert_gear(self.request.json) + result = upsert_gear(payload) return { '_id': str(result) } except ValidationError as err: - key = "none" - if len(err.relative_path) > 0: - key = err.relative_path[0] - - message = err.message.replace("u'", "'") - - raise InputValidationException('Gear manifest does not match schema on key ' + key + ': ' + message) + raise InputValidationException(humanize_validation_error(err)) + @require_admin def delete(self, _id): - """Delete a gear. Generally not recommended.""" - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - return remove_gear(_id) class RulesHandler(base.RequestHandler): @@ -164,7 +131,6 @@ def get(self, cid): return config.db.project_rules.find({'project_id' : cid}, projection=projection) - @verify_payload_exists def post(self, cid): """Add a rule""" @@ -177,18 +143,18 @@ def post(self, cid): if not self.user_is_admin and not has_access(self.uid, project, 'admin'): raise APIPermissionException('Adding rules to a project can only be done by a project admin.') - doc = self.request.json + payload = self.request.json - validate_data(doc, 'rule-new.json', 'input', 'POST', optional=True) - validate_regexes(doc) + validate_data(payload, 'rule-new.json', 'input', 'POST', optional=True) + validate_regexes(payload) try: - get_gear_by_name(doc['alg']) + get_gear_by_name(payload['alg']) except APINotFoundException: - self.abort(400, 'Cannot find gear for alg {}, alg not valid'.format(doc['alg'])) + self.abort(400, 'Cannot find gear for alg {}, alg not valid'.format(payload['alg'])) - doc['project_id'] = cid + payload['project_id'] = cid - result = config.db.project_rules.insert_one(doc) + result = config.db.project_rules.insert_one(payload) return { '_id': result.inserted_id } class RuleHandler(base.RequestHandler): @@ -243,9 +209,6 @@ def put(self, cid, rid): doc.update(updates) config.db.project_rules.replace_one({'_id': bson.ObjectId(rid)}, doc) - return - - def delete(self, cid, rid): """Remove a rule""" @@ -261,14 +224,12 @@ def delete(self, cid, rid): result = config.db.project_rules.delete_one({'project_id' : cid, '_id': bson.ObjectId(rid)}) if result.deleted_count != 1: raise APINotFoundException('Rule not found.') - return class JobsHandler(base.RequestHandler): - """Provide /jobs API routes.""" + + @require_admin def get(self): # pragma: no cover (no route) """List all jobs.""" - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') return list(config.db.jobs.find()) def add(self): @@ -279,14 +240,11 @@ def add(self): if not self.superuser_request: uid = self.uid - job = Queue.enqueue_job(payload,self.origin, perm_check_uid=uid) - + job = Queue.enqueue_job(payload, self.origin, perm_check_uid=uid) return { '_id': job.id_ } + @require_admin def stats(self): - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - all_flag = self.is_true('all') unique = self.is_true('unique') tags = self.request.GET.getall('tags') @@ -301,22 +259,16 @@ def stats(self): return Queue.get_statistics(tags=tags, last=last, unique=unique, all_flag=all_flag) + @require_admin def pending(self): - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - tags = self.request.GET.getall('tags') if len(tags) == 1: tags = tags[0].split(',') return Queue.get_pending(tags=tags) + @require_admin def next(self): - - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - - tags = self.request.GET.getall('tags') if len(tags) <= 0: tags = None @@ -328,40 +280,30 @@ def next(self): else: return job + @require_admin def reap_stale(self): - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - count = Queue.scan_for_orphans() return { 'orphaned': count } class JobHandler(base.RequestHandler): """Provides /Jobs/ routes.""" + @require_admin def get(self, _id): - - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - return Job.get(_id) + @require_admin def get_config(self, _id): """Get a job's config""" - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires superuser') - j = Job.get(_id) c = j.config if c is None: c = {} - # Serve config as formatted json file - self.response.headers['Content-Type'] = 'application/octet-stream' - self.response.headers['Content-Disposition'] = 'attachment; filename="config.json"' - # Detect if config is old- or new-style. # TODO: remove this logic with a DB upgrade, ref database.py's reserved upgrade section. + encoded = None if 'config' in c and c.get('inputs') is not None: # New behavior @@ -394,13 +336,14 @@ def get_config(self, _id): } encoded = pseudo_consistent_json_encode(c) - self.response.app_iter = StringIO.StringIO(encoded) - self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter - else: - # Legacy behavior + + else: # Legacy behavior encoded = pseudo_consistent_json_encode({"config": c}) - self.response.app_iter = StringIO.StringIO(encoded) - self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter + + stream = StringIO.StringIO(encoded) + length = len(encoded.encode('utf-8')) + + set_for_download(self.response, stream=stream, filename='config.json', length=length) @require_login def put(self, _id): @@ -455,15 +398,12 @@ def get_logs_text(self, _id): """Get a job's logs in raw text""" self._log_read_check(_id) + filename = 'job-' + _id + '-logs.txt' - self.response.headers['Content-Type'] = 'application/octet-stream' - self.response.headers['Content-Disposition'] = 'attachment; filename="job-' + _id + '-logs.txt"' - + set_for_download(self.response, filename=filename) for output in Logs.get_text_generator(_id): self.response.write(output) - return - def get_logs_html(self, _id): """Get a job's logs in html""" @@ -474,18 +414,12 @@ def get_logs_html(self, _id): return + @require_admin def add_logs(self, _id): - """Add to a job's logs""" - if not self.superuser_request and not self.user_is_admin: - self.abort(403, 'Request requires admin') - - doc = self.request.json - try: - Job.get(_id) - except Exception: # pylint: disable=broad-except - raise APINotFoundException('Job not found') + j = Job.get(_id) + Queue.mutate(j, {}) # Unconditionally heartbeat return Logs.add(_id, doc) @@ -506,17 +440,13 @@ def retry(self, _id): new_id = Queue.retry(j, force=True) return { "_id": new_id } - @require_drone def prepare_complete(self, _id): - j = Job.get(_id) payload = self.request.json - ticket = { - 'job': j.id_, - 'success': payload.get('success', False), - } - return {'ticket': config.db.job_tickets.insert_one(ticket).inserted_id} + success = payload['success'] + ticket = JobTicket.create(_id, success) + return { 'ticket': ticket } @require_login def accept_failed_output(self, _id): diff --git a/api/jobs/jobs.py b/api/jobs/jobs.py index c3061d5e7..35be594fd 100644 --- a/api/jobs/jobs.py +++ b/api/jobs/jobs.py @@ -370,6 +370,32 @@ def generate_request(self, gear): self.request = r return self.request +class JobTicket(object): + """ + A JobTicket represents an attempt to complete a job. + """ + + @staticmethod + def get(_id): + return config.db.job_tickets.find_one({'_id': bson.ObjectId(_id)}) + + @staticmethod + def create(job_id, success): + j = Job.get(job_id) + + result = config.db.job_tickets.insert_one({ + 'job': j.id_, + 'success': success, + }) + + return result.inserted_id + + @staticmethod + def find(job_id): + """Find any tickets with job ID""" + return list(config.db.job_tickets.find({'job': job_id})) + + class Logs(object): @staticmethod @@ -422,6 +448,11 @@ def get_html_generator(_id): @staticmethod def add(_id, doc): + + # Silently ignore adding no logs + if len(doc) <= 0: + return + log = config.db.job_logs.find_one({'_id': _id}) if log is None: # Race diff --git a/api/jobs/queue.py b/api/jobs/queue.py index 4d2ab8ce9..89ea8cc10 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -8,7 +8,7 @@ import datetime from .. import config -from .jobs import Job, Logs +from .jobs import Job, Logs, JobTicket from .gears import get_gear, validate_gear_config, fill_gear_default_values from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference from ..web.errors import InputValidationException @@ -417,13 +417,26 @@ def scan_for_orphans(): """ orphaned = 0 + query = { + 'state': 'running', + 'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, + } while True: + orphan_candidate = config.db.jobs.find_one(query) + if orphan_candidate is None: + break + + # If the job is currently attempting to complete, do not orphan. + ticket = JobTicket.find(orphan_candidate['_id']) + if ticket is not None and len(ticket) > 0: + continue + + # CAS this job, since it does not have a ticket + select = { '_id': orphan_candidate['_id'] } + doc = config.db.jobs.find_one_and_update( - { - 'state': 'running', - 'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)}, - }, + dict(query, **select), { '$set': { 'state': 'failed', }, @@ -432,11 +445,12 @@ def scan_for_orphans(): ) if doc is None: - break + log.info('Job %s was heartbeat during a ticket lookup and thus not orhpaned', orphan_candidate['_id']) else: orphaned += 1 j = Job.load(doc) - Logs.add(j.id_, [{"msg":"The job did not report in for a long time and was canceled.", "fd":-1}]) + Logs.add(j.id_, [{'msg':'The job did not report in for a long time and was canceled.', 'fd':-1}]) new_id = Queue.retry(j) - Logs.add(j.id_, [{"msg": "Retried job as " + str(new_id) if new_id else "Job retries exceeded maximum allowed", "fd":-1}]) + Logs.add(j.id_, [{'msg': 'Retried job as ' + str(new_id) if new_id else 'Job retries exceeded maximum allowed', 'fd':-1}]) + return orphaned diff --git a/api/placer.py b/api/placer.py index 0c642d9fc..bc7643294 100644 --- a/api/placer.py +++ b/api/placer.py @@ -15,7 +15,7 @@ from .dao.containerstorage import SessionStorage, AcquisitionStorage from .dao import containerutil, hierarchy from .jobs import rules -from .jobs.jobs import Job +from .jobs.jobs import Job, JobTicket from .types import Origin from .web import encoder from .web.errors import FileFormException @@ -245,7 +245,7 @@ class EnginePlacer(Placer): """ A placer that can accept files and/or metadata sent to it from the engine - It uses update_container_hierarchy to update the container and it's parents' fields from the metadata + It uses update_container_hierarchy to update the container and its parents' fields from the metadata """ def check(self): @@ -288,9 +288,9 @@ def process_file_field(self, field, file_attrs): break if self.context.get('job_ticket_id'): - job_ticket_id = bson.ObjectId(self.context.get('job_ticket_id')) - job_ticket = config.db.job_tickets.find_one({'_id': job_ticket_id}) - if not job_ticket.get('success'): + job_ticket = JobTicket.get(self.context.get('job_ticket_id')) + + if not job_ticket['success']: file_attrs['from_failed_job'] = True self.save_file(field, file_attrs) @@ -312,9 +312,9 @@ def finalize(self): self.metadata[k].pop('files', {}) if self.context.get('job_ticket_id'): - job_ticket_id = bson.ObjectId(self.context.get('job_ticket_id')) - job_ticket = config.db.job_tickets.find_one({'_id': job_ticket_id}) - if job_ticket.get('success'): + job_ticket = JobTicket.get(self.context.get('job_ticket_id')) + + if job_ticket['success']: hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) else: hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) diff --git a/api/util.py b/api/util.py index 75bf6d463..54cc03c2d 100644 --- a/api/util.py +++ b/api/util.py @@ -177,6 +177,18 @@ def sanitize_string_to_filename(value): keepcharacters = (' ', '.', '_', '-') return "".join([c for c in value if c.isalnum() or c in keepcharacters]).rstrip() +def humanize_validation_error(val_err): + """ + Takes a jsonschema.ValidationError, returns a human-friendly string + """ + + key = 'none' + if len(val_err.relative_path) > 0: + key = val_err.relative_path[0] + message = val_err.message.replace("u'", "'") + + return 'Gear manifest does not match schema on key ' + key + ': ' + message + def obj_from_map(_map): """ Creates an anonymous object with properties determined by the passed (shallow) map. @@ -199,6 +211,20 @@ def path_from_hash(hash_): path = (hash_version, hash_alg, first_stanza, second_stanza, hash_) return os.path.join(*path) +def set_for_download(response, stream=None, filename=None, length=None): + """Takes a self.response, and various download options.""" + + # If an app_iter is to be set, it MUST be before these other headers are set. + if stream is not None: + response.app_iter = stream + + response.headers['Content-Type'] = 'application/octet-stream' + + if filename is not None: + response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' + + if length is not None: + response.headers['Content-Length'] = str(length) def format_hash(hash_alg, hash_): """ @@ -206,7 +232,6 @@ def format_hash(hash_alg, hash_): """ return '-'.join(('v0', hash_alg, hash_)) - def create_json_http_exception_response(message, code, request_id, custom=None): content = { 'message': message, @@ -217,14 +242,12 @@ def create_json_http_exception_response(message, code, request_id, custom=None): content.update(custom) return content - def send_json_http_exception(response, message, code, request_id, custom=None): response.set_status(code) json_content = json.dumps(create_json_http_exception_response(message, code, request_id, custom)) response.headers['Content-Type'] = 'application/json; charset=utf-8' response.write(json_content) - class Enum(baseEnum.Enum): # Enum strings are prefixed by their class: "Category.classifier". # This overrides that behaviour and removes the prefix. From c1538a48903d0e1f2d563e7635e4882da3fed9fd Mon Sep 17 00:00:00 2001 From: Nathaniel Kofalt Date: Thu, 15 Feb 2018 10:42:46 -0600 Subject: [PATCH 2/5] Correct inaccurate util string --- api/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/util.py b/api/util.py index 54cc03c2d..64d2f85d1 100644 --- a/api/util.py +++ b/api/util.py @@ -187,7 +187,7 @@ def humanize_validation_error(val_err): key = val_err.relative_path[0] message = val_err.message.replace("u'", "'") - return 'Gear manifest does not match schema on key ' + key + ': ' + message + return 'Object does not match schema on key ' + key + ': ' + message def obj_from_map(_map): """ From ba5cdaf992ea127556b7fff05d9754e7a121ecba Mon Sep 17 00:00:00 2001 From: Nathaniel Kofalt Date: Thu, 15 Feb 2018 11:30:13 -0600 Subject: [PATCH 3/5] Calm jobs-next logging --- api/jobs/handlers.py | 2 +- api/web/base.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 4b6c69908..ebef965d6 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -276,7 +276,7 @@ def next(self): job = Queue.start_job(tags=tags) if job is None: - self.abort(400, 'No jobs to process') + raise InputValidationException('No jobs to process') else: return job diff --git a/api/web/base.py b/api/web/base.py index 2423142b0..f5283f96d 100644 --- a/api/web/base.py +++ b/api/web/base.py @@ -335,7 +335,6 @@ def handle_exception(self, exception, debug, return_json=False): # pylint: disab code = exception.code elif isinstance(exception, errors.InputValidationException): code = 400 - self.request.logger.warning(str(exception)) elif isinstance(exception, errors.APIAuthProviderException): code = 401 elif isinstance(exception, errors.APIRefreshTokenException): From 15051398d6f046b68bc98ec75e7decfe513a4b89 Mon Sep 17 00:00:00 2001 From: Nathaniel Kofalt Date: Tue, 20 Feb 2018 06:51:42 -0600 Subject: [PATCH 4/5] Save job mutation when using job ticket --- api/placer.py | 24 +++++++++++++++++------- api/tempdir.py | 2 +- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/api/placer.py b/api/placer.py index bc7643294..e11d3b5be 100644 --- a/api/placer.py +++ b/api/placer.py @@ -12,10 +12,11 @@ from . import tempdir as tempfile from . import util from . import validators -from .dao.containerstorage import SessionStorage, AcquisitionStorage from .dao import containerutil, hierarchy +from .dao.containerstorage import SessionStorage, AcquisitionStorage from .jobs import rules from .jobs.jobs import Job, JobTicket +from .jobs.queue import Queue from .types import Origin from .web import encoder from .web.errors import FileFormException @@ -277,7 +278,6 @@ def check(self): self.metadata[self.container_type]['files'] = files_ ### - def process_file_field(self, field, file_attrs): if self.metadata is not None: file_mds = self.metadata.get(self.container_type, {}).get('files', []) @@ -297,6 +297,15 @@ def process_file_field(self, field, file_attrs): self.saved.append(file_attrs) def finalize(self): + job = None + job_ticket = None + success = True + + if self.context.get('job_ticket_id'): + job_ticket = JobTicket.get(self.context.get('job_ticket_id')) + job = Job.get(job_ticket['job']) + success = job_ticket['success'] + if self.metadata is not None: bid = bson.ObjectId(self.id_) @@ -311,13 +320,14 @@ def finalize(self): for k in self.metadata.keys(): self.metadata[k].pop('files', {}) - if self.context.get('job_ticket_id'): - job_ticket = JobTicket.get(self.context.get('job_ticket_id')) + if success: + hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) - if job_ticket['success']: - hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) + if job_ticket is not None: + if success: + Queue.mutate(job, {'state': 'complete'}) else: - hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) + Queue.mutate(job, {'state': 'failed'}) if self.context.get('job_id'): job = Job.get(self.context.get('job_id')) diff --git a/api/tempdir.py b/api/tempdir.py index ff4b30f0b..876ea85e0 100644 --- a/api/tempdir.py +++ b/api/tempdir.py @@ -61,7 +61,7 @@ def __exit__(self, exc, value, tb): def __del__(self): # Issue a Warning if implicit cleanup needed - self.cleanup(_warn=True) + self.cleanup() # The following code attempts to make # this class tolerant of the module nulling out process From adf5b5baf5817623cea9b8fe28de3c72d3dd7c9e Mon Sep 17 00:00:00 2001 From: Nathaniel Kofalt Date: Tue, 20 Feb 2018 06:52:31 -0600 Subject: [PATCH 5/5] Jobs must be running before results are uploaded --- tests/integration_tests/python/test_jobs.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/tests/integration_tests/python/test_jobs.py b/tests/integration_tests/python/test_jobs.py index 57e09e919..72d76622f 100644 --- a/tests/integration_tests/python/test_jobs.py +++ b/tests/integration_tests/python/test_jobs.py @@ -418,6 +418,8 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_ } } + api_db.jobs.update_one({'_id': bson.ObjectId(job)}, {'$set':{'state': 'running'}}) + r = as_drone.post('/engine', params={'level': 'acquisition', 'id': acquisition, 'job': job, 'job_ticket': job_ticket['_id']}, files=file_form('result.txt', meta=metadata) @@ -441,16 +443,6 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_ r = as_user.post('/jobs/' + job + '/accept-failed-output') assert r.status_code == 403 - # try to accept failed output - job is not in failed state yet - r = as_admin.post('/jobs/' + job + '/accept-failed-output') - assert r.status_code == 400 - - # set job state to failed - r = as_drone.put('/jobs/' + job, json={'state': 'running'}) - assert r.ok - r = as_drone.put('/jobs/' + job, json={'state': 'failed'}) - assert r.ok - # accept failed output r = as_admin.post('/jobs/' + job + '/accept-failed-output') assert r.ok