diff --git a/api/dao/containerstorage.py b/api/dao/containerstorage.py index 4a1aaa22879adb4708ef29539375213651249f30..68b4a3f6473dd1dc0cbad970c4152335b00184bf 100644 --- a/api/dao/containerstorage.py +++ b/api/dao/containerstorage.py @@ -10,6 +10,12 @@ from . import hierarchy log = config.log +# TODO: Find a better place to put this until OOP where we can just call cont.children +CHILD_MAP = { + 'groups': 'projects', + 'projects': 'sessions', + 'sessions': 'acquisitions' +} class ContainerStorage(object): """ @@ -23,63 +29,59 @@ class ContainerStorage(object): self.use_object_id = use_object_id self.dbc = config.db[cont_name] - def get_container(self, _id, projection=None, get_children=False): - cont = self._get_el(_id, projection=projection) - - if get_children: - child_map = { - 'groups': 'projects', - 'projects': 'sessions', - 'sessions': 'acquisitions' - } - - child_name = child_map.get(self.cont_name) - if not child_name: - raise ValueError('Children can only be listed from group, project or session level') - else: - query = {self.cont_name[:-1]: bson.objectid.ObjectId(_id)} - cont[child_name] = ContainerStorage(child_name, True).exec_op('GET', query=query, projection=projection) - - return cont - def set_fields(self, _id, payload): + @staticmethod + def factory(cont_name, use_object_id = False): """ - Doesn't use util.mongo_dict. Directly sets the fields in payload + Factory method to aid in the creation of a ContainerStorage when cont_name is dynamic. 
""" + if cont_name == 'groups': + return GroupStorage(cont_name, use_object_id) + elif cont_name == 'sessions': + return SessionStorage() + elif cont_name == 'acquisitions': + return AcquisitionStorage() + else: + return ContainerStorage(cont_name, use_object_id) - update = {'$set': util.mongo_sanitize_fields(payload)} - if self.use_object_id: - try: - _id = bson.objectid.ObjectId(_id) - except bson.errors.InvalidId as e: - raise APIStorageException(e.message) - return self.dbc.update_one({'_id': _id}, update) + def get_container(self, _id, projection=None, get_children=False): + cont = self.get_el(_id, projection=projection) + if get_children: + children = self.get_children(_id, projection=projection) + cont[CHILD_MAP.get(self.cont_name)] = children + return cont + + def get_children(self, _id, projection=None): + child_name = CHILD_MAP.get(self.cont_name) + if not child_name: + raise ValueError('Children can only be listed from group, project or session level') + query = {self.cont_name[:-1]: bson.objectid.ObjectId(_id)} + return self.factory(child_name, True).get_all_el(query, None, projection) def exec_op(self, action, _id=None, payload=None, query=None, user=None, public=False, projection=None, recursive=False, r_payload=None, # pylint: disable=unused-argument replace_metadata=False): """ - Generic method to exec an operation. - The request is dispatched to the corresponding private methods. + Generic method to exec a CRUD operation from a REST verb. 
check = consistencychecker.get_container_storage_checker(action, self.cont_name) data_op = payload or {'_id': _id} check(data_op) if action == 'GET' and _id: - return self._get_el(_id, projection) + return self.get_el(_id, projection) if action == 'GET': - return self._get_all_el(query, user, projection) + return self.get_all_el(query, user, projection) if action == 'DELETE': - return self._delete_el(_id) + return self.delete_el(_id) if action == 'PUT': - return self._update_el(_id, payload, recursive, r_payload, replace_metadata) + return self.update_el(_id, payload, recursive, r_payload, replace_metadata) if action == 'POST': - return self._create_el(payload) + return self.create_el(payload) raise ValueError('action should be one of GET, POST, PUT, DELETE') - def _create_el(self, payload): + def create_el(self, payload): log.debug(payload) try: result = self.dbc.insert_one(payload) @@ -87,7 +89,7 @@ class ContainerStorage(object): raise APIConflictException('Object with id {} already exists.'.format(payload['_id'])) return result - def _update_el(self, _id, payload, recursive=False, r_payload=None, replace_metadata=False): + def update_el(self, _id, payload, recursive=False, r_payload=None, replace_metadata=False): replace = None if replace_metadata: replace = {} @@ -109,9 +111,9 @@ class ContainerStorage(object): raise APIStorageException(e.message) if recursive and r_payload is not None: hierarchy.propagate_changes(self.cont_name, _id, {}, {'$set': util.mongo_dict(r_payload)}) - return self.dbc.update_one({'_id': _id}, update) + return self.dbc.update_one({'_id': _id}, update) - def _delete_el(self, _id): + def delete_el(self, _id): if self.use_object_id: try: _id = bson.objectid.ObjectId(_id) @@ -119,7 +121,7 @@ class ContainerStorage(object): raise APIStorageException(e.message) return self.dbc.delete_one({'_id':_id}) - def _get_el(self, _id, projection=None): + def get_el(self, _id, projection=None): if self.use_object_id: try: _id = bson.objectid.ObjectId(_id) 
@@ -127,7 +129,7 @@ class ContainerStorage(object): raise APIStorageException(e.message) return self.dbc.find_one(_id, projection) - def _get_all_el(self, query, user, projection): + def get_all_el(self, query, user, projection): if user: if query.get('permissions'): query['$and'] = [{'permissions': {'$elemMatch': user}}, {'permissions': query.pop('permissions')}] @@ -140,7 +142,10 @@ class ContainerStorage(object): class GroupStorage(ContainerStorage): - def _create_el(self, payload): + def __init__(self): + super(GroupStorage,self).__init__('groups', use_object_id=False) + + def create_el(self, payload): log.debug(payload) roles = payload.pop('roles') return self.dbc.update_one( @@ -150,3 +155,51 @@ class GroupStorage(ContainerStorage): '$setOnInsert': {'roles': roles} }, upsert=True) + +class SessionStorage(ContainerStorage): + + def __init__(self): + super(SessionStorage,self).__init__('sessions', use_object_id=True) + + def create_el(self, payload): + project = ContainerStorage('projects', use_object_id=True).get_container(payload['project']) + if project.get('template'): + payload['project_has_template'] = True + payload['satisfies_template'] = hierarchy.is_session_compliant(payload, project.get('template')) + return super(SessionStorage, self).create_el(payload) + + def update_el(self, _id, payload, recursive=False, r_payload=None, replace_metadata=False): + session = self.get_container(_id) + if session.get('project_has_template'): + project = ContainerStorage('projects', use_object_id=True).get_container(payload.get('project', session['project'])) + session.update(payload) + payload['satisfies_template'] = hierarchy.is_session_compliant(session, project.get('template')) + return super(SessionStorage, self).update_el(_id, payload, recursive, r_payload, replace_metadata) + + def recalc_session_compliance(self, session_id, session=None, template=None): + if session is None: + session = self.get_container(session_id) + if session.get('project_has_template'): + if template is None: + 
template = ContainerStorage('projects', use_object_id=True).get_container(session['project']).get('template') + satisfies_template = hierarchy.is_session_compliant(session, template) + if session.get('satisfies_template') != satisfies_template: + update = {'satisfies_template': satisfies_template} + super(SessionStorage, self).update_el(session_id, update) + +class AcquisitionStorage(ContainerStorage): + + def __init__(self): + super(AcquisitionStorage,self).__init__('acquisitions', use_object_id=True) + + def create_el(self, payload): + result = super(AcquisitionStorage, self).create_el(payload) + SessionStorage().recalc_session_compliance(payload['session']) + return result + + def update_el(self, _id, payload, recursive=False, r_payload=None, replace_metadata=False): + result = super(AcquisitionStorage, self).update_el(_id, payload, recursive, r_payload, replace_metadata) + acquisition = self.get_container(_id) + SessionStorage().recalc_session_compliance(acquisition['session']) + return result + diff --git a/api/dao/hierarchy.py b/api/dao/hierarchy.py index a13eb93d8bde776951faba9d052edd337a11e0ee..71cdcc1e1c6f55861f8487a21e91407c44d90aff 100644 --- a/api/dao/hierarchy.py +++ b/api/dao/hierarchy.py @@ -3,6 +3,7 @@ import copy import datetime import dateutil.parser import difflib +from jsonschema import Draft4Validator, ValidationError import pymongo import re @@ -112,6 +113,58 @@ def propagate_changes(cont_name, _id, query, update): else: raise ValueError('changes can only be propagated from group, project or session level') +def is_session_compliant(session, template): + """ + Given a project-level session template and a session, + returns True/False if the session is in compliance with the template + """ + s_requirements = template.get('session') + a_requirements = template.get('acquisitions') + #f_requirements = template.get('files') + + if s_requirements: + validator = Draft4Validator(s_requirements.get('schema')) + try: + validator.validate(session) 
except ValidationError: + return False + + if a_requirements: + acquisitions = list(config.db.acquisitions.find({'session': session['_id']})) + for req in a_requirements: + validator = Draft4Validator(req.get('schema')) + min_count = req.get('minimum') + count = 0 + while count < min_count: + for a in acquisitions: + try: + validator.validate(a) + except ValidationError: + continue + else: + count += 1 + if count < min_count: + return False + + # if f_requirements: + # files = [] #This is where we'd get acquisitions + # for req in f_requirements: + # validator = Draft4Validator(req.get('schema')) + # min_count = req.get('minimum') + # count = 0 + # while count < min_count: + # for f in files: + # try: + # validator.validate(f) + # except ValidationError as err: + # continue + # else: + # count += 1 + # if count < min_count: + # return False + + return True + def upsert_fileinfo(cont_name, _id, fileinfo): # TODO: make all functions take singular noun cont_name += 's' diff --git a/api/dao/liststorage.py b/api/dao/liststorage.py index 17bb33787978cd02a7382c43da9ff71329314593..d91b2db4ba74a87794bd60e610ef479d4db94092 100644 --- a/api/dao/liststorage.py +++ b/api/dao/liststorage.py @@ -4,6 +4,7 @@ import bson.objectid from .. import config from . import consistencychecker, containerutil from . 
import APIStorageException, APIConflictException +from .containerstorage import SessionStorage, AcquisitionStorage from ..jobs.jobs import Job log = config.log @@ -105,7 +106,11 @@ class ListStorage(object): update = {'$pull': {self.list_name: query_params} } log.debug('query {}'.format(query)) log.debug('update {}'.format(update)) - return self.dbc.update_one(query, update) + result = self.dbc.update_one(query, update) + if self.list_name == 'files' and self.cont_name in ['sessions', 'acquisitions']: + session_id = _id if self.cont_name == 'sessions' else AcquisitionStorage().get_container(_id).get('session') + SessionStorage().recalc_session_compliance(session_id) + return result def _get_el(self, _id, query_params): log.debug('query_params {}'.format(query_params)) diff --git a/api/handlers/containerhandler.py b/api/handlers/containerhandler.py index 3a6dd7727b8d4e9203da412fb32e32bae282d016..b94afcd939672d9b960dd63763a2f988c3f4a044 100644 --- a/api/handlers/containerhandler.py +++ b/api/handlers/containerhandler.py @@ -1,6 +1,7 @@ import bson import datetime import dateutil +import json from .. import base from .. 
import util @@ -51,7 +52,7 @@ class ContainerHandler(base.RequestHandler): 'projects': { 'storage': containerstorage.ContainerStorage('projects', use_object_id=use_object_id['projects']), 'permchecker': containerauth.default_container, - 'parent_storage': containerstorage.ContainerStorage('groups', use_object_id=use_object_id['groups']), + 'parent_storage': containerstorage.GroupStorage(), 'storage_schema_file': 'project.json', 'payload_schema_file': 'project.json', 'list_projection': {'metadata': 0}, @@ -59,7 +60,7 @@ class ContainerHandler(base.RequestHandler): 'children_cont': 'sessions' }, 'sessions': { - 'storage': containerstorage.ContainerStorage('sessions', use_object_id=use_object_id['sessions']), + 'storage': containerstorage.SessionStorage(), 'permchecker': containerauth.default_container, 'parent_storage': containerstorage.ContainerStorage('projects', use_object_id=use_object_id['projects']), 'storage_schema_file': 'session.json', @@ -69,9 +70,9 @@ class ContainerHandler(base.RequestHandler): 'children_cont': 'acquisitions' }, 'acquisitions': { - 'storage': containerstorage.ContainerStorage('acquisitions', use_object_id=use_object_id['acquisitions']), + 'storage': containerstorage.AcquisitionStorage(), 'permchecker': containerauth.default_container, - 'parent_storage': containerstorage.ContainerStorage('sessions', use_object_id=use_object_id['sessions']), + 'parent_storage': containerstorage.SessionStorage(), 'storage_schema_file': 'acquisition.json', 'payload_schema_file': 'acquisition.json', 'list_projection': {'metadata': 0} @@ -484,23 +485,23 @@ class ContainerHandler(base.RequestHandler): def set_project_template(self, **kwargs): project_id = kwargs.pop('cid') + log.debug('the project_id is {}'.format(project_id)) self.config = self.container_handler_configurations['projects'] self.storage = self.config['storage'] container = self._get_container(project_id) template = self.request.json_body validators.validate_data(template, 
'project-template.json', 'input', 'POST') - payload = {'template': template} + payload = {'template': template} payload['modified'] = datetime.datetime.utcnow() permchecker = self._get_permchecker(container) - result = permchecker(noop)('PUT', project_id, payload=payload) - result = self.storage.set_fields(project_id, payload) + result = permchecker(self.storage.exec_op)('PUT', _id=project_id, payload=payload) if result.modified_count == 1: return {'modified': result.modified_count} else: - self.abort(404, 'Element not updated in container {} {}'.format(self.storage.cont_name, _id)) + self.abort(404, 'Could not find project {}'.format(project_id)) def _get_validators(self): mongo_schema_uri = validators.schema_uri('mongo', self.config.get('storage_schema_file')) diff --git a/api/handlers/grouphandler.py b/api/handlers/grouphandler.py index cb23723e4fa1d074f8aceae9cc0682777845d442..b7b7a805f9471d92fbe3278395dd845e756583e3 100644 --- a/api/handlers/grouphandler.py +++ b/api/handlers/grouphandler.py @@ -11,7 +11,7 @@ class GroupHandler(base.RequestHandler): def __init__(self, request=None, response=None): super(GroupHandler, self).__init__(request, response) - self.storage = containerstorage.GroupStorage('groups', use_object_id=False) + self.storage = containerstorage.GroupStorage() def get(self, _id): group = self._get_group(_id) diff --git a/api/jobs/gears.py b/api/jobs/gears.py index 9471cabbaf19d35a28203d32c0500e9383a51550..48097e1defbdc32080c5925e0c4b3be3d40f70cc 100644 --- a/api/jobs/gears.py +++ b/api/jobs/gears.py @@ -62,7 +62,7 @@ def suggest_container(gear, cont_name, cid): Given a container reference, suggest files that would work well for each input on a gear. 
""" - root = ContainerStorage(cont_name, True).get_container(cid, projection={'permissions':0}, get_children=True) + root = ContainerStorage.factory(cont_name, True).get_container(cid, projection={'permissions':0}, get_children=True) invocation_schema = get_invocation_schema(gear) schemas = {} diff --git a/api/placer.py b/api/placer.py index 2298558539fe495d1bd3e04fba0dfd00fa801482..94d2d40b8535fba499cdd70669974b81b257de47 100644 --- a/api/placer.py +++ b/api/placer.py @@ -13,6 +13,7 @@ from . import files from . import tempdir as tempfile from . import util from . import validators +from .dao.containerstorage import SessionStorage, AcquisitionStorage from .dao import containerutil, hierarchy from .jobs import rules from .types import Origin @@ -94,6 +95,14 @@ class Placer(object): # Queue any jobs as a result of this upload rules.create_jobs(config.db, self.container, self.container_type, info) + def recalc_session_compliance(self): + if self.container_type in ['session', 'acquisition']: + if self.container_type is 'session': + session_id = self.id_ + else: + session_id = AcquisitionStorage().get_container(self.id_).get('session') + SessionStorage().recalc_session_compliance(session_id) + class TargetedPlacer(Placer): """ @@ -114,6 +123,7 @@ class TargetedPlacer(Placer): self.saved.append(info) def finalize(self): + self.recalc_session_compliance() return self.saved @@ -129,6 +139,7 @@ class UIDPlacer(Placer): def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) self.metadata_for_file = {} + self.session_id = None def check(self): @@ -144,6 +155,8 @@ class UIDPlacer(Placer): self.metadata_for_file = {} for target in targets: + if target[0].level is 'session': + self.session_id = target[0].id_ for name in target[1]: self.metadata_for_file[name] = { 'container': target[0], @@ -181,6 +194,10 @@ class UIDPlacer(Placer): 
self.saved.append(info) def finalize(self): + if self.session_id: + self.container_type = 'session' + self.id_ = self.session_id + self.recalc_session_compliance() return self.saved @@ -238,6 +255,7 @@ class EnginePlacer(Placer): self.metadata[k].pop('files', {}) hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type) + self.recalc_session_compliance() return self.saved @@ -279,6 +297,7 @@ class TokenPlacer(Placer): dest = os.path.join(self.folder, os.path.basename(path)) shutil.move(path, dest) + self.recalc_session_compliance() return self.saved @@ -514,6 +533,8 @@ class PackfilePlacer(Placer): self.save_file(cgi_field, cgi_info) + self.recalc_session_compliance() + # Delete token config.db['tokens'].delete_one({ '_id': token })