Commit ed9fb63d authored by Renzo Frigato

enable engine upload with metadata updates

parent 1783b5a8
@@ -14,7 +14,7 @@ from . import files
 from . import rules
 from . import config
 from . import centralclient
-from .dao import reaperutil
+from .dao import reaperutil, APIStorageException
 from . import tempdir as tempfile
 
 log = config.log
@@ -193,8 +193,42 @@ class Core(base.RequestHandler):
             self.abort(400, 'metadata is missing')
         metadata_validator = validators.payload_from_schema_file(self, 'input/enginemetadata.json')
         metadata_validator(file_store.metadata, 'POST')
+        file_infos = file_store.metadata['acquisition'].pop('files', [])
+        try:
+            acquisition_obj = reaperutil.update_container_hierarchy(file_store.metadata)
+        except APIStorageException as e:
+            self.abort(400, e.message)
+        # move the files into the data tree before updating the database
+        for name, fileinfo in file_store.files.items():
+            path = fileinfo['path']
+            target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
+            files.move_file(path, target_path)
+        self._merge_fileinfos(file_store.files, file_infos)
+        # update the fileinfo in mongo if a file already exists
+        for f in acquisition_obj['files']:
+            fileinfo = file_store.files.get(f['name'])
+            if fileinfo:
+                fileinfo.pop('path', None)
+                reaperutil.update_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)
+                fileinfo['existing'] = True
+        # create the missing fileinfo in mongo; skip files that already exist
+        # and metadata entries that don't have an uploaded file (no path)
+        for name, fileinfo in file_store.files.items():
+            if not fileinfo.get('existing') and fileinfo.get('path'):
+                del fileinfo['path']
+                reaperutil.add_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)
         return [{'filename': k, 'hash': v['hash'], 'size': v['size']} for k, v in file_store.files.items()]
+
+    def _merge_fileinfos(self, hard_infos, infos):
+        """Merge a dict of "hard" infos (file size, hash, mimetype, filetype), keyed by
+        filename, with a list of metadata infos referring to the same or to other files.
+        """
+        for info in infos:
+            info.update(hard_infos.get(info['name'], {}))
+            hard_infos[info['name']] = info
 
     def _preflight_archivestream(self, req_spec):
         data_path = config.get_item('persistent', 'data_path')
         arc_prefix = 'sdm'
......
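For illustration, a minimal sketch of the merge step added above (_merge_fileinfos), using hypothetical filenames and values; only the 'name' key and the hard keys (hash, size, mimetype, filetype, path) come from the code in this commit:

# hypothetical inputs: hard infos computed from the uploaded bytes, and the
# per-file metadata list popped from file_store.metadata['acquisition']['files']
hard_infos = {
    'example.nii.gz': {'hash': 'deadbeef', 'size': 1024, 'mimetype': 'application/octet-stream',
                       'filetype': 'nifti', 'path': '/tmp/upload/example.nii.gz'},
}
engine_infos = [
    {'name': 'example.nii.gz', 'size': 0},   # engine-supplied size is overridden
    {'name': 'report.txt'},                  # metadata only, no uploaded file
]
for info in engine_infos:
    info.update(hard_infos.get(info['name'], {}))
    hard_infos[info['name']] = info
# 'example.nii.gz' keeps the computed hash/size/path; 'report.txt' has no 'path',
# so the handler's add_fileinfo loop skips it.
print(hard_infos['example.nii.gz']['size'])  # -> 1024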
+import bson
 import difflib
 import pymongo
 import datetime
@@ -9,6 +10,7 @@ from . import APIStorageException
 
 log = config.log
 
+PROJECTION_FIELDS = ['group', 'name', 'label', 'timestamp', 'permissions', 'public']
 
 class TargetAcquisition(object):
@@ -32,7 +34,7 @@ class TargetAcquisition(object):
         for k,v in fileinfo.iteritems():
             update_set['files.$.' + k] = v
         acquisition_obj = self.dbc.update_one(
-            {'_id': self.acquisition['_id'], 'files.name': fileinfo['filename']},
+            {'_id': self.acquisition['_id'], 'files.name': fileinfo['name']},
             {'$set': update_set}
         )
@@ -40,9 +42,19 @@ class TargetAcquisition(object):
         fileinfo.update(self.fileinfo)
         self.dbc.update_one({'_id': self.acquisition['_id']}, {'$push': {'files': fileinfo}})
 
+def update_fileinfo(cont_name, _id, fileinfo):
+    update_set = {'files.$.modified': datetime.datetime.utcnow()}
+    # this method overrides an existing file entry:
+    # update_set rewrites all of its fileinfo fields (size, hash, etc.)
+    for k,v in fileinfo.iteritems():
+        update_set['files.$.' + k] = v
+    config.db[cont_name].update_one(
+        {'_id': _id, 'files.name': fileinfo['name']},
+        {'$set': update_set}
+    )
+
+def add_fileinfo(cont_name, _id, fileinfo):
+    config.db[cont_name].update_one({'_id': _id}, {'$push': {'files': fileinfo}})
 
-PROJECTION_FIELDS = ['group', 'name', 'label', 'timestamp', 'permissions', 'public']
 
 def _find_or_create_destination_project(group_name, project_label, created, modified):
     existing_group_ids = [g['_id'] for g in config.db.groups.find(None, ['_id'])]
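As a rough illustration (hypothetical id and values), update_fileinfo above builds a $set document that rewrites the matched element of the files array via MongoDB's positional operator:

import datetime

fileinfo = {'name': 'example.nii.gz', 'hash': 'deadbeef', 'size': 1024}
update_set = {'files.$.modified': datetime.datetime.utcnow()}
for k, v in fileinfo.items():
    update_set['files.$.' + k] = v
# update_set is now, e.g.:
# {'files.$.modified': <utcnow>, 'files.$.name': 'example.nii.gz',
#  'files.$.hash': 'deadbeef', 'files.$.size': 1024}
# and the query {'_id': _id, 'files.name': 'example.nii.gz'} makes '$' point at
# the array element whose name matched, so only that entry is updated.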
@@ -152,40 +164,38 @@ def update_container_hierarchy(metadata):
     project = metadata.get('project')
     session = metadata.get('session')
     acquisition = metadata.get('acquisition')
-    files_ = metadata.get('files')
-    _check_hierarchy_consistency(group, project, session, acquisition)
-    return
-    # now = datetime.datetime.utcnow()
-    # if acquisition.get('timestamp'):
-    #     acquisition['timestamp'] = dateutil.parser.parse(acquisition['timestamp'])
-    # acquisition['modified'] = now
-    # acquisition_obj = _update_container({'uid': acquisition_uid}, acquisition, 'acquisitions')
-    # if acquisition.get('timestamp'):
-    #     session_obj = config.db.session.find_one_and_update(
-    #         {'_id': acquisition_obj['session']},
-    #         {
-    #             '$min': dict(timestamp=acquisition['timestamp']),
-    #             '$set': dict(timezone=acquisition.get('timezone'))
-    #         },
-    #         return_document=pymongo.collection.ReturnDocument.AFTER
-    #     )
-    #     config.db.project.find_one_and_update(
-    #         {'_id': session_obj['project']},
-    #         {
-    #             '$max': dict(timestamp=acquisition['timestamp']),
-    #             '$set': dict(timezone=acquisition.get('timezone'))
-    #         }
-    #     )
-    # if session:
-    #     session['modified'] = now
-    #     _update_container({'uid': session['uid']}, session, 'sessions')
-    # if project:
-    #     project['modified'] = now
-    #     _update_container({'label': project['label']}, project, 'projects')
-    # if group:
-    #     group['modified'] = now
-    #     _update_container({'_id': group['_id']}, group, 'groups')
-    # return TargetAcquisition(acquisition_obj, files_)
+    _set_hierarchy_ids(group, project, session, acquisition)
+    now = datetime.datetime.utcnow()
+    if acquisition.get('timestamp'):
+        acquisition['timestamp'] = dateutil.parser.parse(acquisition['timestamp'])
+    acquisition['modified'] = now
+    acquisition_obj = _update_container({'_id': acquisition['_id']}, acquisition, 'acquisitions')
+    if acquisition.get('timestamp'):
+        session_obj = config.db.session.find_one_and_update(
+            {'_id': acquisition_obj['session']},
+            {
+                '$min': dict(timestamp=acquisition['timestamp']),
+                '$set': dict(timezone=acquisition.get('timezone'))
+            },
+            return_document=pymongo.collection.ReturnDocument.AFTER
+        )
+        config.db.project.find_one_and_update(
+            {'_id': session_obj['project']},
+            {
+                '$max': dict(timestamp=acquisition['timestamp']),
+                '$set': dict(timezone=acquisition.get('timezone'))
+            }
+        )
+    if session:
+        session['modified'] = now
+        _update_container({'_id': session['_id']}, session, 'sessions')
+    if project:
+        project['modified'] = now
+        _update_container({'_id': project['_id']}, project, 'projects')
+    if group:
+        group['modified'] = now
+        _update_container({'_id': group['_id']}, group, 'groups')
+    return acquisition_obj
 
 def _update_container(query, update, cont_name):
     return config.db[cont_name].find_one_and_update(
@@ -196,44 +206,21 @@ def _update_container(query, update, cont_name):
         return_document=pymongo.collection.ReturnDocument.AFTER
     )
 
-def _check_hierarchy_consistency(group, project, session, acquisition):
-    """this method check the consistency of the container hierarchy provided.
-    It is checking:
-    1) that each non null container has the required id field (FIXME should be removed when we enforce the metadata schema)
-    2) that each non null container exists
-    3) that the acquisition is not null
-    4) that each container provided (other than the acquisition) contains the acquisition
+def _set_hierarchy_ids(group, project, session, acquisition):
+    """Set the correct _id on each container of the hierarchy.
+    Raises an APIStorageException if the acquisition can't be found.
     """
-    if not acquisition:
-        raise APIStorageException('acquisition is missing')
-    if acquisition.get('uid') is None:
-        raise APIStorageException('acquisition uid is missing')
-    acquisition_obj = config.db.acquisitions.find_one({'uid': acquisition['uid']})
+    acquisition['_id'] = bson.ObjectId(acquisition['_id'])
+    acquisition_obj = config.db.acquisitions.find_one({'_id': acquisition['_id']})
     if acquisition_obj is None:
         raise APIStorageException('acquisition doesn\'t exist')
-    if session and session.get('uid') is None:
-        raise APIStorageException('session uid is missing')
+    session_obj = config.db.sessions.find_one({'_id': acquisition_obj['session']})
     if session:
-        session_obj = config.db.sessions.find_one({'uid': session['uid']})
-        if session_obj is None:
-            raise APIStorageException('session doesn\'t exist')
-        if session_obj['_id'] != acquisition_obj['session']:
-            raise APIStorageException('session doesn\'t contain the acquisition')
-    else:
-        session_obj = config.db.sessions.find_one({'_id': acquisition_obj['session']})
-    if project and project.get('label') is None:
-        raise APIStorageException('project label is missing')
+        session['_id'] = session_obj['_id']
+    project_obj = config.db.projects.find_one({'_id': session_obj['project']})
     if project:
-        project_obj = config.db.projects.find_one({'label': project['label']})
-        if project_obj is None:
-            raise APIStorageException('project doesn\'t exist')
-        if project_obj['_id'] != session_obj['project']:
-            raise APIStorageException('project doesn\'t contain the acquisition')
-    if group and group.get('_id') is None:
-        raise APIStorageException('group _id is missing')
+        project['_id'] = project_obj['_id']
+    group_obj = config.db.groups.find_one({'_id': project_obj['group']})
    if group:
-        if group['_id'] != session_obj['group']:
-            raise APIStorageException('group doesn\'t contain the acquisition')
-        group_obj = config.db.groups.find_one({'_id': group['_id']})
-        if group_obj is None:
-            raise APIStorageException('group doesn\'t exist')
+        group['_id'] = group_obj['_id']
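A rough sketch (hypothetical ObjectIds) of the lookup chain that the new _set_hierarchy_ids performs; it assumes pymongo's bson package, as imported at the top of this file:

import bson

engine_metadata = {
    'acquisition': {'_id': '564e1d3c0a0b0e0c0d0a0b0c', 'label': 'localizer'},
    'session': {'label': 'sess-01'},
    'project': {'label': 'Demo project'},
}
acq_id = bson.ObjectId(engine_metadata['acquisition']['_id'])
# acquisitions.find_one({'_id': acq_id})        -> {'session': <session_id>, ...}
# sessions.find_one({'_id': <session_id>})      -> {'project': <project_id>, ...}
# projects.find_one({'_id': <project_id>})      -> {'group': <group_id>, ...}
# each container present in the metadata then gets its '_id' filled in, so that
# update_container_hierarchy can address every level by '_id' instead of uid/label.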
@@ -12,6 +12,12 @@ from . import config
 
 log = config.log
 
+def move_file(path, target_path):
+    target_dir = os.path.dirname(target_path)
+    if not os.path.exists(target_dir):
+        os.makedirs(target_dir)
+    shutil.move(path, target_path)
+
 
 class FileStoreException(Exception):
     pass
@@ -73,7 +79,7 @@
         self.hash_alg = hash_alg
         start_time = datetime.datetime.utcnow()
         if request.content_type == 'multipart/form-data':
-            self._save_multipart_files(dest_path, hash_alg)
+            self._save_multipart_file(dest_path, hash_alg)
             self.payload = request.POST.mixed()
         else:
             self.payload = request.POST.mixed()
@@ -105,10 +111,7 @@
         self.metadata = None
 
     def move_file(self, target_path):
-        target_dir = os.path.dirname(target_path)
-        if not os.path.exists(target_dir):
-            os.makedirs(target_dir)
-        shutil.move(self.path, target_path)
+        move_file(self.path, target_path)
         self.path = target_path
 
     def identical(self, filepath, hash_):
@@ -149,9 +152,11 @@ class MultiFileStore(object):
         for field in form:
             if form[field].filename:
                 filename = os.path.basename(form[field].filename)
+                mimetype = util.guess_mimetype(filename)
                 self.files[filename] = {
-                    'file': form[field].file,
                     'hash': form[field].file.get_hash(),
-                    'size': os.path.getsize(os.path.join(dest_path, filename))
+                    'size': os.path.getsize(os.path.join(dest_path, filename)),
+                    'mimetype': mimetype,
+                    'filetype': util.guess_filetype(filename, mimetype),
+                    'path': os.path.join(dest_path, filename)
                 }
@@ -4,7 +4,6 @@
     "type": "object",
     "allOf": [{"$ref": "container.json"}],
     "properties": {
-        "notes": {},
         "public": {},
         "label": {},
         "metadata": {},
......
@@ -4,7 +4,6 @@
     "type": "object",
     "allOf": [{"$ref": "container.json"}],
     "properties": {
-        "_id": {},
         "public": {},
         "label": {},
         "metadata": {},
......
@@ -3,13 +3,78 @@
     "title": "EngineMetadata",
     "type": "object",
     "properties": {
-        "group": {"$ref": "group.json"},
-        "project": {"$ref": "project.json"},
-        "session": {"$ref": "session.json"},
-        "acquisition": {"$ref": "acquisition.json"},
-        "files": {
-            "type": "array",
-            "items": {"$ref": "file.json"}
+        "group": {
+            "type": "object",
+            "properties": {
+                "_id": {
+                    "maxLength": 32,
+                    "minLength": 2,
+                    "pattern": "^[0-9a-z][0-9a-z.@_-]{0,30}[0-9a-z]$",
+                    "title": "ID",
+                    "type": "string"
+                },
+                "name": {
+                    "maxLength": 32,
+                    "minLength": 2,
+                    "pattern": "^[0-9A-Za-z][0-9A-Za-z .@_-]{0,30}[0-9A-Za-z]$",
+                    "title": "Name",
+                    "type": "string"
+                }
+            },
+            "additionalProperties": false
+        },
+        "project": {
+            "type": "object",
+            "allOf": [{"$ref": "container.json"}],
+            "properties": {
+                "_id": {},
+                "public": {},
+                "label": {},
+                "metadata": {},
+                "group": {"type": "string"}
+            },
+            "additionalProperties": false
+        },
+        "session": {
+            "type": "object",
+            "allOf": [{"$ref": "container.json"}],
+            "properties": {
+                "_id": {},
+                "public": {},
+                "label": {},
+                "metadata": {},
+                "project": {"type": "string"},
+                "uid": {"type": "string"},
+                "timestamp": {"type": "string"},
+                "timezone": {"type": "string"},
+                "subject": {"$ref": "subject.json"}
+            },
+            "additionalProperties": false
+        },
+        "acquisition": {
+            "type": "object",
+            "allOf": [{"$ref": "container.json"}],
+            "properties": {
+                "_id": {},
+                "public": {},
+                "label": {},
+                "metadata": {},
+                "session": {"type": "string"},
+                "uid": {"type": "string"},
+                "instrument": {"type": "string"},
+                "measurement": {"type": "string"},
+                "timestamp": {"type": "string"},
+                "timezone": {"type": "string"},
+                "files": {
+                    "type": "array",
+                    "items": {"$ref": "file.json"}
+                }
+            },
+            "required": ["_id"],
+            "additionalProperties": false
         }
     },
     "required": ["acquisition"],
......
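For reference, a hypothetical payload (made-up values) that should satisfy the new enginemetadata.json: only 'acquisition' and its '_id' are required, and additionalProperties is disallowed on every container:

engine_metadata = {
    'acquisition': {
        '_id': '564e1d3c0a0b0e0c0d0a0b0c',
        'label': 'localizer',
        'timestamp': '2015-11-19T12:00:00+00:00',
        'timezone': 'America/Los_Angeles',
        'files': [{'name': 'example.nii.gz'}]   # file entries follow file.json (not shown here)
    },
    'session': {'label': 'sess-01'},
    'project': {'label': 'Demo project'}
}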
@@ -4,7 +4,6 @@
     "type": "object",
     "allOf": [{"$ref": "container.json"}],
     "properties": {
-        "_id": {},
         "public": {},
         "label": {},
         "metadata": {},
......
@@ -6,7 +6,6 @@
     "properties": {
         "created": {},
         "modified": {},
-        "notes": {},
         "permissions": {},
         "files": {},
         "public": {},
......
@@ -7,7 +7,6 @@
         "_id": {},
         "created": {},
         "modified": {},
-        "notes": {},
         "permissions": {},
         "files": {},
         "public": {},
......
@@ -7,7 +7,6 @@
         "_id": {},
         "created": {},
         "modified": {},
-        "notes": {},
         "permissions": {},
         "files": {},
         "public": {},
......
@@ -7,7 +7,6 @@
         "_id": {},
         "created": {},
         "modified": {},
-        "notes": {},
         "permissions": {},
         "files": {},
        "public": {},
......
import os
import cgi
import json
import shutil
import hashlib
import zipfile
import datetime
from . import util
from . import config
log = config.log
class FileStoreException(Exception):
pass
class HashingFile(file):
def __init__(self, file_path, hash_alg):
super(HashingFile, self).__init__(file_path, "wb")
self.hash_alg = hashlib.new(hash_alg)
def write(self, data):
self.hash_alg.update(data)
return file.write(self, data)
def get_hash(self):
return self.hash_alg.hexdigest()
def getHashingFieldStorage(upload_dir, hash_alg):
class HashingFieldStorage(cgi.FieldStorage):
bufsize = 2**20
def make_file(self, binary=None):
self.open_file = HashingFile(os.path.join(upload_dir, os.path.basename(self.filename)), hash_alg)
return self.open_file
# override private method __write of superclass FieldStorage
# _FieldStorage__file is the private variable __file of the same class
def _FieldStorage__write(self, line):
if self._FieldStorage__file is not None:
# use the make_file method only if the form includes a filename
# e.g. do not create a file and a hash for the form metadata.
if self.filename:
self.file = self.make_file('')
self.file.write(self._FieldStorage__file.getvalue())
self._FieldStorage__file = None
self.file.write(line)
def get_hash(self):
return self.open_file.get_hash()
return HashingFieldStorage
def prepare_multipart_upload(payload, body, dest_path, environ, hash_alg='sha384'):
    # module-level helper: parse a multipart body, hashing each file part as it is written
    environ.setdefault('CONTENT_LENGTH', '0')
    environ['QUERY_STRING'] = ''
    form = getHashingFieldStorage(dest_path, hash_alg)(fp=body, environ=environ, keep_blank_values=True)
    tags = json.loads(form['tags'].file.getvalue()) if 'tags' in form else None
    metadata = json.loads(form['metadata'].file.getvalue()) if 'metadata' in form else None
    received_file = filename = None
    for field in form:
        if form[field].filename:
            received_file = form[field].file
            filename = os.path.basename(form[field].filename)
    return filename, received_file, tags, metadata

def prepare_body_upload(payload, body, dest_path, hash_alg='sha384'):
    # placeholder counterpart for raw-body uploads (not implemented here)
    raise NotImplementedError
class FileStore(object):
"""This class provides and interface for file uploads.
To perform an upload the client of the class should follow these steps:
1) initialize the request
2) save a temporary file
3) check identical
4) move the temporary file to its destination
The operations could be safely interleaved with other actions like permission checks or database updates.
"""
def __init__(self, request, dest_path, filename=None, hash_alg='sha384'):
self.body = request.body_file
self.environ = request.environ.copy()
self.environ.setdefault('CONTENT_LENGTH', '0')
self.environ['QUERY_STRING'] = ''
self.hash_alg = hash_alg
start_time = datetime.datetime.utcnow()
if request.content_type == 'multipart/form-data':
self._save_multipart_file(dest_path, hash_alg)
self.payload = request.POST.mixed()
else:
self.payload = request.POST.mixed()
self.filename = filename or self.payload.get('filename')
self._save_body_file(dest_path, filename, hash_alg)
self.path = os.path.join(dest_path, self.filename)
self.duration = datetime.datetime.utcnow() - start_time
self.mimetype = util.guess_mimetype(self.filename)
self.filetype = util.guess_filetype(self.filename, self.mimetype)
self.hash = self.received_file.get_hash()
self.size = os.path.getsize(self.path)
def _save_multipart_file(self, dest_path, hash_alg):
form = getHashingFieldStorage(dest_path, hash_alg)(fp=self.body, environ=self.environ, keep_blank_values=True)
self.received_file = form['file'].file
self.filename = os.path.basename(form['file'].filename)
self.tags = json.loads(form['tags'].file.getvalue()) if 'tags' in form else None
self.metadata = json.loads(form['metadata'].file.getvalue()) if 'metadata' in form else None
def _save_body_file(self, dest_path, filename, hash_alg):
if not filename:
raise FileStoreException('filename is required for body uploads')
self.filename = os.path.basename(filename)
self.received_file = HashingFile(os.path.join(dest_path, filename), hash_alg)
for chunk in iter(lambda: self.body.read(2**20), ''):
self.received_file.write(chunk)
self.tags = None
self.metadata = None
def move_file(self, target_path):
target_dir = os.path.dirname(target_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir)
shutil.move(self.path, target_path)
self.path = target_path
def identical(self, filepath, hash_):
if zipfile.is_zipfile(filepath) and zipfile.is_zipfile(self.path):
with zipfile.ZipFile(filepath) as zf1, zipfile.ZipFile(self.path) as zf2:
zf1_infolist = sorted(zf1.infolist(), key=lambda zi: zi.filename)
zf2_infolist = sorted(zf2.infolist(), key=lambda zi: zi.filename)
if zf1.comment != zf2.comment:
return False
if len(zf1_infolist) != len(zf2_infolist):
return False
for zii, zij in zip(zf1_infolist, zf2_infolist):
if zii.CRC != zij.CRC:
return False
else:
return True
else:
return hash_ == self.hash
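Finally, a rough usage sketch of the four FileStore steps listed in its docstring (the request, paths and hash are hypothetical; they are supplied by the upload handlers, not by this module):

# 1) initialize: parses the request and writes the upload to a temporary directory,
#    hashing it on the fly
# file_store = FileStore(request, tempdir_path)
# 2) the temporary file is now at file_store.path, with file_store.hash,
#    file_store.size, file_store.mimetype and file_store.filetype populated
# 3) optionally compare against an already stored file
# if not file_store.identical(existing_path, existing_hash):
#     4) move it into its final hashed location
#     file_store.move_file(target_path)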