diff --git a/api/dao/hierarchy.py b/api/dao/hierarchy.py
index fee634338f7d063f0caaa99e5167a6ede9f90796..2f67cebbec55817663917bd8d827485497503208 100644
--- a/api/dao/hierarchy.py
+++ b/api/dao/hierarchy.py
@@ -182,83 +182,52 @@ def _find_or_create_destination_project(group_id, project_label, timestamp):
     )
     return project
 
-
-def _create_session_query(session, project, type_):
-    if type_ == 'label':
+def _create_query(cont, cont_type, parent_type, parent_id, upload_type):
+    if upload_type == 'label':
         return {
-            'label': session['label'],
-            'project': project['_id']
+            'label': cont['label'],
+            parent_type: bson.ObjectId(parent_id)
         }
-    elif type_ == 'uid':
+    elif upload_type == 'uid':
         return {
-            'uid': session['uid']
+            'uid': cont['uid']
         }
     else:
-        raise NotImplementedError('upload type is not handled by _create_session_query')
+        raise NotImplementedError('upload type is not handled by _create_query')
+
+def _upsert_container(cont, cont_type, parent, parent_type, upload_type, timestamp):
+    cont['modified'] = timestamp
+
+    if cont.get('timestamp'):
+        cont['timestamp'] = dateutil.parser.parse(cont['timestamp'])
+
+    if cont_type == 'acquisition':
+        session_operations = {'$min': dict(timestamp=cont['timestamp'])}
+        if cont.get('timezone'):
+            session_operations['$set'] = {'timezone': cont['timezone']}
+        config.db.sessions.update_one({'_id': parent['_id']}, session_operations)
+
+    if cont_type == 'session':
+        cont['subject'] = containerutil.add_id_to_subject(cont.get('subject'), parent['_id'])
+
+    query = _create_query(cont, cont_type, parent_type, parent['_id'], upload_type)
+
+    if config.db[cont_type+'s'].find_one(query) is not None:
+        return _update_container_nulls(query, cont, cont_type)
 
-def _create_acquisition_query(acquisition, session, type_):
-    if type_ == 'label':
-        return {
-            'label': acquisition['label'],
-            'session': session['_id']
-        }
-    elif type_ == 'uid':
-        return {
-            'uid': acquisition['uid']
-        }
     else:
-        raise NotImplementedError('upload type is not handled by _create_acquisition_query')
-
-
-def _upsert_session(session, project_obj, type_, timestamp):
-    session['modified'] = timestamp
-    if session.get('timestamp'):
-        session['timestamp'] = dateutil.parser.parse(session['timestamp'])
-    session['subject'] = containerutil.add_id_to_subject(session.get('subject'), project_obj['_id'])
-    session_operations = {
-        '$setOnInsert': dict(
-            group=project_obj['group'],
-            project=project_obj['_id'],
-            permissions=project_obj['permissions'],
-            public=project_obj.get('public', False),
-            created=timestamp
-        ),
-        '$set': session
-    }
-    session_obj = config.db.sessions.find_one_and_update(
-        _create_session_query(session, project_obj, type_),
-        session_operations,
-        upsert=True,
-        return_document=pymongo.collection.ReturnDocument.AFTER,
-    )
-    return session_obj
-
-def _upsert_acquisition(acquisition, session_obj, type_, timestamp):
-    if acquisition.get('timestamp'):
-        acquisition['timestamp'] = dateutil.parser.parse(acquisition['timestamp'])
-        session_operations = {'$min': dict(timestamp=acquisition['timestamp'])}
-        if acquisition.get('timezone'):
-            session_operations['$set'] = {'timezone': acquisition['timezone']}
-        config.db.sessions.update_one({'_id': session_obj['_id']}, session_operations)
-
-    acquisition['modified'] = timestamp
-    acq_operations = {
-        '$setOnInsert': dict(
-            session=session_obj['_id'],
-            permissions=session_obj['permissions'],
-            public=session_obj.get('public', False),
-            created=timestamp
-        ),
-        '$set': acquisition
-    }
-    acquisition_obj = config.db.acquisitions.find_one_and_update(
-        _create_acquisition_query(acquisition, session_obj, type_),
-        acq_operations,
-        upsert=True,
-        return_document=pymongo.collection.ReturnDocument.AFTER
-    )
-    return acquisition_obj
+        insert_vals = {
+            parent_type: parent['_id'],
+            'permissions': parent['permissions'],
+            'public': parent.get('public', False),
+            'created': timestamp
+        }
+        if cont_type == 'session':
+            insert_vals['group'] = parent['group']
+        cont.update(insert_vals)
+        insert_id = config.db[cont_type+'s'].insert(cont)
+        cont['_id'] = insert_id
+        return cont
 
 
 def _get_targets(project_obj, session, acquisition, type_, timestamp):
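The hunk above collapses the parallel session/acquisition helpers into a single `_create_query`/`_upsert_container` pair, and replaces the blanket `$set` upsert with find-then-fill semantics. A minimal, in-memory sketch of that flow (illustrative stand-in names, a plain list in place of a MongoDB collection, not the repo's API):

```python
# Sketch of the find-or-insert semantics of _upsert_container. Hypothetical
# helper for illustration only.
def upsert(collection, query, cont, parent, parent_type, timestamp):
    match = next((c for c in collection
                  if all(c.get(k) == v for k, v in query.items())), None)
    if match is not None:
        # Existing container: fill only fields that are currently unset,
        # mirroring _update_container_nulls below.
        for key, value in cont.items():
            if match.get(key) is None:
                match[key] = value
        return match
    # New container: inherit ownership and visibility from the parent.
    cont.update({
        parent_type: parent['_id'],
        'permissions': parent['permissions'],
        'public': parent.get('public', False),
        'created': timestamp,
    })
    collection.append(cont)
    return cont
```

One behavioral nuance: the `$min` session-timestamp update reads `cont['timestamp']` unguarded, so this version appears to assume acquisitions always arrive with a timestamp, whereas the removed `_upsert_acquisition` only built that update inside its `if acquisition.get('timestamp')` block.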
@@ -266,14 +235,14 @@
     if not session:
         return target_containers
     session_files = dict_fileinfos(session.pop('files', []))
-    session_obj = _upsert_session(session, project_obj, type_, timestamp)
+    session_obj = _upsert_container(session, 'session', project_obj, 'project', type_, timestamp)
     target_containers.append(
         (TargetContainer(session_obj, 'session'), session_files)
     )
     if not acquisition:
         return target_containers
     acquisition_files = dict_fileinfos(acquisition.pop('files', []))
-    acquisition_obj = _upsert_acquisition(acquisition, session_obj, type_, timestamp)
+    acquisition_obj = _upsert_container(acquisition, 'acquisition', session_obj, 'session', type_, timestamp)
     target_containers.append(
         (TargetContainer(acquisition_obj, 'acquisition'), acquisition_files)
     )
@@ -340,22 +309,13 @@ def update_container_hierarchy(metadata, cid, container_type):
     if c_metadata.get('timestamp'):
         c_metadata['timestamp'] = dateutil.parser.parse(c_metadata['timestamp'])
     c_metadata['modified'] = now
-    c_obj = _update_container({'_id': cid}, {}, c_metadata, container_type)
+    c_obj = _update_container_nulls({'_id': cid}, c_metadata, container_type)
     if c_obj is None:
         raise APIStorageException('container does not exist')
     if container_type in ['session', 'acquisition']:
         _update_hierarchy(c_obj, container_type, metadata)
     return c_obj
 
-
-def _update_container(query, update, set_update, container_type):
-    coll_name = container_type if container_type.endswith('s') else container_type+'s'
-    update['$set'] = util.mongo_dict(set_update)
-    return config.db[coll_name].find_one_and_update(query,update,
-        return_document=pymongo.collection.ReturnDocument.AFTER
-    )
-
-
 def _update_hierarchy(container, container_type, metadata):
     project_id = container.get('project') # for sessions
     now = datetime.datetime.utcnow()
@@ -365,7 +325,7 @@
         session_obj = None
         if session.keys():
             session['modified'] = now
-            session_obj = _update_container({'_id': container['session']}, {}, session, 'sessions')
+            session_obj = _update_container_nulls({'_id': container['session']}, session, 'sessions')
             if session_obj is None:
                 session_obj = get_container('session', container['session'])
             project_id = session_obj['project']
@@ -375,7 +335,32 @@
     project = metadata.get('project', {})
     if project.keys():
         project['modified'] = now
-        project_obj = _update_container({'_id': project_id}, {}, project, 'projects')
+        project_obj = _update_container_nulls({'_id': project_id}, project, 'projects')
+
+def _update_container_nulls(base_query, update, container_type):
+    coll_name = container_type if container_type.endswith('s') else container_type+'s'
+    cont = config.db[coll_name].find_one(base_query)
+    if cont is None:
+        raise APIStorageException('Failed to find {} object using the query: {}'.format(container_type, base_query))
+
+    bulk = config.db[coll_name].initialize_unordered_bulk_op()
+
+    if update.get('metadata') and not cont.get('metadata'):
+        # If we are trying to update metadata fields and the container metadata does not exist or is empty,
+        # metadata can all be updated at once for efficiency
+        m_update = util.mongo_sanitize_fields(update.pop('metadata'))
+        bulk.find(base_query).update_one({'$set': {'metadata': m_update}})
+
+    update_dict = util.mongo_dict(update)
+    for k,v in update_dict.items():
+        q = {}
+        q.update(base_query)
+        q['$or'] = [{k: {'$exists': False}}, {k: None}]
+        u = {'$set': {k: v}}
+        log.debug('the query is {} and the update is {}'.format(q,u))
+        bulk.find(q).update_one(u)
+    bulk.execute()
+    return config.db[coll_name].find_one(base_query)
 
 
 def merge_fileinfos(parsed_files, infos):
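The behavioral core of this file's change is `_update_container_nulls`: instead of one `$set` that overwrites whatever is stored, each flattened key is written only where it is currently missing or null. A standalone sketch of the per-key query construction (no database required; `flat_update` stands in for the output of `util.mongo_dict`, and the function name is hypothetical):

```python
def null_only_ops(base_query, flat_update):
    """Yield (query, update) pairs that set each field only where it is
    absent or None, so existing non-null values are never overwritten."""
    for key, value in flat_update.items():
        query = dict(base_query)
        # Match only documents where this specific field is missing or null.
        query['$or'] = [{key: {'$exists': False}}, {key: None}]
        yield query, {'$set': {key: value}}

for q, u in null_only_ops({'_id': 'sess1'},
                          {'label': 'new label', 'subject.code': 'ex01'}):
    print(q, u)
```

The bulk API used in the patch (`initialize_unordered_bulk_op`) batches these per-key updates so they are sent together; on PyMongo 4+, where that method was removed, the equivalent would be `Collection.bulk_write` with a list of `UpdateOne` operations.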
diff --git a/api/upload.py b/api/upload.py
index 02604acfecbdabefc49665f1f0d881088ffbf282..722fe949b297cc33bb6421c277603db7adcf1d10 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -143,43 +143,6 @@ def process_upload(request, strategy, container_type=None, id=None, origin=None,
 
 class Upload(base.RequestHandler):
 
-    def reaper(self):
-        """Receive a sortable reaper upload."""
-        if not self.superuser_request:
-            self.abort(402, 'uploads must be from an authorized drone')
-        with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
-            try:
-                file_store = files.FileStore(self.request, tempdir_path)
-            except files.FileStoreException as e:
-                self.abort(400, str(e))
-            now = datetime.datetime.utcnow()
-            fileinfo = dict(
-                name=file_store.filename,
-                created=now,
-                modified=now,
-                size=file_store.size,
-                hash=file_store.hash,
-                mimetype=util.guess_mimetype(file_store.filename),
-                tags=file_store.tags,
-                metadata=file_store.metadata.get('file', {}).get('metadata', {}),
-                origin=self.origin
-            )
-
-            target, file_metadata = hierarchy.create_container_hierarchy(file_store.metadata)
-            fileinfo.update(file_metadata)
-            f = target.find(file_store.filename)
-            target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
-            if not f:
-                file_store.move_file(target_path)
-                target.add_file(fileinfo)
-                rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo)
-            elif not files.identical(file_store.hash, file_store.path, f['hash'], util.path_from_hash(f['hash'])):
-                file_store.move_file(target_path)
-                target.update_file(fileinfo)
-                rules.create_jobs(config.db, target.container, target.level[:-1], fileinfo)
-            throughput = file_store.size / file_store.duration.total_seconds()
-            log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr))
-
     def upload(self, strategy):
         """
         .. http:post:: /api/upload/<strategy:label|uid>
diff --git a/test/integration_tests/test_uploads.py b/test/integration_tests/test_uploads.py
index 957b8f07a14dd88e4d0aba0b56c043f2dfec09d0..a5e6330ebb1b5eba66dd2d979568d36f4f5a42b2 100644
--- a/test/integration_tests/test_uploads.py
+++ b/test/integration_tests/test_uploads.py
@@ -198,20 +198,23 @@ def test_acquisition_engine_upload(with_hierarchy_and_file_data, api_as_admin):
     r = api_as_admin.get('/projects/' + data.project)
     assert r.ok
     p = json.loads(r.content)
-    assert p['label'] == metadata['project']['label']
+    # Engine metadata should not replace existing fields
+    assert p['label'] != metadata['project']['label']
     assert cmp(p['metadata'], metadata['project']['metadata']) == 0
 
     r = api_as_admin.get('/sessions/' + data.session)
     assert r.ok
     s = json.loads(r.content)
-    assert s['label'] == metadata['session']['label']
+    # Engine metadata should not replace existing fields
+    assert s['label'] != metadata['session']['label']
     assert cmp(s['metadata'], metadata['session']['metadata']) == 0
     assert s['subject']['code'] == metadata['session']['subject']['code']
 
     r = api_as_admin.get('/acquisitions/' + data.acquisition)
     assert r.ok
     a = json.loads(r.content)
-    assert a['label'] == metadata['acquisition']['label']
+    # Engine metadata should not replace existing fields
+    assert a['label'] != metadata['acquisition']['label']
     a_timestamp = dateutil.parser.parse(a['timestamp'])
     m_timestamp = dateutil.parser.parse(metadata['acquisition']['timestamp'])
     assert a_timestamp == m_timestamp
@@ -258,13 +261,15 @@ def test_session_engine_upload(with_hierarchy_and_file_data, api_as_admin):
     r = api_as_admin.get('/projects/' + data.project)
     assert r.ok
     p = json.loads(r.content)
-    assert p['label'] == metadata['project']['label']
+    # Engine metadata should not replace existing fields
+    assert p['label'] != metadata['project']['label']
     assert cmp(p['metadata'], metadata['project']['metadata']) == 0
 
     r = api_as_admin.get('/sessions/' + data.session)
     assert r.ok
     s = json.loads(r.content)
-    assert s['label'] == metadata['session']['label']
+    # Engine metadata should not replace existing fields
+    assert s['label'] != metadata['session']['label']
     assert cmp(s['metadata'], metadata['session']['metadata']) == 0
     assert s['subject']['code'] == metadata['session']['subject']['code']
     s_timestamp = dateutil.parser.parse(s['timestamp'])
@@ -306,7 +311,8 @@ def test_project_engine_upload(with_hierarchy_and_file_data, api_as_admin):
     r = api_as_admin.get('/projects/' + data.project)
     assert r.ok
     p = json.loads(r.content)
-    assert p['label'] == metadata['project']['label']
+    # Engine metadata should not replace existing fields
+    assert p['label'] != metadata['project']['label']
     assert cmp(p['metadata'], metadata['project']['metadata']) == 0
 
     for f in p['files']:
@@ -413,20 +419,23 @@ def test_acquisition_metadata_only_engine_upload(with_hierarchy_and_file_data, a
     r = api_as_admin.get('/projects/' + data.project)
     assert r.ok
     p = json.loads(r.content)
-    assert p['label'] == metadata['project']['label']
+    # Engine metadata should not replace existing fields
+    assert p['label'] != metadata['project']['label']
    assert cmp(p['metadata'], metadata['project']['metadata']) == 0
 
     r = api_as_admin.get('/sessions/' + data.session)
     assert r.ok
     s = json.loads(r.content)
-    assert s['label'] == metadata['session']['label']
+    # Engine metadata should not replace existing fields
+    assert s['label'] != metadata['session']['label']
     assert cmp(s['metadata'], metadata['session']['metadata']) == 0
     assert s['subject']['code'] == metadata['session']['subject']['code']
 
     r = api_as_admin.get('/acquisitions/' + data.acquisition)
     assert r.ok
     a = json.loads(r.content)
-    assert a['label'] == metadata['acquisition']['label']
+    # Engine metadata should not replace existing fields
+    assert a['label'] != metadata['acquisition']['label']
     a_timestamp = dateutil.parser.parse(a['timestamp'])
     m_timestamp = dateutil.parser.parse(metadata['acquisition']['timestamp'])
     assert a_timestamp == m_timestamp
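The test changes all follow from the null-only update rule: the fixtures already set `label`, so engine metadata may no longer replace it, while `metadata` starts out unset and is therefore filled. An illustrative reduction of the rule the updated assertions encode (hypothetical data, not from the test fixtures):

```python
existing = {'label': 'fixture label', 'metadata': None}
engine = {'label': 'engine label', 'metadata': {'key': 'value'}}

# An existing non-null field wins; a missing or null field is filled.
merged = {k: existing[k] if existing.get(k) is not None else v
          for k, v in engine.items()}
assert merged == {'label': 'fixture label', 'metadata': {'key': 'value'}}
```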