Skip to content
Snippets Groups Projects
Commit 19cfdb00 authored by Megan Henning's avatar Megan Henning Committed by GitHub
Browse files

Merge pull request #629 from scitran/batch-analysis

Support tag and analysis label for batch
parents a9a70b6f d2d3c420
No related branches found
No related tags found
No related merge requests found
......@@ -361,29 +361,46 @@ class AcquisitionStorage(ContainerStorage):
SessionStorage().recalc_session_compliance(acquisition['session'])
return result
def get_all_for_targets(self, target_type, target_ids,
                        user=None, projection=None, collection_id=None, include_archived=True):
    """
    Given a container type and list of ids, get all acquisitions that are in those hierarchies.

    For example, if target_type='projects' and target_ids=['id1', 'id2'], this method will return
    all acquisitions that are in sessions in project id1 and project id2.

    :param target_type:   one of 'project(s)', 'session(s)', or 'acquisition(s)'
    :param target_ids:    list of ObjectIds of the target containers
    :param user:          if supplied, only return acquisitions with user in their perms list
    :param projection:    if supplied, applied to the acquisition query
    :param collection_id: if supplied, the collection context is used to query acquisitions
    :param include_archived: if False, archived acquisitions are ignored; additionally, when
                          target_type is 'project', sessions in the project that are archived
                          are ignored
    :raises ValueError:   when target_type is not a recognized container type
    """
    query = {}
    if not include_archived:
        # Match docs where 'archived' is absent or not True
        query['archived'] = {'$ne': True}

    # If target_type is 'acquisitions', it just wraps self.get_all_el with a query containing
    # all acquisition ids.
    if target_type in ['acquisition', 'acquisitions']:
        query['_id'] = {'$in': target_ids}
        return self.get_all_el(query, user, projection)

    # Find session ids from projects
    session_ids = None
    if target_type in ['project', 'projects']:
        query['project'] = {'$in': target_ids}
        session_ids = [s['_id'] for s in SessionStorage().get_all_el(query, user, {'_id': 1})]
    elif target_type in ['session', 'sessions']:
        session_ids = target_ids
    else:
        raise ValueError('Target type must be of type project, session or acquisition.')

    # Using session ids, find acquisitions; drop the project filter reused above,
    # acquisitions are keyed by session
    query.pop('project', None)
    query['session'] = {'$in': session_ids}
    if collection_id:
        query['collections'] = collection_id
    return self.get_all_el(query, user, projection)
......@@ -5,7 +5,7 @@ import bson
import datetime
from .. import config
from ..dao import APINotFoundException
from ..dao import APINotFoundException, APIStorageException
from ..dao.containerstorage import AcquisitionStorage
from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_filereference
from ..dao.liststorage import AnalysesStorage
......@@ -130,16 +130,24 @@ def run(batch_job):
Creates jobs from proposed inputs, returns jobs enqueued.
"""
proposed_inputs = batch_job.get('proposed_inputs', [])
proposal = batch_job.get('proposal')
if not proposal:
raise APIStorageException('The batch job is not formatted correctly.')
proposed_inputs = proposal.get('inputs', [])
gear_name = batch_job.get('gear')
gear = gears.get_gear_by_name(gear_name)
config_ = batch_job.get('config')
origin = batch_job.get('origin')
tags = proposal.get('tags', [])
tags.append('batch')
if gear.get('category') == 'analysis':
analysis = proposal.get('analysis', {})
if not analysis.get('label'):
time_now = datetime.datetime.utcnow()
analysis['label'] = {'label': '{} {}'.format(gear_name, time_now)}
an_storage = AnalysesStorage('sessions', 'analyses', use_object_id = True)
acq_storage = AcquisitionStorage()
time_now = datetime.datetime.utcnow()
jobs = []
job_ids = []
......@@ -147,13 +155,11 @@ def run(batch_job):
if gear.get('category') == 'analysis':
# Analysis gear, must create analysis on session
# Create job and analysis proposal objects:
analysis = {'label': '{} {}'.format(gear_name, time_now)}
job = {
'config': config_,
'gear': gear_name,
'inputs': inputs
'inputs': inputs,
'tags': tags
}
# Create analysis
......@@ -169,7 +175,7 @@ def run(batch_job):
for input_name, fr in inputs.iteritems():
inputs[input_name] = create_filereference_from_dictionary(fr)
destination = create_containerreference_from_filereference(inputs[inputs.keys()[0]])
job = Job(gear_name, inputs, destination=destination, tags=['batch'], config_=config_, origin=origin)
job = Job(gear_name, inputs, destination=destination, tags=tags, config_=config_, origin=origin)
job_id = job.insert()
jobs.append(job)
......
......@@ -6,7 +6,7 @@ import json
import StringIO
from jsonschema import ValidationError
from ..auth import require_login
from ..auth import require_login, has_access
from ..dao import APIPermissionException
from ..dao.containerstorage import AcquisitionStorage
from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary, create_containerreference_from_filereference, ContainerReference
......@@ -278,7 +278,7 @@ class BatchHandler(base.RequestHandler):
query = {}
else:
query = {'origin.id': self.uid}
return batch.get_all(query, {'proposed_inputs':0})
return batch.get_all(query, {'proposal':0})
@require_login
def get(self, _id):
......@@ -288,26 +288,27 @@ class BatchHandler(base.RequestHandler):
"""
get_jobs = self.is_true('jobs')
batch_job = batch.get(_id, projection={'proposed_inputs':0}, get_jobs=get_jobs)
batch_job = batch.get(_id, projection={'proposal':0}, get_jobs=get_jobs)
self._check_permission(batch_job)
return batch_job
@require_login
def post(self):
"""
Create a batch job proposal, insert as 'pending'.
Create a batch job proposal, insert as 'pending' if there are matched containers
"""
if self.superuser_request:
# Don't enforce permissions for superuser requests
user = None
else:
user = {'_id': self.uid, 'site': self.user_site}
payload = self.request.json
gear_name = payload.get('gear')
targets = payload.get('targets')
config_ = payload.get('config', {})
analysis_data = payload.get('analysis', {})
tags = payload.get('tags', [])
# Request might specify a collection context
collection_id = payload.get('target_context', {}).get('id', None)
if collection_id:
collection_id = bson.ObjectId(collection_id)
if not gear_name or not targets:
self.abort(400, 'A gear name and list of target containers is required.')
......@@ -317,6 +318,7 @@ class BatchHandler(base.RequestHandler):
container_ids = []
container_type = None
# Get list of container ids from target list
for t in targets:
if not container_type:
container_type = t.get('type')
......@@ -326,33 +328,57 @@ class BatchHandler(base.RequestHandler):
self.abort(400, 'targets must all be of same type.')
container_ids.append(t.get('id'))
# Get Acquisitions associated with targets
objectIds = [bson.ObjectId(x) for x in container_ids]
containers = AcquisitionStorage().get_all_for_targets(container_type, objectIds, user=user, projection={'permissions': 0})
containers = AcquisitionStorage().get_all_for_targets(container_type, objectIds,
collection_id=collection_id, include_archived=False)
if not containers:
self.abort(404, 'Could not find specified targets, targets have no acquisitions, or user does not have access to targets.')
self.abort(404, 'Could not find acquisitions from targets.')
improper_permissions = []
acquisitions = []
# Make sure user has read-write access, add those to acquisition list
for c in containers:
if self.superuser_request or has_access(self.uid, c, 'rw'):
c.pop('permissions')
acquisitions.append(c)
else:
improper_permissions.append(c['_id'])
if not acquisitions:
self.abort(403, 'User does not have write access to targets.')
results = batch.find_matching_conts(gear, containers, 'acquisition')
results = batch.find_matching_conts(gear, acquisitions, 'acquisition')
matched = results['matched']
if not matched:
self.abort(400, 'No target containers matched gear input spec.')
batch_proposal = {
'_id': bson.ObjectId(),
'gear': gear_name,
'config': config_,
'state': 'pending',
'origin': self.origin,
'proposed_inputs': [c.pop('inputs') for c in matched]
}
batch.insert(batch_proposal)
batch_proposal.pop('proposed_inputs')
batch_proposal = {}
# If there are matches, create a batch job object and insert it
if matched:
batch_proposal = {
'_id': bson.ObjectId(),
'gear': gear_name,
'config': config_,
'state': 'pending',
'origin': self.origin,
'proposal': {
'inputs': [c.pop('inputs') for c in matched],
'analysis': analysis_data,
'tags': tags
}
}
batch.insert(batch_proposal)
batch_proposal.pop('proposal')
# Either way, return information about the status of the containers
batch_proposal['not_matched'] = results['not_matched'],
batch_proposal['ambiguous'] = results['ambiguous'],
batch_proposal['matched'] = matched
batch_proposal['improper_permissions'] = improper_permissions
return batch_proposal
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment