diff --git a/api/dao/containerutil.py b/api/dao/containerutil.py
index baab48416eb28ecbc4a0c8b931eaba56243fdd29..aca14b11fbc46776d00378850efb3024866ce2cf 100644
--- a/api/dao/containerutil.py
+++ b/api/dao/containerutil.py
@@ -3,6 +3,11 @@ import bson.objectid
 from .. import config
 from ..auth import INTEGER_ROLES
 
+CONT_TYPES = ['acquisition', 'analysis', 'collection', 'group', 'project', 'session']
+
+def getPerm(name):
+    return INTEGER_ROLES[name]
+
 def add_id_to_subject(subject, pid):
     """
     Add a mongo id field to given subject object (dict)
@@ -31,14 +36,55 @@ def add_id_to_subject(subject, pid):
         subject['_id'] = bson.ObjectId()
     return subject
 
+def inflate_job_info(analysis):
+    """
+    Inflate job from id ref in analysis
+
+    Lookup job via id stored on analysis
+    Lookup input filerefs and inflate into files array with 'input': True
+    If job is in failed state, look for most recent job referencing this analysis
+    Update analysis if new job is found
+    """
+    if analysis.get('job') is None:
+        return analysis
+    job = config.db.jobs.find_one({'_id': analysis['job']})
+    if job is None:
+        raise Exception('No job with id {} found.'.format(analysis['job']))
+
+    # If the job currently tied to the analysis failed, try to find one that didn't
+    while job.get('state') == 'failed' and job.get('_id') is not None:
+        next_job = config.db.jobs.find_one({'previous_job_id': job['_id']})
+        if next_job is None:
+            break
+        job = next_job
+    if job['_id'] != analysis['job']:
+        # Update analysis if job has changed
+        q = {'analyses._id': analysis['_id']}
+        u = {'$set': {'analyses.$.job': job['_id']}}
+        config.db.sessions.update_one(q, u)
+    analysis['job'] = job
+
+    # Inflate files from job inputs, add to analysis file array
+    files = analysis.get('files', [])
+    inputs = job.get('inputs', {})
+    for i in inputs.keys():
+        fileref = create_filereference_from_dictionary(inputs[i])
+        contref = create_containerreference_from_filereference(fileref)
+        file_ = contref.find_file(fileref.name)
+        if file_:
+            file_['input'] = True
+            files.append(file_)
+
+    analysis['files'] = files
+    return analysis
+
+
-def getPerm(name):
-    return INTEGER_ROLES[name]
 
 class ContainerReference(object):
     def __init__(self, type, id):
-        if type.endswith('s'):
-            raise Exception('Container type cannot be plural :|')
+        if type not in CONT_TYPES:
+            raise Exception('Container type must be one of {}'.format(CONT_TYPES))
 
         self.type = type
         self.id = id
@@ -63,6 +109,13 @@ class ContainerReference(object):
             raise Exception("No such " + self.type + " " + self.id + " in database")
         return result
 
+    def find_file(self, filename):
+        cont = self.get()
+        for f in cont.get('files', []):
+            if f['name'] == filename:
+                return f
+        return None
+
     def check_access(self, userID, perm_name):
         perm = getPerm(perm_name)
         for p in self.get()['permissions']:
@@ -73,8 +126,8 @@ class ContainerReference(object):
 
 class FileReference(ContainerReference):
     def __init__(self, type, id, name):
-        if type.endswith('s'):
-            raise Exception('Container type cannot be plural :|')
+        if type not in CONT_TYPES:
+            raise Exception('Container type must be one of {}'.format(CONT_TYPES))
 
         self.type = type
         self.id = id
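
Note on the `CONT_TYPES` guard above: the old check only rejected plural strings, while the new one whitelists exact singular type names, and makes `analysis` and `group` referenceable. A minimal sketch of the changed behavior, assuming the package is importable (ids illustrative):

from api.dao.containerutil import ContainerReference

ContainerReference('session', '57081d06b386a6dc79ca383c')   # accepted: 'session' is in CONT_TYPES
ContainerReference('analysis', '5751cd3781460100a66405c8')  # accepted: newly referenceable type
ContainerReference('sessions', '57081d06b386a6dc79ca383c')  # raises: plural forms are not in CONT_TYPES

Making `analysis` a valid reference type is what lets a job use an analysis as its `destination` in the handler changes further down.
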
diff --git a/api/dao/liststorage.py b/api/dao/liststorage.py
index 266b3274aa1894d0550a9c319a7bfe0b200397af..50aa8b34278b2a0ad83e369ba315920120cffcfa 100644
--- a/api/dao/liststorage.py
+++ b/api/dao/liststorage.py
@@ -2,7 +2,7 @@ import bson.errors
 import bson.objectid
 
 from .. import config
-from . import consistencychecker
+from . import consistencychecker, containerutil
 from . import APIStorageException, APIConflictException
 
 log = config.log
@@ -188,17 +188,19 @@ class AnalysesStorage(ListStorage):
 
     def get_fileinfo(self, _id, analysis_id, filename = None):
         _id = bson.ObjectId(_id)
-        query = [
-            {'$match': {'_id' : _id}},
-            {'$unwind': '$' + self.list_name},
-            {'$match': {self.list_name+ '._id' : analysis_id}},
-            {'$unwind': '$' + self.list_name + '.files'}
-        ]
+        analysis = self._get_el(_id, {'_id': analysis_id})
+        if analysis is None:
+            raise APIStorageException('{} {} not found in {} {}.'.format(self.list_name, analysis_id, self.cont_name, _id))
+        analysis = containerutil.inflate_job_info(analysis)
+        files = analysis.get('files')
+        if files is None:
+            return None
         if filename:
-            query.append(
-                {'$match': {self.list_name + '.files.name' : filename}}
-            )
-        return [cont['analyses'] for cont in self.dbc.aggregate(query)]
+            for f in files:
+                if f.get('name') == filename:
+                    return [f]
+        else:
+            return files
 
     def add_note(self, _id, analysis_id, payload):
         _id = bson.ObjectId(_id)
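
The rewritten `get_fileinfo` above replaces the aggregation pipeline with a fetch-then-scan, so files inflated from job inputs by `inflate_job_info` show up in the result. A sketch of the resulting contract, assuming `storage` is an `AnalysesStorage` instance (ids illustrative):

# All input and output files on the analysis, or None when it has no files array:
files = storage.get_fileinfo('57081d06b386a6dc79ca383c', '5751cd3781460100a66405c8')

# A one-element list on a name match; falls through to an implicit None when nothing matches:
match = storage.get_fileinfo('57081d06b386a6dc79ca383c', '5751cd3781460100a66405c8', 'dicom.zip')

# A missing analysis raises APIStorageException rather than returning None.
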
diff --git a/api/handlers/containerhandler.py b/api/handlers/containerhandler.py
index 6834345365190768f3e799da2026d3fe7cb4037b..e08deec3d408dc409ce9079e477a813b7cbea419 100644
--- a/api/handlers/containerhandler.py
+++ b/api/handlers/containerhandler.py
@@ -104,6 +104,9 @@ class ContainerHandler(base.RequestHandler):
         if self.debug:
             debuginfo.add_debuginfo(self, cont_name, result)
 
+        if cont_name == 'sessions':
+            result = self.handle_analyses(result)
+
         return self.handle_origin(result)
 
     def handle_origin(self, result):
@@ -147,6 +150,19 @@
 
         return result
 
+    def handle_analyses(self, result):
+        """
+        Given an object with an `analyses` array key, inflate job info for job-based analyses
+        """
+        analyses = result.get('analyses')
+        if analyses is None:
+            return result
+        for a in analyses:
+            if a.get('job') is not None:
+                a = containerutil.inflate_job_info(a)
+        result['analyses'] = analyses
+        return result
+
     def _filter_permissions(self, result, uid, site):
         """
         if the user is not admin only her permissions are returned.
@@ -269,6 +285,8 @@
 
         for result in results:
             result = self.handle_origin(result)
+            if cont_name == 'sessions':
+                result = self.handle_analyses(result)
 
         return results
 
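
`handle_analyses` relies on `inflate_job_info` mutating each analysis dict in place; the per-element rebind (`a = ...`) is cosmetic. Roughly, for a job-based analysis on a session (values illustrative):

from bson.objectid import ObjectId

# Before: the analysis stores only a job id reference.
analysis = {'_id': '5751cd3781460100a66405c8', 'job': ObjectId('573cb66b135d87002660597c')}

# After handle_analyses: 'job' holds the full job document, and every job input
# has been appended to 'files' flagged with 'input': True, e.g.
# analysis['job']       -> {'_id': ..., 'state': 'complete', 'inputs': {...}, ...}
# analysis['files'][-1] -> {'name': 'test_acquisition_dicom.zip', ..., 'input': True}
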
diff --git a/api/handlers/listhandler.py b/api/handlers/listhandler.py
index 254f9251b3ac721b5d34b1eefcba833e910b663d..63c7db102968063d890c8c541849fe7fa0a9461f 100644
--- a/api/handlers/listhandler.py
+++ b/api/handlers/listhandler.py
@@ -11,6 +11,7 @@ from .. import base
 from .. import config
 from .. import files
 from ..jobs import rules
+from ..jobs.jobs import Job
 from .. import tempdir as tempfile
 from .. import upload
 from .. import download
@@ -21,6 +22,8 @@
 from ..dao import noop
 from ..dao import liststorage
 from ..dao import APIStorageException
 from ..dao import hierarchy
+from ..dao.containerutil import create_filereference_from_dictionary, create_containerreference_from_dictionary
+
 
 log = config.log
 
@@ -607,21 +610,222 @@ class AnalysesHandler(ListHandler):
     def put(self, *args, **kwargs):
         raise NotImplementedError("an analysis can't be modified")
 
+    def _default_analysis(self):
+        analysis_obj = {}
+        analysis_obj['_id'] = str(bson.objectid.ObjectId())
+        analysis_obj['created'] = datetime.datetime.utcnow()
+        analysis_obj['modified'] = datetime.datetime.utcnow()
+        analysis_obj['user'] = self.uid
+        return analysis_obj
+
     def post(self, cont_name, list_name, **kwargs):
+        """
+        .. http:post:: /api/(cont_name)/(cid)/analyses
+
+            Default behavior:
+                Creates an analysis object and uploads supplied input
+                and output files.
+            When param ``job`` is true:
+                Creates an analysis object and a job object that reference
+                each other via ``job`` and ``destination`` fields. Job-based
+                analyses are only allowed at the session level.
+
+            :param cont_name: one of ``projects``, ``sessions``, ``collections``
+            :type cont_name: string
+
+            :param cid: Container ID
+            :type cid: string
+
+            :query boolean job: a flag specifying the type of analysis
+
+            :statuscode 200: no error
+            :statuscode 400: Job-based analyses must be at the session level
+            :statuscode 400: Job-based analyses must have ``job`` and ``analysis`` maps in the JSON body
+
+            **Example request**:
+
+            .. sourcecode:: http
+
+                POST /api/sessions/57081d06b386a6dc79ca383c/analyses HTTP/1.1
+
+                {
+                    "analysis": {
+                        "label": "Test Analysis 1"
+                    },
+                    "job" : {
+                        "gear": "dcm_convert",
+                        "inputs": {
+                            "dicom": {
+                                "type": "acquisition",
+                                "id": "57081d06b386a6dc79ca386b",
+                                "name" : "test_acquisition_dicom.zip"
+                            }
+                        },
+                        "tags": ["example"]
+                    }
+                }
+
+            **Example response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/json; charset=utf-8
+
+                {
+                    "_id": "573cb66b135d87002660597c"
+                }
+
+        """
         _id = kwargs.pop('cid')
         container, permchecker, storage, mongo_validator, _, keycheck = self._initialize_request(cont_name, list_name, _id)
-
         permchecker(noop)('POST', _id=_id)
+
+        if self.is_true('job'):
+            if cont_name == 'sessions':
+                return self._create_job_and_analysis(cont_name, _id, storage)
+            else:
+                self.abort(400, 'Analysis created via a job must be at the session level')
+
         payload = upload.process_upload(self.request, upload.Strategy.analysis, origin=self.origin)
-        payload['created'] = datetime.datetime.utcnow()
-        payload['user'] = payload.get('user', self.uid)
-        result = keycheck(mongo_validator(storage.exec_op))('POST', _id=_id, payload=payload)
+        analysis = self._default_analysis()
+        analysis.update(payload)
+        result = keycheck(mongo_validator(storage.exec_op))('POST', _id=_id, payload=analysis)
         if result.modified_count == 1:
-            return {'_id': payload['_id']}
+            return {'_id': analysis['_id']}
         else:
-            self.abort(404, 'Element not added in list {} of container {} {}'.format(storage.list_name, storage.cont_name, _id))
+            self.abort(500, 'Element not added in list analyses of container {} {}'.format(cont_name, _id))
+
+    def _create_job_and_analysis(self, cont_name, cid, storage):
+        payload = self.request.json_body
+        analysis = payload.get('analysis')
+        job = payload.get('job')
+        if job is None or analysis is None:
+            self.abort(400, 'JSON body must contain map for "analysis" and "job"')
+
+        default = self._default_analysis()
+        default.update(analysis)
+        analysis = default
+        result = storage.exec_op('POST', _id=cid, payload=analysis)
+        if result.modified_count != 1:
+            self.abort(500, 'Element not added in list analyses of container {} {}'.format(cont_name, cid))
+
+        gear_name = job['gear']
+        # Translate input maps to FileReferences
+        inputs = {}
+        for x in job['inputs'].keys():
+            input_map = job['inputs'][x]
+            inputs[x] = create_filereference_from_dictionary(input_map)
+        tags = job.get('tags', [])
+        if 'analysis' not in tags:
+            tags.append('analysis')
+
+        destination = create_containerreference_from_dictionary({'type': 'analysis', 'id': analysis['_id']})
+        job = Job(gear_name, inputs, destination=destination, tags=tags)
+        job_id = job.insert()
+        if not job_id:
+            self.abort(500, 'Job not created for analysis {} of container {} {}'.format(analysis['_id'], cont_name, cid))
+        result = storage.exec_op('PUT', _id=cid, query_params={'_id': analysis['_id']}, payload={'job': job_id})
+        return {'_id': analysis['_id']}
+
     def download(self, cont_name, list_name, **kwargs):
+        """
+        .. http:get:: /api/(cont_name)/(cid)/analyses/(analysis_id)/files/(file_name)
+
+            Download a file from an analysis or download a tar of all files
+
+            When no filename is provided, a tar of all input and output files is created.
+            The first request to this endpoint without a ticket ID generates a download ticket.
+            A request to this endpoint with a ticket ID downloads the file(s).
+            If the analysis object is tied to a job, the input file(s) are inflated from
+            the job's ``inputs`` map.
+
+            :param cont_name: one of ``projects``, ``sessions``, ``collections``
+            :type cont_name: string
+
+            :param cid: Container ID
+            :type cid: string
+
+            :param analysis_id: Analysis ID
+            :type analysis_id: string
+
+            :param filename: (Optional) Filename of specific file to download
+            :type filename: string
+
+            :query string ticket: Download ticket ID
+
+            :statuscode 200: no error
+            :statuscode 404: No files on analysis ``analysis_id``
+            :statuscode 404: Could not find file ``filename`` on analysis ``analysis_id``
+
+            **Example request without ticket ID**:
+
+            .. sourcecode:: http
+
+                GET /api/sessions/57081d06b386a6dc79ca383c/analyses/5751cd3781460100a66405c8/files HTTP/1.1
+                Host: demo.flywheel.io
+                Accept: */*
+
+
+            **Response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/json; charset=utf-8
+
+                {
+                    "ticket": "57f2af23-a94c-426d-8521-11b2e8782020",
+                    "filename": "analysis_5751cd3781460100a66405c8.tar",
+                    "file_cnt": 3,
+                    "size": 4525137
+                }
+
+            **Example request with ticket ID**:
+
+            .. sourcecode:: http
+
+                GET /api/sessions/57081d06b386a6dc79ca383c/analyses/5751cd3781460100a66405c8/files?ticket=57f2af23-a94c-426d-8521-11b2e8782020 HTTP/1.1
+                Host: demo.flywheel.io
+                Accept: */*
+
+
+            **Response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/octet-stream
+                Content-Disposition: attachment; filename=analysis_5751cd3781460100a66405c8.tar;
+
+            **Example request with filename**:
+
+            .. sourcecode:: http
+
+                GET /api/sessions/57081d06b386a6dc79ca383c/analyses/5751cd3781460100a66405c8/files/exampledicom.zip?ticket= HTTP/1.1
+                Host: demo.flywheel.io
+                Accept: */*
+
+
+            **Response**:
+
+            .. sourcecode:: http
+
+                HTTP/1.1 200 OK
+                Vary: Accept-Encoding
+                Content-Type: application/json; charset=utf-8
+
+                {
+                    "ticket": "57f2af23-a94c-426d-8521-11b2e8782020",
+                    "filename": "exampledicom.zip",
+                    "file_cnt": 1,
+                    "size": 4525137
+                }
+
+
+        """
         _id = kwargs.pop('cid')
         container, permchecker, storage, _, _, _ = self._initialize_request(cont_name, list_name, _id)
         filename = kwargs.get('name')
@@ -629,7 +833,12 @@
         if not ticket_id:
             permchecker(noop)('GET', _id=_id)
         analysis_id = kwargs.get('_id')
-        fileinfo = [f['files'] for f in storage.get_fileinfo(_id, analysis_id, filename)]
+        fileinfo = storage.get_fileinfo(_id, analysis_id, filename)
+        if fileinfo is None:
+            error_msg = 'No files on analysis {}'.format(analysis_id)
+            if filename:
+                error_msg = 'Could not find file {} on analysis {}'.format(filename, analysis_id)
+            self.abort(404, error_msg)
         if not ticket_id:
             if filename:
                 total_size = fileinfo[0]['size']
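
Putting the POST docstring above into practice, a client-side sketch using `requests` (host and ids are taken from the docstring examples; authentication is omitted):

import requests

body = {
    'analysis': {'label': 'Test Analysis 1'},
    'job': {
        'gear': 'dcm_convert',
        'inputs': {
            'dicom': {
                'type': 'acquisition',
                'id': '57081d06b386a6dc79ca386b',
                'name': 'test_acquisition_dicom.zip',
            }
        },
        'tags': ['example'],
    },
}

# ?job=true routes the request through _create_job_and_analysis above.
r = requests.post(
    'https://demo.flywheel.io/api/sessions/57081d06b386a6dc79ca383c/analyses',
    params={'job': 'true'},
    json=body,
)
analysis_id = r.json()['_id']

Note that the handler force-appends an `analysis` tag to the job, so queue consumers can recognize analysis jobs regardless of the tags supplied.
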
diff --git a/api/placer.py b/api/placer.py
index 924300c59426906fed483f2747d9073565d7e2ee..bd4264d0713679584d81abb9b10ac83d594c8fe1 100644
--- a/api/placer.py
+++ b/api/placer.py
@@ -504,5 +504,21 @@
             metadata_info = metadata_infos.get(info['name'], {})
             metadata_info.update(info)
             self.metadata['files'].append(metadata_info)
-        self.metadata['_id'] = str(bson.objectid.ObjectId())
         return self.metadata
+
+class AnalysisJobPlacer(AnalysisPlacer):
+    def check(self):
+        super(AnalysisJobPlacer, self).check()
+        self.metadata['outputs'] = self.metadata['acquisition'].pop('files', [])
+
+    def finalize(self):
+        super(AnalysisJobPlacer, self).finalize()
+        # Search the sessions table for the analysis, replace its file field
+        if self.metadata.get('files'):
+            q = {'analyses._id': str(self.id)}
+            u = {'$set': {'analyses.$.files': self.metadata['files']}}
+            if self.context.get('job_id'):
+                # If the original job failed, update the analysis with the job that succeeded
+                u['$set']['analyses.$.job'] = self.context['job_id']
+            config.db.sessions.update_one(q, u)
+
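
For reference, the `finalize` above resolves to a positional-operator update against the parent session: match the session by the embedded analysis id, then rewrite only that analysis's fields via Mongo's `$` operator. Spelled out with illustrative values:

# Equivalent to the update AnalysisJobPlacer.finalize constructs:
q = {'analyses._id': '5751cd3781460100a66405c8'}
u = {'$set': {
    'analyses.$.files': [{'name': 'outputs.zip', 'size': 4525137}],  # engine outputs
    'analyses.$.job': '573cb66b135d87002660597c',  # only set when context carries a job_id
}}
config.db.sessions.update_one(q, u)
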
diff --git a/api/upload.py b/api/upload.py
index b3e3071d60fc044d92e62968a2496a62772b9557..2d324f2a7e4f35df476a5731e157c237d6c605ec 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -17,13 +17,14 @@ from .dao import hierarchy, APIStorageException
 log = config.log
 
 Strategy = util.Enum('Strategy', {
-    'targeted'   : pl.TargetedPlacer,  # Upload N files to a container.
-    'engine'     : pl.EnginePlacer,    # Upload N files from the result of a successful job.
-    'token'      : pl.TokenPlacer,     # Upload N files to a saved folder based on a token.
-    'packfile'   : pl.PackfilePlacer,  # Upload N files as a new packfile to a container.
-    'labelupload': pl.LabelPlacer,
-    'uidupload'  : pl.UIDPlacer,
-    'analysis'   : pl.AnalysisPlacer   # Upload N files for an analysys (no db updates)
+    'targeted'    : pl.TargetedPlacer,    # Upload N files to a container.
+    'engine'      : pl.EnginePlacer,      # Upload N files from the result of a successful job.
+    'token'       : pl.TokenPlacer,       # Upload N files to a saved folder based on a token.
+    'packfile'    : pl.PackfilePlacer,    # Upload N files as a new packfile to a container.
+    'labelupload' : pl.LabelPlacer,
+    'uidupload'   : pl.UIDPlacer,
+    'analysis'    : pl.AnalysisPlacer,    # Upload N files to an analysis as input and output (no db updates)
+    'analysis_job': pl.AnalysisJobPlacer  # Upload N files to an analysis as output from job results
})
 
 def process_upload(request, strategy, container_type=None, id=None, origin=None, context=None, response=None, metadata=None):
@@ -61,7 +62,7 @@
     if id is not None and container_type == None:
         raise Exception('Unspecified container type')
 
-    if container_type is not None and container_type not in ('acquisition', 'session', 'project', 'collection'):
+    if container_type is not None and container_type not in ('acquisition', 'session', 'project', 'collection', 'analysis'):
         raise Exception('Unknown container type')
 
     timestamp = datetime.datetime.utcnow()
@@ -224,14 +225,18 @@ class Upload(base.RequestHandler):
         if level is None:
             self.abort(404, 'container level is required')
 
-        if level != 'acquisition':
-            self.abort(404, 'engine uploads are supported only at the acquisition level')
-
-        acquisition_id = self.get_param('id')
-        if not acquisition_id:
+        cont_id = self.get_param('id')
+        if not cont_id:
             self.abort(404, 'container id is required')
         else:
-            acquisition_id = bson.ObjectId(acquisition_id)
+            cont_id = bson.ObjectId(cont_id)
+        if level not in ['acquisition', 'analysis']:
+            self.abort(404, 'engine uploads are supported only at the acquisition or analysis level')
+
+        if level == 'analysis':
+            context = {'job_id': self.get_param('job')}
+            return process_upload(self.request, Strategy.analysis_job, origin=self.origin, container_type=level, id=cont_id, context=context)
+
         if not self.superuser_request:
             self.abort(402, 'uploads must be from an authorized drone')
         with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
@@ -247,7 +252,7 @@ class Upload(base.RequestHandler):
             file_infos = file_store.metadata['acquisition'].pop('files', [])
             now = datetime.datetime.utcnow()
             try:
-                acquisition_obj = hierarchy.update_container_hierarchy(file_store.metadata, acquisition_id, level)
+                acquisition_obj = hierarchy.update_container_hierarchy(file_store.metadata, cont_id, level)
             except APIStorageException as e:
                 self.abort(400, e.message)
             # move the files before updating the database
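
An engine-side sketch of the new analysis-level upload path (the `level`, `id`, and `job` query parameters follow the handler above; the `/api/engine` route and the multipart field name are assumptions based on the existing engine upload flow):

import requests

# The `job` param lands in context['job_id'], which lets AnalysisJobPlacer
# re-point the analysis at the job that actually produced the outputs.
with open('outputs.zip', 'rb') as fobj:
    requests.post(
        'https://demo.flywheel.io/api/engine',
        params={'level': 'analysis',
                'id': '5751cd3781460100a66405c8',
                'job': '573cb66b135d87002660597c'},
        files={'file': ('outputs.zip', fobj)},
    )

As written, the analysis branch returns before the superuser check that guards acquisition-level engine uploads, so analysis uploads are not restricted to authorized drones.
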