Merge pull request #964 from scitran/no-input-batch
Allow batch processing of gears with no input (SDK gears)
nagem authored Nov 7, 2017
2 parents 0406ad9 + b25331c commit 605ffa5
Showing 4 changed files with 243 additions and 19 deletions.
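
In short: batch proposals for gears with no file inputs now match whole sessions as job destinations, instead of matching files as inputs. A minimal sketch of the flow from a client's perspective — the base URL, auth header, and ids below are placeholders; the endpoint paths and payload shapes mirror the integration tests added in this commit:

import requests

API = 'https://core.example.com/api'          # placeholder deployment URL
session_id = '5a0123456789ab0123456789'       # placeholder session id
gear_id = '5a0123456789ab0123456780'          # placeholder id of a no-input (SDK) gear

http = requests.Session()
http.headers['Authorization'] = 'scitran-user <api-key>'   # placeholder credential

# Propose a batch against a session target; project and acquisition targets work too.
r = http.post(API + '/batch', json={
    'gear_id': gear_id,
    'targets': [{'type': 'session', 'id': session_id}],
})
proposal = r.json()
assert proposal['matched'][0]['type'] == 'session'   # whole sessions are matched

# Launch the proposed jobs.
http.post(API + '/batch/' + proposal['_id'] + '/run')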
40 changes: 40 additions & 0 deletions api/dao/containerstorage.py
@@ -1,6 +1,7 @@
import datetime

import bson
import copy

from . import APIStorageException, APINotFoundException
from . import containerutil
@@ -187,6 +188,45 @@ def recalc_session_compliance(self, session_id, session=None, template=None, har
return True
return False

def get_all_for_targets(self, target_type, target_ids,
user=None, projection=None, include_archived=True):
"""
Given a container type and list of ids, get all sessions that are in those hierarchies.
For example, if target_type='projects' and target_ids=['id1', 'id2'], this method will return
all sessions that are in project id1 and project id2.
If user is supplied, only sessions with that user in their permissions list are returned.
If projection is supplied, it is applied to the session query.
If include_archived is False, archived sessions are ignored.
"""

query = {}
if not include_archived:
query['archived'] = {'$ne': True}

target_type = containerutil.singularize(target_type)

if target_type == 'project':
query['project'] = {'$in':target_ids}

elif target_type == 'session':
query['_id'] = {'$in':target_ids}

elif target_type == 'acquisition':
a_query = copy.deepcopy(query)
a_query['_id'] = {'$in':target_ids}
session_ids = list(set([a['session'] for a in AcquisitionStorage().get_all_el(a_query, user, {'session':1})]))
query['_id'] = {'$in':session_ids}

else:
raise ValueError('Cannot get all sessions from target container {}'.format(target_type))

return self.get_all_el(query, user, projection)



class AcquisitionStorage(ContainerStorage):

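For reference, a minimal usage sketch of the new SessionStorage.get_all_for_targets — the ObjectId value is a placeholder, and user and projection are optional, per the docstring:

from bson import ObjectId
from api.dao.containerstorage import SessionStorage

project_ids = [ObjectId('5a0123456789ab0123456789')]   # placeholder project id

# All non-archived sessions under the given projects.
sessions = SessionStorage().get_all_for_targets(
    'projects',                  # container type; singularized internally
    project_ids,
    projection={'label': 1},     # optional Mongo projection
    include_archived=False,
)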
41 changes: 34 additions & 7 deletions api/jobs/batch.py
@@ -2,6 +2,7 @@
Batch
"""
import bson
import copy
import datetime

from .. import config
@@ -134,6 +135,7 @@ def run(batch_job):
if not proposal:
raise APIStorageException('The batch job is not formatted correctly.')
proposed_inputs = proposal.get('inputs', [])
proposed_destinations = proposal.get('destinations', [])

gear_id = batch_job['gear_id']
gear = gears.get_gear(gear_id)
@@ -154,15 +156,19 @@

jobs = []
job_ids = []

job_defaults = {
'config': config_,
'gear_id': gear_id,
'tags': tags,
'batch': str(batch_job.get('_id')),
'inputs': {}
}

for inputs in proposed_inputs:

- job_map = {
- 'config': config_,
- 'gear_id': gear_id,
- 'inputs': inputs,
- 'tags': tags,
- 'batch': str(batch_job.get('_id'))
- }
job_map = copy.deepcopy(job_defaults)
job_map['inputs'] = inputs

if gear.get('category') == 'analysis':

@@ -182,6 +188,27 @@
jobs.append(job)
job_ids.append(job_id)

for dest in proposed_destinations:

job_map = copy.deepcopy(job_defaults)
job_map['destination'] = dest

if gear.get('category') == 'analysis':

# Create analysis
result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
job = result.get('job')
job_id = result.get('job_id')

else:

job = Queue.enqueue_job(job_map, origin)
job_id = job.id_


jobs.append(job)
job_ids.append(job_id)

update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
return jobs

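A note on the copy.deepcopy(job_defaults) calls above: a shallow copy would share the nested 'inputs' dict between the defaults and every job, so one job's inputs would leak into the next. A standalone illustration, not part of this diff:

import copy

job_defaults = {'gear_id': 'g1', 'inputs': {}}

shallow = dict(job_defaults)            # copies only the top-level dict
shallow['inputs']['dicom'] = 'file-a'
print(job_defaults['inputs'])           # {'dicom': 'file-a'} -- defaults polluted

job_defaults = {'gear_id': 'g1', 'inputs': {}}
deep = copy.deepcopy(job_defaults)      # copies nested dicts as well
deep['inputs']['dicom'] = 'file-b'
print(job_defaults['inputs'])           # {} -- defaults untouched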
50 changes: 38 additions & 12 deletions api/jobs/handlers.py
@@ -11,7 +11,7 @@
from .. import util
from ..auth import require_login, has_access
from ..dao import APIPermissionException, APINotFoundException
- from ..dao.containerstorage import ProjectStorage, AcquisitionStorage
from ..dao.containerstorage import ProjectStorage, SessionStorage, AcquisitionStorage
from ..dao.containerutil import ContainerReference
from ..web import base
from ..web.encoder import pseudo_consistent_json_encode
@@ -546,29 +546,51 @@ def post(self):
self.abort(400, 'targets must all be of same type.')
container_ids.append(t.get('id'))

- # Get acquisitions associated with targets
objectIds = [bson.ObjectId(x) for x in container_ids]
- containers = AcquisitionStorage().get_all_for_targets(container_type, objectIds,
- collection_id=collection_id, include_archived=False)

# Determine if gear is no-input gear
file_inputs = False
for input_ in gear['gear'].get('inputs', {}).itervalues():
if input_['base'] == 'file':
file_inputs = True
break

if not file_inputs:
# Grab sessions rather than acquisitions
containers = SessionStorage().get_all_for_targets(container_type, objectIds, include_archived=False)

else:
# Get acquisitions associated with targets
containers = AcquisitionStorage().get_all_for_targets(container_type, objectIds,
collection_id=collection_id, include_archived=False)

if not containers:
- self.abort(404, 'Could not find acquisitions from targets.')
self.abort(404, 'Could not find necessary containers from targets.')

improper_permissions = []
- acquisitions = []
perm_checked_conts = []

# Make sure user has read-write access, add those to acquisition list
for c in containers:
if self.superuser_request or has_access(self.uid, c, 'rw'):
c.pop('permissions')
- acquisitions.append(c)
perm_checked_conts.append(c)
else:
improper_permissions.append(c['_id'])

- if not acquisitions:
if not perm_checked_conts:
self.abort(403, 'User does not have write access to targets.')

- results = batch.find_matching_conts(gear, acquisitions, 'acquisition')
if not file_inputs:
# All containers become matched destinations

results = {
'matched': [{'id': str(x['_id']), 'type': 'session'} for x in containers]
}

else:
# Look for file matches in each acquisition
results = batch.find_matching_conts(gear, perm_checked_conts, 'acquisition')

matched = results['matched']
batch_proposal = {}
@@ -583,18 +605,22 @@
'state': 'pending',
'origin': self.origin,
'proposal': {
- 'inputs': [c.pop('inputs') for c in matched],
'analysis': analysis_data,
'tags': tags
}
}

if not file_inputs:
batch_proposal['proposal']['destinations'] = matched
else:
batch_proposal['proposal']['inputs'] = [c.pop('inputs') for c in matched]

batch.insert(batch_proposal)
batch_proposal.pop('proposal')

# Either way, return information about the status of the containers
- batch_proposal['not_matched'] = results['not_matched']
- batch_proposal['ambiguous'] = results['ambiguous']
batch_proposal['not_matched'] = results.get('not_matched', [])
batch_proposal['ambiguous'] = results.get('ambiguous', [])
batch_proposal['matched'] = matched
batch_proposal['improper_permissions'] = improper_permissions

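Put together, the stored proposal now varies by gear type: no-input gears carry a destinations list, while file-input gears keep the existing inputs list. A sketch of the two shapes — ids, labels, and file names are placeholders, and the input-map shape is abbreviated from what find_matching_conts produces:

# No-input (SDK) gear: whole containers become job destinations.
no_input_proposal = {
    'analysis': {'label': 'my analysis'},   # optional analysis info
    'tags': ['batch'],
    'destinations': [{'type': 'session', 'id': '5a0123456789ab0123456789'}],
}

# File-input gear: matched files become job inputs, as before.
file_input_proposal = {
    'analysis': {'label': 'my analysis'},
    'tags': ['batch'],
    'inputs': [{'dicom': {'type': 'acquisition', 'id': '5a0123456789ab0123456780', 'name': 'scan.dcm'}}],
}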
131 changes: 131 additions & 0 deletions test/integration_tests/python/test_batch.py
@@ -1,3 +1,4 @@
import bson
import time

def test_batch(data_builder, as_user, as_admin, as_root):
@@ -246,3 +247,133 @@ def test_batch(data_builder, as_user, as_admin, as_root):
# test batch is complete
r = as_admin.get('/batch/' + batch_id)
assert r.json()['state'] == 'failed'

def test_no_input_batch(data_builder, default_payload, randstr, as_admin, as_root, api_db):
project = data_builder.create_project()
session = data_builder.create_session(project=project)
acquisition = data_builder.create_acquisition(session=session)

gear_name = randstr()
gear_doc = default_payload['gear']
gear_doc['gear']['name'] = gear_name
gear_doc['gear']['inputs'] = {
'api_key': {
'base': 'api-key'
}
}


r = as_root.post('/gears/' + gear_name, json=gear_doc)
assert r.ok

gear = r.json()['_id']


# create a batch w/o inputs targeting session
r = as_admin.post('/batch', json={
'gear_id': gear,
'targets': [{'type': 'session', 'id': session}]
})
assert r.ok
batch1 = r.json()

assert len(batch1['matched']) == 1
assert batch1['matched'][0]['id'] == session

# create a batch w/o inputs targeting acquisition
r = as_admin.post('/batch', json={
'gear_id': gear,
'targets': [{'type': 'acquisition', 'id': acquisition}]
})
assert r.ok
batch2 = r.json()
assert len(batch2['matched']) == 1
assert batch2['matched'][0]['id'] == session

# create a batch w/o inputs targeting project
r = as_admin.post('/batch', json={
'gear_id': gear,
'targets': [{'type': 'project', 'id': project}]
})
assert r.ok
batch3 = r.json()
assert len(batch3['matched']) == 1
assert batch3['matched'][0]['id'] == session

batch_id = batch1['_id']

# run batch
r = as_admin.post('/batch/' + batch_id + '/run')
assert r.ok

# test batch.state after calling run
r = as_admin.get('/batch/' + batch_id)
assert r.json()['state'] == 'running'
jobs = r.json()['jobs']

for job in jobs:
# set jobs to running, then complete
r = as_root.put('/jobs/' + job, json={'state': 'running'})
assert r.ok
r = as_root.put('/jobs/' + job, json={'state': 'complete'})
assert r.ok

# test batch is complete
r = as_admin.get('/batch/' + batch_id)
assert r.json()['state'] == 'complete'

## Test no-input analysis gear ##

gear_name = randstr()
gear_doc = default_payload['gear']
gear_doc['category'] = 'analysis'
gear_doc['gear']['name'] = gear_name
gear_doc['gear']['inputs'] = {
'api_key': {
'base': 'api-key'
}
}

r = as_root.post('/gears/' + gear_name, json=gear_doc)
assert r.ok

gear2 = r.json()['_id']

# create a batch w/o inputs targeting session
r = as_admin.post('/batch', json={
'gear_id': gear2,
'targets': [{'type': 'session', 'id': session}]
})
assert r.ok
batch4 = r.json()

assert len(batch4['matched']) == 1
assert batch4['matched'][0]['id'] == session
batch_id = batch4['_id']

# run batch
r = as_admin.post('/batch/' + batch_id + '/run')
assert r.ok

# test batch.state after calling run
r = as_admin.get('/batch/' + batch_id)
assert r.json()['state'] == 'running'
jobs = r.json()['jobs']

for job in jobs:
# set jobs to running, then complete
r = as_root.put('/jobs/' + job, json={'state': 'running'})
assert r.ok
r = as_root.put('/jobs/' + job, json={'state': 'complete'})
assert r.ok

# cleanup

r = as_root.delete('/gears/' + gear)
assert r.ok

r = as_root.delete('/gears/' + gear2)
assert r.ok

# must remove jobs manually because gears were added manually
api_db.jobs.remove({'gear_id': {'$in': [gear, gear2]}})
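
To run just the new test locally (assuming the repo's integration-test environment is already up; the exact invocation may differ per setup): pytest test/integration_tests/python/test_batch.py -k test_no_input_batch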
