Skip to content
Snippets Groups Projects
Unverified Commit 605ffa59 authored by Megan Henning's avatar Megan Henning Committed by GitHub
Browse files

Merge pull request #964 from scitran/no-input-batch

Allow batch processing of gears with no input (SDK gears)
parents 0406ad9f b25331c2
No related branches found
No related tags found
No related merge requests found
import datetime
import bson
import copy
from . import APIStorageException, APINotFoundException
from . import containerutil
......@@ -187,6 +188,45 @@ class SessionStorage(ContainerStorage):
return True
return False
def get_all_for_targets(self, target_type, target_ids,
        user=None, projection=None, include_archived=True):
    """
    Return all sessions that fall within the hierarchies of the given containers.

    For example, if target_type='projects' and target_ids=['id1', 'id2'], this
    method will return all sessions that are in project id1 or project id2.

    Params:
        target_type: container type ('project', 'session' or 'acquisition';
                     plural forms are accepted and singularized)
        target_ids:  list of container ids of that type
        user:        if supplied, only return sessions with user in their perms list
        projection:  if supplied, applied to the session query
        include_archived: if False, archived sessions are ignored

    Raises:
        ValueError: if target_type is not one of the supported container types.
    """
    query = {}
    if not include_archived:
        query['archived'] = {'$ne': True}

    target_type = containerutil.singularize(target_type)

    if target_type == 'project':
        query['project'] = {'$in': target_ids}
    elif target_type == 'session':
        query['_id'] = {'$in': target_ids}
    elif target_type == 'acquisition':
        # Resolve acquisitions up to their parent sessions, carrying over any
        # archived filter already placed on the query.
        a_query = copy.deepcopy(query)
        a_query['_id'] = {'$in': target_ids}
        # De-duplicate parent session ids with a set comprehension.
        session_ids = list({a['session'] for a in AcquisitionStorage().get_all_el(a_query, user, {'session': 1})})
        query['_id'] = {'$in': session_ids}
    else:
        raise ValueError('Cannot get all sessions from target container {}'.format(target_type))

    return self.get_all_el(query, user, projection)
class AcquisitionStorage(ContainerStorage):
......
......@@ -2,6 +2,7 @@
Batch
"""
import bson
import copy
import datetime
from .. import config
......@@ -134,6 +135,7 @@ def run(batch_job):
if not proposal:
raise APIStorageException('The batch job is not formatted correctly.')
proposed_inputs = proposal.get('inputs', [])
proposed_destinations = proposal.get('destinations', [])
gear_id = batch_job['gear_id']
gear = gears.get_gear(gear_id)
......@@ -154,15 +156,19 @@ def run(batch_job):
jobs = []
job_ids = []
job_defaults = {
'config': config_,
'gear_id': gear_id,
'tags': tags,
'batch': str(batch_job.get('_id')),
'inputs': {}
}
for inputs in proposed_inputs:
job_map = {
'config': config_,
'gear_id': gear_id,
'inputs': inputs,
'tags': tags,
'batch': str(batch_job.get('_id'))
}
job_map = copy.deepcopy(job_defaults)
job_map['inputs'] = inputs
if gear.get('category') == 'analysis':
......@@ -182,6 +188,27 @@ def run(batch_job):
jobs.append(job)
job_ids.append(job_id)
for dest in proposed_destinations:
job_map = copy.deepcopy(job_defaults)
job_map['destination'] = dest
if gear.get('category') == 'analysis':
# Create analysis
result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
job = result.get('job')
job_id = result.get('job_id')
else:
job = Queue.enqueue_job(job_map, origin)
job_id = job.id_
jobs.append(job)
job_ids.append(job_id)
update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
return jobs
......
......@@ -11,7 +11,7 @@ from .. import upload
from .. import util
from ..auth import require_login, has_access
from ..dao import APIPermissionException, APINotFoundException
from ..dao.containerstorage import ProjectStorage, AcquisitionStorage
from ..dao.containerstorage import ProjectStorage, SessionStorage, AcquisitionStorage
from ..dao.containerutil import ContainerReference
from ..web import base
from ..web.encoder import pseudo_consistent_json_encode
......@@ -546,29 +546,51 @@ class BatchHandler(base.RequestHandler):
self.abort(400, 'targets must all be of same type.')
container_ids.append(t.get('id'))
# Get acquisitions associated with targets
objectIds = [bson.ObjectId(x) for x in container_ids]
containers = AcquisitionStorage().get_all_for_targets(container_type, objectIds,
collection_id=collection_id, include_archived=False)
# Determine if gear is no-input gear
file_inputs = False
for input_ in gear['gear'].get('inputs', {}).itervalues():
if input_['base'] == 'file':
file_inputs = True
break
if not file_inputs:
# Grab sessions rather than acquisitions
containers = SessionStorage().get_all_for_targets(container_type, objectIds, include_archived=False)
else:
# Get acquisitions associated with targets
containers = AcquisitionStorage().get_all_for_targets(container_type, objectIds,
collection_id=collection_id, include_archived=False)
if not containers:
self.abort(404, 'Could not find acquisitions from targets.')
self.abort(404, 'Could not find necessary containers from targets.')
improper_permissions = []
acquisitions = []
perm_checked_conts = []
# Make sure user has read-write access, add those to acquisition list
for c in containers:
if self.superuser_request or has_access(self.uid, c, 'rw'):
c.pop('permissions')
acquisitions.append(c)
perm_checked_conts.append(c)
else:
improper_permissions.append(c['_id'])
if not acquisitions:
if not perm_checked_conts:
self.abort(403, 'User does not have write access to targets.')
results = batch.find_matching_conts(gear, acquisitions, 'acquisition')
if not file_inputs:
# All containers become matched destinations
results = {
'matched': [{'id': str(x['_id']), 'type': 'session'} for x in containers]
}
else:
# Look for file matches in each acquisition
results = batch.find_matching_conts(gear, perm_checked_conts, 'acquisition')
matched = results['matched']
batch_proposal = {}
......@@ -583,18 +605,22 @@ class BatchHandler(base.RequestHandler):
'state': 'pending',
'origin': self.origin,
'proposal': {
'inputs': [c.pop('inputs') for c in matched],
'analysis': analysis_data,
'tags': tags
}
}
if not file_inputs:
batch_proposal['proposal']['destinations'] = matched
else:
batch_proposal['proposal']['inputs'] = [c.pop('inputs') for c in matched]
batch.insert(batch_proposal)
batch_proposal.pop('proposal')
# Either way, return information about the status of the containers
batch_proposal['not_matched'] = results['not_matched']
batch_proposal['ambiguous'] = results['ambiguous']
batch_proposal['not_matched'] = results.get('not_matched', [])
batch_proposal['ambiguous'] = results.get('ambiguous', [])
batch_proposal['matched'] = matched
batch_proposal['improper_permissions'] = improper_permissions
......
import bson
import time
def test_batch(data_builder, as_user, as_admin, as_root):
......@@ -246,3 +247,133 @@ def test_batch(data_builder, as_user, as_admin, as_root):
# test batch is complete
r = as_admin.get('/batch/' + batch_id)
assert r.json()['state'] == 'failed'
def test_no_input_batch(data_builder, default_payload, randstr, as_admin, as_root, api_db):
    """Batch proposal and run for gears that take no file inputs (SDK / api-key gears).

    A no-input gear should match *sessions* as destinations (rather than
    matching files inside acquisitions), regardless of whether the batch
    target is a session, an acquisition, or a project. Exercised for both
    a utility gear and an analysis gear.
    """
    project = data_builder.create_project()
    session = data_builder.create_session(project=project)
    acquisition = data_builder.create_acquisition(session=session)

    # Register a gear whose only input is an api-key, i.e. no file inputs
    gear_name = randstr()
    gear_doc = default_payload['gear']
    gear_doc['gear']['name'] = gear_name
    gear_doc['gear']['inputs'] = {
        'api_key': {
            'base': 'api-key'
        }
    }
    r = as_root.post('/gears/' + gear_name, json=gear_doc)
    assert r.ok
    gear = r.json()['_id']

    # create a batch w/o inputs targeting session
    r = as_admin.post('/batch', json={
        'gear_id': gear,
        'targets': [{'type': 'session', 'id': session}]
    })
    assert r.ok
    batch1 = r.json()

    assert len(batch1['matched']) == 1
    assert batch1['matched'][0]['id'] == session

    # create a batch w/o inputs targeting acquisition; the acquisition's
    # parent session is what gets matched
    r = as_admin.post('/batch', json={
        'gear_id': gear,
        'targets': [{'type': 'acquisition', 'id': acquisition}]
    })
    assert r.ok
    batch2 = r.json()

    assert len(batch2['matched']) == 1
    assert batch2['matched'][0]['id'] == session

    # create a batch w/o inputs targeting project; all sessions in the
    # project are matched (here, just the one)
    r = as_admin.post('/batch', json={
        'gear_id': gear,
        'targets': [{'type': 'project', 'id': project}]
    })
    assert r.ok
    batch3 = r.json()

    assert len(batch3['matched']) == 1
    assert batch3['matched'][0]['id'] == session

    batch_id = batch1['_id']

    # run batch
    r = as_admin.post('/batch/' + batch_id + '/run')
    assert r.ok

    # test batch.state after calling run
    r = as_admin.get('/batch/' + batch_id)
    assert r.json()['state'] == 'running'

    jobs = r.json()['jobs']
    for job in jobs:
        # drive each job to completion (running -> complete)
        r = as_root.put('/jobs/' + job, json={'state': 'running'})
        assert r.ok
        r = as_root.put('/jobs/' + job, json={'state': 'complete'})
        assert r.ok

    # test batch is complete
    r = as_admin.get('/batch/' + batch_id)
    assert r.json()['state'] == 'complete'

    ## Test no-input analysis gear ##
    gear_name = randstr()
    gear_doc = default_payload['gear']
    gear_doc['category'] = 'analysis'
    gear_doc['gear']['name'] = gear_name
    gear_doc['gear']['inputs'] = {
        'api_key': {
            'base': 'api-key'
        }
    }
    r = as_root.post('/gears/' + gear_name, json=gear_doc)
    assert r.ok
    gear2 = r.json()['_id']

    # create a batch w/o inputs targeting session
    r = as_admin.post('/batch', json={
        'gear_id': gear2,
        'targets': [{'type': 'session', 'id': session}]
    })
    assert r.ok
    batch4 = r.json()

    assert len(batch4['matched']) == 1
    assert batch4['matched'][0]['id'] == session
    batch_id = batch4['_id']

    # run batch
    r = as_admin.post('/batch/' + batch_id + '/run')
    assert r.ok

    # test batch.state after calling run
    r = as_admin.get('/batch/' + batch_id)
    assert r.json()['state'] == 'running'
    jobs = r.json()['jobs']
    for job in jobs:
        # drive each job to completion (running -> complete)
        r = as_root.put('/jobs/' + job, json={'state': 'running'})
        assert r.ok
        r = as_root.put('/jobs/' + job, json={'state': 'complete'})
        assert r.ok

    # cleanup
    r = as_root.delete('/gears/' + gear)
    assert r.ok
    r = as_root.delete('/gears/' + gear2)
    assert r.ok

    # must remove jobs manually because gears were added manually
    api_db.jobs.remove({'gear_id': {'$in': [gear, gear2]}})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment