diff --git a/api/api.py b/api/api.py index b705b406a04c1f3130a4192066171ca7eb229ee3..12a09fa76bb027620129a613073f1923430c961a 100644 --- a/api/api.py +++ b/api/api.py @@ -80,8 +80,10 @@ routes = [ webapp2.Route(r'/api', root.Root), webapp2_extras.routes.PathPrefixRoute(r'/api', [ webapp2.Route(r'/download', download.Download, handler_method='download', methods=['GET', 'POST'], name='download'), - webapp2.Route(r'/reaper', upload.Upload, handler_method='reaper', methods=['POST']), - webapp2.Route(r'/uploader', upload.Upload, handler_method='uploader', methods=['POST']), + # webapp2.Route(r'/reaper', upload.Upload, handler_method='reaper', methods=['POST']), + # upload takes a parameter 'strategy' with possible values ['label', 'uid'], defaulting to 'label' + webapp2.Route(r'/uploader', upload.Upload, handler_method='upload', methods=['POST'], defaults={'strategy': 'label'}), + webapp2.Route(r'/upload/<strategy:label|uid>', upload.Upload, handler_method='upload', methods=['POST']), webapp2.Route(r'/engine', upload.Upload, handler_method='engine', methods=['POST']), webapp2.Route(r'/sites', centralclient.CentralClient, handler_method='sites', methods=['GET']), webapp2.Route(r'/register', centralclient.CentralClient, handler_method='register', methods=['POST']), diff --git a/api/base.py b/api/base.py index 7b0c1894a105f39dd59dfacfada3022c2af8cf4a..4fb476fc5107ff2eaf72a04da70f640e313df862 100644 --- a/api/base.py +++ b/api/base.py @@ -10,6 +10,7 @@ import urlparse import webapp2 from . import util +from . import files from . import config from .types import Origin from . import validators @@ -237,6 +238,8 @@ class RequestHandler(webapp2.RequestHandler): code = 400 elif isinstance(exception, APIConsistencyException): code = 400 + elif isinstance(exception, files.FileStoreException): + code = 400 else: code = 500 util.send_json_http_exception(self.response, str(exception), code) diff --git a/api/config.py b/api/config.py index 21d1b3256b2238b98e7b5ddedf209df93fa59fb9..3471d128f0ef78877769f3fc8f89fb4ab744af0e 100644 --- a/api/config.py +++ b/api/config.py @@ -124,8 +124,8 @@ expected_input_schemas = set([ 'download.json', 'tag.json', 'enginemetadata.json', - 'uploader.json', - 'reaper.json' + 'labelupload.json', + 'uidupload.json' ]) mongo_schemas = set() input_schemas = set() diff --git a/api/dao/reaperutil.py b/api/dao/hierarchy.py similarity index 72% rename from api/dao/reaperutil.py rename to api/dao/hierarchy.py index 346510d04db3eb7d46ca1df8142d248491690a27..b65345a3dd699d71cd76783f64536b2835a33b38 100644 --- a/api/dao/reaperutil.py +++ b/api/dao/hierarchy.py @@ -95,21 +95,30 @@ def add_fileinfo(cont_name, _id, fileinfo): return_document=pymongo.collection.ReturnDocument.AFTER ) -def _find_or_create_destination_project(group_name, project_label, created, modified): +def _group_name_fuzzy_match(group_name, project_label): existing_group_ids = [g['_id'] for g in config.db.groups.find(None, ['_id'])] + if group_name in existing_group_ids: + return group_name, project_label group_id_matches = difflib.get_close_matches(group_name, existing_group_ids, cutoff=0.8) if len(group_id_matches) == 1: group_name = group_id_matches[0] else: project_label = group_name + '_' + project_label group_name = 'unknown' + return group_name, project_label + +def _find_or_create_destination_project(group_name, project_label, timestamp): + group_name, project_label = _group_name_fuzzy_match(group_name, project_label) group = config.db.groups.find_one({'_id': group_name}) project = config.db.projects.find_one_and_update( -
{'group': group['_id'], 'label': project_label}, + {'group': group['_id'], + 'label': {'$regex': project_label, '$options': 'i'} + }, { '$setOnInsert': { + 'label': project_label, 'permissions': group['roles'], 'public': False, - 'created': created, 'modified': modified + 'created': timestamp, 'modified': timestamp } }, PROJECTION_FIELDS, @@ -118,62 +127,58 @@ def _find_or_create_destination_project(group_name, project_label, created, modi ) return project -def create_container_hierarchy(metadata): - #TODO: possibly try to keep a list of session IDs on the project, instead of having the session point to the project - # same for the session and acquisition - # queries might be more efficient that way - group = metadata.get('group', {}) - project = metadata.get('project', {}) - session = metadata.get('session', {}) - acquisition = metadata.get('acquisition', {}) - subject = metadata.get('subject') - file_ = metadata.get('file') +def _create_session_query(session, project, type_): + if type_ == 'label': + return { + 'label': session['label'], + 'project': project['_id'] + } + elif type_ == 'uid': + return { + 'uid': session['uid'] + } + else: + raise NotImplementedError('upload type is not handled by _create_session_query') - # Fail if some fields are missing - try: - group_id = group['_id'] - project_label = project['label'] - session_uid = session['uid'] - acquisition_uid = acquisition['uid'] - except Exception as e: - log.error(metadata) - raise APIStorageException(str(e)) - subject = metadata.get('subject') - file_ = metadata.get('file') +def _create_acquisition_query(acquisition, session, type_): + if type_ == 'label': + return { + 'label': acquisition['label'], + 'session': session['_id'] + } + elif type_ == 'uid': + return { + 'uid': acquisition['uid'] + } + else: + raise NotImplementedError('upload type is not handled by _create_acquisition_query') - now = datetime.datetime.utcnow() - session_obj = config.db.sessions.find_one({'uid': session_uid}, ['project']) - if session_obj: # skip project creation, if session exists - project_obj = config.db.projects.find_one({'_id': session_obj['project']}, projection=PROJECTION_FIELDS + ['name']) - else: - project_obj = _find_or_create_destination_project(group_id, project_label, now, now) - session['subject'] = subject or {} - #FIXME session modified date should be updated on updates +def _upsert_session(session, project_obj, type_, timestamp): + session['modified'] = timestamp if session.get('timestamp'): session['timestamp'] = dateutil.parser.parse(session['timestamp']) - session['modified'] = now + session_operations = { + '$setOnInsert': dict( + group=project_obj['group'], + project=project_obj['_id'], + permissions=project_obj['permissions'], + public=project_obj['public'], + created=timestamp + ), + '$set': session + } session_obj = config.db.sessions.find_one_and_update( - {'uid': session_uid}, - { - '$setOnInsert': dict( - group=project_obj['group'], - project=project_obj['_id'], - permissions=project_obj['permissions'], - public=project_obj['public'], - created=now - ), - '$set': session, - }, - PROJECTION_FIELDS, + _create_session_query(session, project_obj, type_), + session_operations, upsert=True, return_document=pymongo.collection.ReturnDocument.AFTER, ) + return session_obj - log.info('Storing %s -> %s -> %s' % (project_obj['group'], project_obj['label'], session_uid)) - +def _upsert_acquisition(acquisition, session_obj, type_, timestamp): if acquisition.get('timestamp'): acquisition['timestamp'] = 
dateutil.parser.parse(acquisition['timestamp']) session_operations = {'$min': dict(timestamp=acquisition['timestamp'])} @@ -181,130 +186,98 @@ def create_container_hierarchy(metadata): session_operations['$set'] = {'timezone': acquisition['timezone']} config.db.sessions.update_one({'_id': session_obj['_id']}, session_operations) - acquisition['modified'] = now + acquisition['modified'] = timestamp acq_operations = { '$setOnInsert': dict( session=session_obj['_id'], permissions=session_obj['permissions'], public=session_obj['public'], - created=now + created=timestamp ), '$set': acquisition } - #FIXME acquisition modified date should be updated on updates acquisition_obj = config.db.acquisitions.find_one_and_update( - {'uid': acquisition_uid}, + _create_acquisition_query(acquisition, session_obj, type_), acq_operations, upsert=True, return_document=pymongo.collection.ReturnDocument.AFTER ) - return TargetContainer(acquisition_obj, 'acquisitions'), file_ + return acquisition_obj -def create_root_to_leaf_hierarchy(metadata, files): + +def _get_targets(project_obj, session, acquisition, type_, timestamp): target_containers = [] + if not session: + return target_containers + session_files = dict_fileinfos(session.pop('files', [])) + session_obj = _upsert_session(session, project_obj, type_, timestamp) + target_containers.append( + (TargetContainer(session_obj, 'session'), session_files) + ) + if not acquisition: + return target_containers + acquisition_files = dict_fileinfos(acquisition.pop('files', [])) + acquisition_obj = _upsert_acquisition(acquisition, session_obj, type_, timestamp) + target_containers.append( + (TargetContainer(acquisition_obj, 'acquisition'), acquisition_files) + ) + return target_containers + +def upsert_bottom_up_hierarchy(metadata): + group = metadata.get('group', {}) + project = metadata.get('project', {}) + session = metadata.get('session', {}) + acquisition = metadata.get('acquisition', {}) + + # Fail if some fields are missing + try: + group_id = group['_id'] + project_label = project['label'] + session_uid = session['uid'] + acquisition_uid = acquisition['uid'] + except Exception as e: + log.error(metadata) + raise APIStorageException(str(e)) + + now = datetime.datetime.utcnow() + + session_obj = config.db.sessions.find_one({'uid': session_uid}, ['project']) + if session_obj: # skip project creation, if session exists + project_files = dict_fileinfos(project.pop('files', [])) + project_obj = config.db.projects.find_one({'_id': session_obj['project']}, projection=PROJECTION_FIELDS + ['name']) + target_containers = _get_targets(project_obj, session, acquisition, 'uid', now) + target_containers.append( + (TargetContainer(project_obj, 'project'), project_files) + ) + return target_containers + else: + return upsert_top_down_hierarchy(metadata, 'uid') + + +def upsert_top_down_hierarchy(metadata, type_='label'): group = metadata['group'] project = metadata['project'] session = metadata.get('session') acquisition = metadata.get('acquisition') now = datetime.datetime.utcnow() + group_id = group['_id'] - group_obj = config.db.groups.find_one({'_id': group['_id']}) - if not group_obj: - raise APIStorageException('group does not exist') - project['modified'] = now - project_files = merge_fileinfos(files, project.pop('files', [])) - project_obj = config.db.projects.find_one_and_update( - { - 'label': project['label'], - 'group': group['_id'] - }, - { - '$setOnInsert': dict( - group=group_obj['_id'], - permissions=group_obj['roles'], - public=False, - created=now - ), - 
'$set': project - }, - upsert=True, - return_document=pymongo.collection.ReturnDocument.AFTER, - ) - target_containers.append( - (TargetContainer(project_obj, 'projects'), project_files) - ) - if not session: - return target_containers - session['modified'] = now - session_files = merge_fileinfos(files, session.pop('files', [])) - session_operations = { - '$setOnInsert': dict( - group=project_obj['group'], - project=project_obj['_id'], - permissions=project_obj['permissions'], - public=project_obj['public'], - created=now - ), - '$set': session - } - session_obj = config.db.sessions.find_one_and_update( - { - 'label': session['label'], - 'project': project_obj['_id'], - }, - session_operations, - upsert=True, - return_document=pymongo.collection.ReturnDocument.AFTER, - ) - target_containers.append( - (TargetContainer(session_obj, 'sessions'), session_files) - ) - if not acquisition: - return target_containers - acquisition['modified'] = now - acquisition_files = merge_fileinfos(files, acquisition.pop('files', [])) - acq_operations = { - '$setOnInsert': dict( - session=session_obj['_id'], - permissions=session_obj['permissions'], - public=session_obj['public'], - created=now - ), - '$set': acquisition - } - acquisition_obj = config.db.acquisitions.find_one_and_update( - { - 'label': acquisition['label'], - 'session': session_obj['_id'] - }, - acq_operations, - upsert=True, - return_document=pymongo.collection.ReturnDocument.AFTER - ) + project_files = dict_fileinfos(project.pop('files', [])) + project_obj = _find_or_create_destination_project(group_id, project['label'], now) + target_containers = _get_targets(project_obj, session, acquisition, type_, now) target_containers.append( - (TargetContainer(acquisition_obj, 'acquisitions'), acquisition_files) + (TargetContainer(project_obj, 'project'), project_files) ) return target_containers -def merge_fileinfos(parsed_files, infos): - """it takes a dictionary of "hard_infos" (file size, hash) - merging them with infos derived from a list of infos on the same or on other files - """ - merged_files = {} +def dict_fileinfos(infos): + dict_infos = {} for info in infos: - parsed = parsed_files.get(info['name']) - if parsed: - path = parsed.path - new_infos = copy.deepcopy(parsed.info) - else: - path = None - new_infos = {} - new_infos.update(info) - merged_files[info['name']] = files.ParsedFile(new_infos, path) - return merged_files + dict_infos[info['name']] = info + return dict_infos def update_container_hierarchy(metadata, acquisition_id, level): @@ -353,3 +326,20 @@ def _update_container(query, update, cont_name): }, return_document=pymongo.collection.ReturnDocument.AFTER ) + +def merge_fileinfos(parsed_files, infos): + """it takes a dictionary of "hard_infos" (file size, hash) + merging them with infos derived from a list of infos on the same or on other files + """ + merged_files = {} + for info in infos: + parsed = parsed_files.get(info['name']) + if parsed: + path = parsed.path + new_infos = copy.deepcopy(parsed.info) + else: + path = None + new_infos = {} + new_infos.update(info) + merged_files[info['name']] = files.ParsedFile(new_infos, path) + return merged_files diff --git a/api/placer.py b/api/placer.py index dad78c02f1f1b91ad2221e581b08644c7bde28a0..2e1973e0aa5a0d6081192604ee2643f19bdbd5ae 100644 --- a/api/placer.py +++ b/api/placer.py @@ -13,8 +13,9 @@ from . import rules from . import tempdir as tempfile from . import util from . 
import validators -from .dao import reaperutil, APIStorageException +from .dao import hierarchy, APIStorageException +log = config.log class Placer(object): """ @@ -73,7 +74,7 @@ class Placer(object): files.move_form_file_field_into_cas(field) # Update the DB - reaperutil.upsert_fileinfo(self.container_type, self.id, info) + hierarchy.upsert_fileinfo(self.container_type, self.id, info) # Queue any jobs as a result of this upload rules.create_jobs(config.db, self.container, self.container_type, info) @@ -100,20 +101,70 @@ class TargetedPlacer(Placer): return self.saved -class ReaperPlacer(Placer): +class UIDPlacer(Placer): """ - A placer that can accept files sent to it from a reaper. - Currently a stub. + A placer that can accept multiple files. + It uses the method upsert_bottom_up_hierarchy to create its project/session/acquisition hierarchy. + Sessions and acquisitions are identified by UID. """ + metadata_schema = 'uidupload.json' + create_hierarchy = staticmethod(hierarchy.upsert_bottom_up_hierarchy) + + def check(self): + self.requireMetadata() + + payload_schema_uri = util.schema_uri('input', self.metadata_schema) + metadata_validator = validators.from_schema_path(payload_schema_uri) + metadata_validator(self.metadata, 'POST') + + targets = self.create_hierarchy(self.metadata) + + self.metadata_for_file = { } + + for target in targets: + for name in target[1]: + self.metadata_for_file[name] = { + 'container': target[0], + 'metadata': target[1][name] + } + + self.saved = [] + + def process_file_field(self, field, info): + + # For this file, choose its target container from self.metadata_for_file + + name = field.filename + target = self.metadata_for_file.get(name) + # if the file was not included in the metadata, skip it + if not target: + return + container = target['container'] + r_metadata = target['metadata'] - # def check(self): - # self.requireMetadata() + self.container_type = container.level + self.id = container._id + self.container = container.container - # def process_file_field(self, field, info): - # pass + info.update(r_metadata) + + self.save_file(field, info) + self.saved.append(info) + + def finalize(self): + return self.saved - # def finalize(self): - # pass + +class LabelPlacer(UIDPlacer): + """ + A placer that creates a hierarchy based on labels. + + It uses the method upsert_top_down_hierarchy to create its project/session/acquisition hierarchy. + Sessions and acquisitions are identified by label. + """ + + metadata_schema = 'labelupload.json' + create_hierarchy = staticmethod(hierarchy.upsert_top_down_hierarchy) class EnginePlacer(Placer): @@ -125,6 +176,9 @@ class EnginePlacer(Placer): def check(self): self.requireTarget() validators.validate_data(self.metadata, 'enginemetadata.json', 'input', 'POST', optional=True) + self.saved = [] + + # Could avoid loops in process_file_field by setting up a filename -> metadata map here def process_file_field(self, field, info): if self.metadata is not None: @@ -137,20 +191,21 @@ class EnginePlacer(Placer): if file_md['name'] == info['name']: break else: - file_md = None + file_md = {} for x in ('type', 'instrument', 'measurements', 'tags', 'metadata'): info[x] = file_md.get(x) or info[x] self.save_file(field, info) + self.saved.append(info) def finalize(self): # Updating various properties of the hierarchy; currently assumes acquisitions; might need fixing for other levels.
# NOTE: only called in EnginePlacer bid = bson.ObjectId(self.id) - self.obj = reaperutil.update_container_hierarchy(self.metadata, bid, '') + self.obj = hierarchy.update_container_hierarchy(self.metadata, bid, '') - return {} + return self.saved class PackfilePlacer(Placer): diff --git a/api/schemas/input/file.json b/api/schemas/input/file.json index f201963c5f41cc1f96fce157a822b8313dad427b..66a1949dc5fa55c9c8b948b789e1093175f3cd38 100644 --- a/api/schemas/input/file.json +++ b/api/schemas/input/file.json @@ -5,8 +5,6 @@ "name": { "type": "string" }, "type": { "type": "string" }, "mimetype": { "type": "string" }, - "size": { "type": "integer" }, - "hash": { "type": "string" }, "instrument": { "type": "string" }, "measurements": { "items": { "type": "string"}, diff --git a/api/schemas/input/uploader.json b/api/schemas/input/labelupload.json similarity index 100% rename from api/schemas/input/uploader.json rename to api/schemas/input/labelupload.json diff --git a/api/schemas/input/reaper.json b/api/schemas/input/uidupload.json similarity index 100% rename from api/schemas/input/reaper.json rename to api/schemas/input/uidupload.json diff --git a/api/upload.py b/api/upload.py index 37f54a45e7f074416e66e3267fa92922fd6ad0d8..9e0622d0e971f8356881075f27f4f41b336e6e5c 100644 --- a/api/upload.py +++ b/api/upload.py @@ -11,15 +11,16 @@ from . import tempdir as tempfile from . import placer as pl from . import util from . import validators -from .dao import reaperutil, APIStorageException +from .dao import hierarchy, APIStorageException log = config.log Strategy = util.Enum('Strategy', { - 'targeted' : pl.TargetedPlacer, # Upload N files to a container. - 'reaper': pl.ReaperPlacer, # Upload N files from a scientific data source. - 'engine' : pl.EnginePlacer, # Upload N files from the result of a successful job. - 'packfile' : pl.PackfilePlacer # Upload N files as a new packfile to a container. + 'targeted' : pl.TargetedPlacer, # Upload N files to a container. + 'engine' : pl.EnginePlacer, # Upload N files from the result of a successful job. + 'packfile' : pl.PackfilePlacer, # Upload N files as a new packfile to a container. + 'labelupload': pl.LabelPlacer, + 'uidupload' : pl.UIDPlacer, }) def process_upload(request, strategy, container_type=None, id=None, origin=None): @@ -64,7 +65,7 @@ def process_upload(request, strategy, container_type=None, id=None, origin=None) container = None if container_type and id: - container = reaperutil.get_container(container_type, id) + container = hierarchy.get_container(container_type, id) # The vast majority of this function's wall-clock time is spent here. # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. @@ -73,7 +74,12 @@ def process_upload(request, strategy, container_type=None, id=None, origin=None) metadata = None if 'metadata' in form: # Slight misnomer: the metadata field, if present, is sent as a normal form field, NOT a file form field. 
- metadata = json.loads(form['metadata'].file.getvalue()) + metadata_file = form['metadata'].file + try: + metadata = json.loads(metadata_file.getvalue()) + except AttributeError: + raise files.FileStoreException('wrong format for field "metadata"') + placer_class = strategy.value placer = placer_class(container_type, container, id, metadata, timestamp, origin) @@ -132,7 +138,7 @@ class Upload(base.RequestHandler): with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path: try: file_store = files.FileStore(self.request, tempdir_path) - except FileStoreException as e: + except files.FileStoreException as e: self.abort(400, str(e)) now = datetime.datetime.utcnow() fileinfo = dict( @@ -147,7 +153,7 @@ class Upload(base.RequestHandler): origin=self.origin ) - target, file_metadata = reaperutil.create_container_hierarchy(file_store.metadata) + target, file_metadata = hierarchy.create_container_hierarchy(file_store.metadata) fileinfo.update(file_metadata) f = target.find(file_store.filename) target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash'])) @@ -162,38 +168,19 @@ class Upload(base.RequestHandler): throughput = file_store.size / file_store.duration.total_seconds() log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr)) - def uploader(self): + def upload(self, strategy): """Receive a sortable reaper upload.""" if not self.superuser_request: self.abort(402, 'uploads must be from an authorized drone') - with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path: - try: - file_store = files.MultiFileStore(self.request, tempdir_path) - except FileStoreException as e: - self.abort(400, str(e)) - if not file_store.metadata: - self.abort(400, 'metadata is missing') - payload_schema_uri = util.schema_uri('input', 'uploader.json') - metadata_validator = validators.from_schema_path(payload_schema_uri) - metadata_validator(file_store.metadata, 'POST') - try: - target_containers = reaperutil.create_root_to_leaf_hierarchy(file_store.metadata, file_store.files) - except APIStorageException as e: - self.abort(400, str(e)) - for target, file_dict in target_containers: - for filename, parsed_file in file_dict.items(): - fileinfo = parsed_file.info - fileinfo['origin'] = self.origin - f = target.find(filename) - target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash'])) - if not f: - files.move_file(parsed_file.path, target_path) - target.add_file(fileinfo) - rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo) - elif not files.identical(fileinfo['hash'], parsed_file.path, f['hash'], util.path_from_hash(f['hash'])): - files.move_file(parsed_file.path, target_path) - target.update_file(fileinfo) - rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo) + + # TODO: what enum + if strategy == 'label': + strategy = Strategy.labelupload + elif strategy == 'uid': + strategy = Strategy.uidupload + else: + self.abort(500, 'strategy {} not implemented'.format(strategy)) + return process_upload(self.request, strategy, origin=self.origin) def engine(self): """ @@ -217,7 +204,7 @@ class Upload(base.RequestHandler): with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path: try: file_store = files.MultiFileStore(self.request,
tempdir_path) - except FileStoreException as e: + except files.FileStoreException as e: self.abort(400, str(e)) if not file_store.metadata: self.abort(400, 'metadata is missing') @@ -227,7 +214,7 @@ class Upload(base.RequestHandler): file_infos = file_store.metadata['acquisition'].pop('files', []) now = datetime.datetime.utcnow() try: - acquisition_obj = reaperutil.update_container_hierarchy(file_store.metadata, acquisition_id, level) + acquisition_obj = hierarchy.update_container_hierarchy(file_store.metadata, acquisition_id, level) except APIStorageException as e: self.abort(400, e.message) # move the files before updating the database @@ -236,14 +223,14 @@ class Upload(base.RequestHandler): target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash'])) files.move_file(parsed_file.path, target_path) # merge infos from the actual file and from the metadata - merged_files = reaperutil.merge_fileinfos(file_store.files, file_infos) + merged_files = hierarchy.merge_fileinfos(file_store.files, file_infos) # update the fileinfo in mongo if a file already exists for f in acquisition_obj['files']: merged_file = merged_files.get(f['name']) if merged_file: fileinfo = merged_file.info fileinfo['modified'] = now - acquisition_obj = reaperutil.update_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo) + acquisition_obj = hierarchy.update_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo) fileinfo['existing'] = True # create the missing fileinfo in mongo for name, merged_file in merged_files.items(): @@ -255,7 +242,7 @@ class Upload(base.RequestHandler): fileinfo['created'] = now fileinfo['modified'] = now fileinfo['origin'] = self.origin - acquisition_obj = reaperutil.add_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo) + acquisition_obj = hierarchy.add_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo) for f in acquisition_obj['files']: if f['name'] in file_store.files: @@ -268,4 +255,3 @@ class Upload(base.RequestHandler): } rules.create_jobs(config.db, acquisition_obj, 'acquisition', file_) return [{'name': k, 'hash': v.info.get('hash'), 'size': v.info.get('size')} for k, v in merged_files.items()] - diff --git a/test/integration_tests/test_uploads.py b/test/integration_tests/test_uploads.py new file mode 100644 index 0000000000000000000000000000000000000000..048e715a36999f5293d6ba8974f1e98f84c37b5a --- /dev/null +++ b/test/integration_tests/test_uploads.py @@ -0,0 +1,127 @@ +import os +import json +import time +import pytest +import logging + +log = logging.getLogger(__name__) +sh = logging.StreamHandler() +log.addHandler(sh) + +@pytest.fixture() +def with_group_and_file_data(api_as_admin, data_builder, bunch, request): + group_id = 'test_group_' + str(int(time.time() * 1000)) + data_builder.create_group(group_id) + file_names = ['proj.csv', 'ses.csv', 'acq.csv'] + files = {} + for i, name in enumerate(file_names): + files['file' + str(i+1)] = (name, 'some,data,to,send\nanother,row,to,send\n') + + def teardown_db(): + r = api_as_admin.get('/groups/{}/projects'.format(group_id)) + content = json.loads(r.content) + if content: + project_id = content[0]['_id'] + r = api_as_admin.get('/projects/{}/sessions'.format(project_id)) + content = json.loads(r.content) + if content: + session_id = content[0]['_id'] + r = api_as_admin.get('/sessions/{}/acquisitions'.format(session_id)) + content = json.loads(r.content) + if content: + acquisition_id = content[0]['_id'] + api_as_admin.delete('/acquisitions/' + acquisition_id) + 
api_as_admin.delete('/sessions/' + session_id) + api_as_admin.delete('/projects/' + project_id) + api_as_admin.delete('/groups/' + group_id) + + request.addfinalizer(teardown_db) + + fixture_data = bunch.create() + fixture_data.group_id = group_id + fixture_data.files = files + return fixture_data + + +def test_uid_upload(with_group_and_file_data, api_as_admin): + data = with_group_and_file_data + metadata = { + 'group':{ + '_id': data.group_id + }, + 'project':{ + 'label':'test_project', + 'files':[ + { + 'name':data.files.keys()[0] + } + ] + }, + 'session':{ + 'uid':'test_session_uid', + 'files':[ + { + 'name':data.files.keys()[1] + } + ], + 'subject': {'code': 'test_subject'} + }, + 'acquisition':{ + 'uid':'test_acquisition_uid', + 'files':[ + { + 'name':data.files.keys()[2] + } + ] + } + } + metadata = json.dumps(metadata) + data.files['metadata'] = ('', metadata) + + r = api_as_admin.post('/upload/uid', files=data.files) + assert r.ok + + +def test_label_upload(with_group_and_file_data, api_as_admin): + data = with_group_and_file_data + metadata = { + 'group':{ + '_id': data.group_id + }, + 'project':{ + 'label':'test_project', + 'files':[ + { + 'name':data.files.keys()[0] + } + ] + }, + 'session':{ + 'label':'test_session', + 'files':[ + { + 'name':data.files.keys()[1] + } + ], + 'subject': {'code': 'test_subject'} + }, + 'acquisition':{ + 'label':'test_acquisition', + 'files':[ + { + 'name':data.files.keys()[2] + } + ] + } + } + metadata = json.dumps(metadata) + data.files['metadata'] = ('', metadata) + + r = api_as_admin.post('/upload/label', files=data.files) + assert r.ok + + data.files['metadata'] = metadata + r = api_as_admin.post('/upload/label', files=data.files) + assert r.status_code == 400 + +