Commit b9b2f8aa authored by Megan Henning

Merge pull request #333 from scitran/job-analyses

Job-based Analyses
parents 961dd401 7a039948
@@ -3,6 +3,11 @@ import bson.objectid
 from .. import config
 from ..auth import INTEGER_ROLES
+
+CONT_TYPES = ['acquisition', 'analysis', 'collection', 'group', 'project', 'session']
+
+def getPerm(name):
+    return INTEGER_ROLES[name]
 def add_id_to_subject(subject, pid):
     """
     Add a mongo id field to given subject object (dict)
@@ -31,14 +36,55 @@ def add_id_to_subject(subject, pid):
     subject['_id'] = bson.ObjectId()
     return subject
+
+def inflate_job_info(analysis):
+    """
+    Inflate job from id ref in analysis
+
+    Lookup job via id stored on analysis
+    Lookup input filerefs and inflate into files array with 'input': True
+    If job is in failed state, look for most recent job referencing this analysis
+    Update analysis if new job is found
+    """
+    if analysis.get('job') is None:
+        return analysis
+    job = config.db.jobs.find_one({'_id': analysis['job']})
+    if job is None:
+        raise Exception('No job with id {} found.'.format(analysis['job']))
+
+    # If the job currently tied to the analysis failed, try to find one that didn't
+    while job.get('state') == 'failed' and job.get('_id') is not None:
+        next_job = config.db.jobs.find_one({'previous_job_id': job['_id']})
+        if next_job is None:
+            break
+        job = next_job
+    if job['_id'] != analysis['job']:
+        # Update analysis if job has changed
+        q = {'analyses._id': analysis['_id']}
+        u = {'$set': {'analyses.$.job': job['_id']}}
+        config.db.sessions.update_one(q, u)
+        analysis['job'] = job
+
+    # Inflate files from job inputs, add to analysis file array
+    files = analysis.get('files', [])
+    inputs = job.get('inputs', {})
+    for i in inputs.keys():
+        fileref = create_filereference_from_dictionary(inputs[i])
+        contref = create_containerreference_from_filereference(fileref)
+        file_ = contref.find_file(fileref.name)
+        if file_:
+            file_['input'] = True
+            files.append(file_)
+    analysis['files'] = files
+    return analysis
+
-def getPerm(name):
-    return INTEGER_ROLES[name]
 class ContainerReference(object):
     def __init__(self, type, id):
-        if type.endswith('s'):
-            raise Exception('Container type cannot be plural :|')
+        if type not in CONT_TYPES:
+            raise Exception('Container type must be one of {}'.format(CONT_TYPES))
         self.type = type
         self.id = id
@@ -63,6 +109,13 @@ class ContainerReference(object):
             raise Exception("No such " + self.type + " " + self.id + " in database")
         return result
 
+    def find_file(self, filename):
+        cont = self.get()
+        for f in cont.get('files', []):
+            if f['name'] == filename:
+                return f
+        return None
+
     def check_access(self, userID, perm_name):
         perm = getPerm(perm_name)
         for p in self.get()['permissions']:
@@ -73,8 +126,8 @@ class ContainerReference(object):
 class FileReference(ContainerReference):
     def __init__(self, type, id, name):
-        if type.endswith('s'):
-            raise Exception('Container type cannot be plural :|')
+        if type not in CONT_TYPES:
+            raise Exception('Container type must be one of {}'.format(CONT_TYPES))
         self.type = type
         self.id = id
......
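Aside (editor's sketch, not part of the commit): the trickiest part of inflate_job_info above is the retry chase, which follows previous_job_id links until it reaches a job that did not fail. A minimal runnable rendition, with an in-memory list standing in for config.db.jobs; all ids and states are made up.

# Stand-in job collection: j1 failed, j2 retried it and failed, j3 succeeded.
JOBS = [
    {'_id': 'j1', 'state': 'failed'},
    {'_id': 'j2', 'state': 'failed', 'previous_job_id': 'j1'},
    {'_id': 'j3', 'state': 'complete', 'previous_job_id': 'j2'},
]

def find_one(query):
    # Stand-in for config.db.jobs.find_one
    return next((j for j in JOBS if all(j.get(k) == v for k, v in query.items())), None)

job = find_one({'_id': 'j1'})
# Same loop as inflate_job_info: walk retries until a non-failed job or a dead end
while job.get('state') == 'failed' and job.get('_id') is not None:
    next_job = find_one({'previous_job_id': job['_id']})
    if next_job is None:
        break
    job = next_job

assert job['_id'] == 'j3'  # the retry that finally succeeded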
@@ -2,7 +2,7 @@ import bson.errors
 import bson.objectid
 
 from .. import config
-from . import consistencychecker
+from . import consistencychecker, containerutil
 from . import APIStorageException, APIConflictException
 
 log = config.log
@@ -188,17 +188,19 @@ class AnalysesStorage(ListStorage):
     def get_fileinfo(self, _id, analysis_id, filename = None):
         _id = bson.ObjectId(_id)
-        query = [
-            {'$match': {'_id' : _id}},
-            {'$unwind': '$' + self.list_name},
-            {'$match': {self.list_name+ '._id' : analysis_id}},
-            {'$unwind': '$' + self.list_name + '.files'}
-        ]
+        analysis = self._get_el(_id, {'_id': analysis_id})
+        if analysis is None:
+            raise APIStorageException('{} {} not found in {} {}.'.format(self.list_name, analysis_id, self.cont_name, _id))
+        analysis = containerutil.inflate_job_info(analysis)
+        files = analysis.get('files')
+        if files is None:
+            return None
         if filename:
-            query.append(
-                {'$match': {self.list_name + '.files.name' : filename}}
-            )
-        return [cont['analyses'] for cont in self.dbc.aggregate(query)]
+            for f in files:
+                if f.get('name') == filename:
+                    return [f]
+        else:
+            return files
 
     def add_note(self, _id, analysis_id, payload):
         _id = bson.ObjectId(_id)
......
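Aside (editor's sketch, not part of the commit): the rewritten get_fileinfo returns the full files list, a single-element list when a filename matches, or None — including an implicit None when a filename is given but never matches, which the download handler below turns into a 404. A self-contained rendition of the filename branch, with hypothetical data.

files = [{'name': 'out.nii', 'size': 10},
         {'name': 'dicom.zip', 'size': 99, 'input': True}]

def filter_files(files, filename=None):
    # Mirrors the filename branch of get_fileinfo above
    if filename:
        for f in files:
            if f.get('name') == filename:
                return [f]
        return None  # get_fileinfo reaches this case by falling off the loop
    return files

assert filter_files(files) == files
assert filter_files(files, 'dicom.zip') == [files[1]]
assert filter_files(files, 'missing.txt') is None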
@@ -104,6 +104,9 @@ class ContainerHandler(base.RequestHandler):
         if self.debug:
             debuginfo.add_debuginfo(self, cont_name, result)
 
+        if cont_name == 'sessions':
+            result = self.handle_analyses(result)
+
         return self.handle_origin(result)
 
     def handle_origin(self, result):
@@ -147,6 +150,19 @@ class ContainerHandler(base.RequestHandler):
         return result
 
+    def handle_analyses(self, result):
+        """
+        Given an object with an `analyses` array key, inflate job info for job-based analyses
+        """
+        analyses = result.get('analyses')
+        if analyses is None:
+            return result
+        for a in analyses:
+            if a.get('job') is not None:
+                a = containerutil.inflate_job_info(a)
+        result['analyses'] = analyses
+        return result
+
     def _filter_permissions(self, result, uid, site):
         """
         If the user is not an admin, only their permissions are returned.
@@ -269,6 +285,8 @@ class ContainerHandler(base.RequestHandler):
         for result in results:
             result = self.handle_origin(result)
+            if cont_name == 'sessions':
+                result = self.handle_analyses(result)
 
         return results
......
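Aside (editor's sketch, not part of the commit): with handle_analyses wired into session GETs, a session document comes back with its job-based analyses inflated. Roughly the shape to expect — every value here is hypothetical, and per inflate_job_info the job id is swapped for the full job document only when a retry superseded the original job.

session = {
    '_id': '57081d06b386a6dc79ca383c',
    'analyses': [{
        '_id': '573cb66b135d87002660597c',
        'job': {'_id': 'j_retry', 'state': 'complete'},  # doc, not id, after a retry
        'files': [
            {'name': 'dicom.zip', 'input': True},        # inflated from job inputs
            {'name': 'output.nii.gz', 'output': True},   # uploaded job outputs
        ],
    }],
}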
@@ -11,6 +11,7 @@ from .. import base
 from .. import config
 from .. import files
 from ..jobs import rules
+from ..jobs.jobs import Job
 from .. import tempdir as tempfile
 from .. import upload
 from .. import download
@@ -21,6 +22,8 @@ from ..dao import noop
 from ..dao import liststorage
 from ..dao import APIStorageException
 from ..dao import hierarchy
+from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary
+
 log = config.log
@@ -607,21 +610,221 @@ class AnalysesHandler(ListHandler):
     def put(self, *args, **kwargs):
         raise NotImplementedError("an analysis can't be modified")
 
+    def _default_analysis(self):
+        analysis_obj = {}
+        analysis_obj['_id'] = str(bson.objectid.ObjectId())
+        analysis_obj['created'] = datetime.datetime.utcnow()
+        analysis_obj['modified'] = datetime.datetime.utcnow()
+        analysis_obj['user'] = self.uid
+        return analysis_obj
+
     def post(self, cont_name, list_name, **kwargs):
+        """
+        .. http:post:: /api/(cont_name)/(cid)/analyses
+
+            Default behavior:
+                Creates an analysis object and uploads supplied input
+                and output files.
+            When param ``job`` is true:
+                Creates an analysis object and a job object that reference
+                each other via ``job`` and ``destination`` fields. Job-based
+                analyses are only allowed at the session level.
+
+            :param cont_name: one of ``projects``, ``sessions``, ``collections``
+            :type cont_name: string
+
+            :param cid: Container ID
+            :type cid: string
+
+            :query boolean job: a flag specifying the type of analysis
+
+            :statuscode 200: no error
+            :statuscode 400: Job-based analyses must be at the session level
+            :statuscode 400: Job-based analyses must have ``job`` and ``analysis`` maps in the JSON body
+
+            **Example request**:
+
+            .. sourcecode:: http
+
+                POST /api/sessions/57081d06b386a6dc79ca383c/analyses?job=true HTTP/1.1
+
+                {
+                    "analysis": {
+                        "label": "Test Analysis 1"
+                    },
+                    "job": {
+                        "gear": "dcm_convert",
+                        "inputs": {
+                            "dicom": {
+                                "type": "acquisition",
+                                "id": "57081d06b386a6dc79ca386b",
+                                "name": "test_acquisition_dicom.zip"
+                            }
+                        },
+                        "tags": ["example"]
+                    }
+                }
+
+            **Example response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/json; charset=utf-8
+
+                {
+                    "_id": "573cb66b135d87002660597c"
+                }
+        """
         _id = kwargs.pop('cid')
         container, permchecker, storage, mongo_validator, _, keycheck = self._initialize_request(cont_name, list_name, _id)
         permchecker(noop)('POST', _id=_id)
+
+        if self.is_true('job'):
+            if cont_name == 'sessions':
+                return self._create_job_and_analysis(cont_name, _id, storage)
+            else:
+                self.abort(400, 'Analysis created via a job must be at the session level')
+
         payload = upload.process_upload(self.request, upload.Strategy.analysis, origin=self.origin)
-        payload['created'] = datetime.datetime.utcnow()
-        payload['user'] = payload.get('user', self.uid)
-        result = keycheck(mongo_validator(storage.exec_op))('POST', _id=_id, payload=payload)
+        analysis = self._default_analysis()
+        analysis.update(payload)
+        result = keycheck(mongo_validator(storage.exec_op))('POST', _id=_id, payload=analysis)
         if result.modified_count == 1:
             return {'_id': payload['_id']}
         else:
-            self.abort(404, 'Element not added in list {} of container {} {}'.format(storage.list_name, storage.cont_name, _id))
+            self.abort(500, 'Element not added in list analyses of container {} {}'.format(cont_name, _id))
+
+    def _create_job_and_analysis(self, cont_name, cid, storage):
+        payload = self.request.json_body
+        analysis = payload.get('analysis')
+        job = payload.get('job')
+        if job is None or analysis is None:
+            self.abort(400, 'JSON body must contain maps for "analysis" and "job"')
+
+        default = self._default_analysis()
+        default.update(analysis)  # note: dict.update mutates in place and returns None
+        analysis = default
+        result = storage.exec_op('POST', _id=cid, payload=analysis)
+        if result.modified_count != 1:
+            self.abort(500, 'Element not added in list analyses of container {} {}'.format(cont_name, cid))
+
+        gear_name = job['gear']
+        # Translate input maps to FileReferences
+        inputs = {}
+        for x in job['inputs'].keys():
+            input_map = job['inputs'][x]
+            inputs[x] = create_filereference_from_dictionary(input_map)
+        tags = job.get('tags', [])
+        if 'analysis' not in tags:
+            tags.append('analysis')
+
+        destination = create_containerreference_from_dictionary({'type': 'analysis', 'id': analysis['_id']})
+        job = Job(gear_name, inputs, destination=destination, tags=tags)
+        job_id = job.insert()
+        if not job_id:
+            self.abort(500, 'Job not created for analysis {} of container {} {}'.format(analysis['_id'], cont_name, cid))
+
+        result = storage.exec_op('PUT', _id=cid, query_params={'_id': analysis['_id']}, payload={'job': job_id})
+        return {'_id': analysis['_id']}
 
     def download(self, cont_name, list_name, **kwargs):
+        """
+        .. http:get:: /api/(cont_name)/(cid)/analyses/(analysis_id)/files/(file_name)
+
+            Download a file from an analysis, or a tar of all files.
+
+            When no filename is provided, a tar of all input and output files is created.
+            The first request to this endpoint without a ticket ID generates a download ticket.
+            A request to this endpoint with a ticket ID downloads the file(s).
+            If the analysis object is tied to a job, the input file(s) are inflated from
+            the job's ``inputs`` map.
+
+            :param cont_name: one of ``projects``, ``sessions``, ``collections``
+            :type cont_name: string
+
+            :param cid: Container ID
+            :type cid: string
+
+            :param analysis_id: Analysis ID
+            :type analysis_id: string
+
+            :param filename: (Optional) Filename of specific file to download
+            :type filename: string
+
+            :query string ticket: Download ticket ID
+
+            :statuscode 200: no error
+            :statuscode 404: No files on analysis ``analysis_id``
+            :statuscode 404: Could not find file ``filename`` on analysis ``analysis_id``
+
+            **Example request without ticket ID**:
+
+            .. sourcecode:: http
+
+                GET /api/sessions/57081d06b386a6dc79ca383c/analyses/5751cd3781460100a66405c8/files HTTP/1.1
+                Host: demo.flywheel.io
+                Accept: */*
+
+            **Response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/json; charset=utf-8
+
+                {
+                    "ticket": "57f2af23-a94c-426d-8521-11b2e8782020",
+                    "filename": "analysis_5751cd3781460100a66405c8.tar",
+                    "file_cnt": 3,
+                    "size": 4525137
+                }
+
+            **Example request with ticket ID**:
+
+            .. sourcecode:: http
+
+                GET /api/sessions/57081d06b386a6dc79ca383c/analyses/5751cd3781460100a66405c8/files?ticket=57f2af23-a94c-426d-8521-11b2e8782020 HTTP/1.1
+                Host: demo.flywheel.io
+                Accept: */*
+
+            **Response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/octet-stream
+                Content-Disposition: attachment; filename=analysis_5751cd3781460100a66405c8.tar;
+
+            **Example request with filename**:
+
+            .. sourcecode:: http
+
+                GET /api/sessions/57081d06b386a6dc79ca383c/analyses/5751cd3781460100a66405c8/files/exampledicom.zip?ticket= HTTP/1.1
+                Host: demo.flywheel.io
+                Accept: */*
+
+            **Response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/json; charset=utf-8
+
+                {
+                    "ticket": "57f2af23-a94c-426d-8521-11b2e8782020",
+                    "filename": "exampledicom.zip",
+                    "file_cnt": 1,
+                    "size": 4525137
+                }
+        """
         _id = kwargs.pop('cid')
         container, permchecker, storage, _, _, _ = self._initialize_request(cont_name, list_name, _id)
         filename = kwargs.get('name')
@@ -629,7 +832,12 @@
         if not ticket_id:
             permchecker(noop)('GET', _id=_id)
         analysis_id = kwargs.get('_id')
-        fileinfo = [f['files'] for f in storage.get_fileinfo(_id, analysis_id, filename)]
+        fileinfo = storage.get_fileinfo(_id, analysis_id, filename)
+        if fileinfo is None:
+            error_msg = 'No files on analysis {}'.format(analysis_id)
+            if filename:
+                error_msg = 'Could not find file {} on analysis {}'.format(filename, analysis_id)
+            self.abort(404, error_msg)
         if not ticket_id:
             if filename:
                 total_size = fileinfo[0]['size']
......
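Aside (editor's sketch, not part of the commit): a client-side walkthrough of the two endpoints documented above, using the requests library. The URLs and the two-step ticket flow come straight from the docstrings; the host, the ids, and the omitted authentication are assumptions.

import requests

base = 'https://demo.flywheel.io/api'
sess = '57081d06b386a6dc79ca383c'

# 1. Create a job-based analysis: ?job=true plus "analysis" and "job" maps
body = {
    'analysis': {'label': 'Test Analysis 1'},
    'job': {
        'gear': 'dcm_convert',
        'inputs': {'dicom': {'type': 'acquisition',
                             'id': '57081d06b386a6dc79ca386b',
                             'name': 'test_acquisition_dicom.zip'}},
        'tags': ['example'],
    },
}
r = requests.post(base + '/sessions/' + sess + '/analyses',
                  params={'job': 'true'}, json=body)
analysis_id = r.json()['_id']

# 2. First files request (no ticket) returns a download ticket...
ticket = requests.get(base + '/sessions/' + sess + '/analyses/'
                      + analysis_id + '/files').json()['ticket']

# 3. ...and repeating the request with the ticket streams the tar of all files
tar = requests.get(base + '/sessions/' + sess + '/analyses/'
                   + analysis_id + '/files', params={'ticket': ticket})
with open('analysis_' + analysis_id + '.tar', 'wb') as fh:
    fh.write(tar.content)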
@@ -504,5 +504,21 @@ class AnalysisPlacer(Placer):
             metadata_info = metadata_infos.get(info['name'], {})
             metadata_info.update(info)
             self.metadata['files'].append(metadata_info)
         self.metadata['_id'] = str(bson.objectid.ObjectId())
         return self.metadata
 
+
+class AnalysisJobPlacer(AnalysisPlacer):
+    def check(self):
+        super(AnalysisJobPlacer, self).check()
+        self.metadata['outputs'] = self.metadata['acquisition'].pop('files', [])
+
+    def finalize(self):
+        super(AnalysisJobPlacer, self).finalize()
+        # Search the sessions collection for the analysis, replace its files field
+        if self.metadata.get('files'):
+            q = {'analyses._id': str(self.id)}
+            u = {'$set': {'analyses.$.files': self.metadata['files']}}
+            if self.context.get('job_id'):
+                # If the original job failed, update the analysis with the job that succeeded
+                u['$set']['analyses.$.job'] = self.context['job_id']
+            config.db.sessions.update_one(q, u)
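Aside (editor's sketch, not part of the commit): AnalysisJobPlacer.finalize leans on MongoDB's positional $ operator to rewrite exactly one element of the embedded analyses array. A toy in-memory rendition of that update; all ids are made up.

session = {'_id': 's1',
           'analyses': [{'_id': 'a1', 'job': 'j_failed', 'files': []},
                        {'_id': 'a2', 'job': 'j_other', 'files': []}]}

def update_one(doc, q, u):
    # Stand-in for config.db.sessions.update_one with a positional $ path
    for a in doc['analyses']:
        if a['_id'] == q['analyses._id']:      # query part: match the element
            for path, val in u['$set'].items():
                a[path.replace('analyses.$.', '')] = val  # '$' = matched index
            break

update_one(session, {'analyses._id': 'a1'},
           {'$set': {'analyses.$.files': [{'name': 'out.nii'}],
                     'analyses.$.job': 'j_retry'}})
assert session['analyses'][0]['job'] == 'j_retry'
assert session['analyses'][1]['job'] == 'j_other'  # other elements untouched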
@@ -17,13 +17,14 @@ from .dao import hierarchy, APIStorageException
 log = config.log
 
 Strategy = util.Enum('Strategy', {
-    'targeted'   : pl.TargetedPlacer,     # Upload N files to a container.
-    'engine'     : pl.EnginePlacer,       # Upload N files from the result of a successful job.
-    'token'      : pl.TokenPlacer,        # Upload N files to a saved folder based on a token.
-    'packfile'   : pl.PackfilePlacer,     # Upload N files as a new packfile to a container.
-    'labelupload': pl.LabelPlacer,
-    'uidupload'  : pl.UIDPlacer,
-    'analysis'   : pl.AnalysisPlacer      # Upload N files for an analysys (no db updates)
+    'targeted'    : pl.TargetedPlacer,    # Upload N files to a container.
+    'engine'      : pl.EnginePlacer,      # Upload N files from the result of a successful job.
+    'token'       : pl.TokenPlacer,       # Upload N files to a saved folder based on a token.
+    'packfile'    : pl.PackfilePlacer,    # Upload N files as a new packfile to a container.
+    'labelupload' : pl.LabelPlacer,
+    'uidupload'   : pl.UIDPlacer,
+    'analysis'    : pl.AnalysisPlacer,    # Upload N files to an analysis as input and output (no db updates)
+    'analysis_job': pl.AnalysisJobPlacer  # Upload N files to an analysis as output from job results
 })
 
 def process_upload(request, strategy, container_type=None, id=None, origin=None, context=None, response=None, metadata=None):
@@ -61,7 +62,7 @@ def process_upload(request, strategy, container_type=None, id=None, origin=None,
     if id is not None and container_type == None:
         raise Exception('Unspecified container type')
-    if container_type is not None and container_type not in ('acquisition', 'session', 'project', 'collection'):
+    if container_type is not None and container_type not in ('acquisition', 'session', 'project', 'collection', 'analysis'):
         raise Exception('Unknown container type')
 
     timestamp = datetime.datetime.utcnow()
@@ -224,14 +225,18 @@ class Upload(base.RequestHandler):
         if level is None:
             self.abort(404, 'container level is required')
-        if level != 'acquisition':
-            self.abort(404, 'engine uploads are supported only at the acquisition level')
-        acquisition_id = self.get_param('id')
-        if not acquisition_id:
+        cont_id = self.get_param('id')
+        if not cont_id:
             self.abort(404, 'container id is required')
         else:
-            acquisition_id = bson.ObjectId(acquisition_id)
+            cont_id = bson.ObjectId(cont_id)
+        if level not in ['acquisition', 'analysis']:
+            self.abort(404, 'engine uploads are supported only at the acquisition or analysis level')
+        if level == 'analysis':
+            context = {'job_id': self.get_param('job')}
+            return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, id=cont_id, context=context)
+
         if not self.superuser_request:
             self.abort(402, 'uploads must be from an authorized drone')
         with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
@@ -247,7 +252,7 @@ class Upload(base.RequestHandler):
             file_infos = file_store.metadata['acquisition'].pop('files', [])
             now = datetime.datetime.utcnow()
             try:
-                acquisition_obj = hierarchy.update_container_hierarchy(file_store.metadata, acquisition_id, level)
+                acquisition_obj = hierarchy.update_container_hierarchy(file_store.metadata, cont_id, level)
             except APIStorageException as e:
                 self.abort(400, e.message)
             # move the files before updating the database
......
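Aside (editor's sketch, not part of the commit): the engine-side request that lands job outputs on an analysis through the new Strategy.analysis_job path. The level, id, and job parameters mirror the handler above; the /api/engine path and the multipart layout are assumptions, and drone authentication is omitted.

import requests

params = {
    'level': 'analysis',               # routes to Strategy.analysis_job
    'id': '573cb66b135d87002660597c',  # analysis id (the job's destination)
    'job': '573cb66b135d870026605980', # lets the placer repoint the analysis job
}
files = {
    'file': ('output.nii.gz', open('output.nii.gz', 'rb')),
    'metadata': (None, '{"acquisition": {"files": [{"name": "output.nii.gz"}]}}'),
}
requests.post('https://demo.flywheel.io/api/engine', params=params, files=files)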