Commit 1783b5a8 authored by Renzo Frigato

expose stub endpoint for the engine

parent df277024
Showing with 336 additions and 32 deletions
......@@ -55,6 +55,7 @@ routes = [
webapp2_extras.routes.PathPrefixRoute(r'/api', [
webapp2.Route(r'/download', core.Core, handler_method='download', methods=['GET', 'POST'], name='download'),
webapp2.Route(r'/reaper', core.Core, handler_method='reaper', methods=['POST']),
webapp2.Route(r'/engine', core.Core, handler_method='engine', methods=['POST']),
webapp2.Route(r'/sites', core.Core, handler_method='sites', methods=['GET']),
webapp2.Route(r'/register', core.Core, handler_method='register', methods=['POST']),
webapp2.Route(r'/config', core.Config, methods=['GET']),
......
......@@ -180,6 +180,21 @@ class Core(base.RequestHandler):
throughput = file_store.size / file_store.duration.total_seconds()
log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr))
def engine(self):
"""Receive a sortable reaper upload."""
if not self.superuser_request:
self.abort(402, 'uploads must be from an authorized drone')
with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
try:
file_store = files.MultiFileStore(self.request, tempdir_path)
except files.FileStoreException as e:
self.abort(400, str(e))
if not file_store.metadata:
self.abort(400, 'metadata is missing')
metadata_validator = validators.payload_from_schema_file(self, 'input/enginemetadata.json')
metadata_validator(file_store.metadata, 'POST')
return [{'filename': k, 'hash': v['hash'], 'size': v['size']} for k, v in file_store.files.items()]
def _preflight_archivestream(self, req_spec):
data_path = config.get_item('persistent', 'data_path')
arc_prefix = 'sdm'
......
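For context, here is a minimal client-side sketch of how an engine drone might exercise the new stub endpoint. It is illustrative only and not part of the commit: the requests library, the host name, and the drone_headers credentials are assumptions, and the request must satisfy whatever drone authentication superuser_request checks. The metadata part is sent without a filename so that MultiFileStore parses it as JSON instead of saving it as a file.

import json
import requests  # assumed client-side dependency, not part of this repository

metadata = {'acquisition': {'uid': '1.2.840.113619.2.1'}}  # illustrative; must satisfy input/enginemetadata.json

parts = {
    # a form field without a filename is parsed as JSON metadata, not stored as a file
    'metadata': (None, json.dumps(metadata), 'application/json'),
    # any part carrying a filename is saved to the temporary directory and hashed
    'file1': ('result.nii.gz', open('result.nii.gz', 'rb'), 'application/octet-stream'),
}

drone_headers = {}  # placeholder: the actual drone/superuser auth headers depend on the deployment
r = requests.post('https://example.host/api/engine', files=parts, headers=drone_headers)
print(r.json())  # e.g. [{'filename': 'result.nii.gz', 'hash': '...', 'size': 12345}]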
......@@ -3,13 +3,14 @@ import pymongo
import datetime
import dateutil.parser
from .. import util
from .. import config
from . import APIStorageException
log = config.log
class ReapedAcquisition(object):
class TargetAcquisition(object):
def __init__(self, acquisition, fileinfo):
self.acquisition = acquisition
......@@ -144,4 +145,95 @@ def create_container_hierarchy(metadata):
upsert=True,
return_document=pymongo.collection.ReturnDocument.AFTER,
)
return ReapedAcquisition(acquisition_obj, file_)
return TargetAcquisition(acquisition_obj, file_)
def update_container_hierarchy(metadata):
group = metadata.get('group')
project = metadata.get('project')
session = metadata.get('session')
acquisition = metadata.get('acquisition')
files_ = metadata.get('files')
_check_hierarchy_consistency(group, project, session, acquisition)
return
# now = datetime.datetime.utcnow()
# if acquisition.get('timestamp'):
# acquisition['timestamp'] = dateutil.parser.parse(acquisition['timestamp'])
# acquisition['modified'] = now
# acquisition_obj = _update_container({'uid': acquisition_uid}, acquisition, 'acquisitions')
# if acquisition.get('timestamp'):
# session_obj = config.db.session.find_one_and_update(
# {'_id': acquisition_obj['session']},
# {
# '$min': dict(timestamp=acquisition['timestamp']),
# '$set': dict(timezone=acquisition.get('timezone'))
# },
# return_document=pymongo.collection.ReturnDocument.AFTER
# )
# config.db.project.find_one_and_update(
# {'_id': session_obj['project']},
# {
# '$max': dict(timestamp=acquisition['timestamp']),
# '$set': dict(timezone=acquisition.get('timezone'))
# }
# )
# if session:
# session['modified'] = now
# _update_container({'uid': session['uid']}, session, 'sessions')
# if project:
# project['modified'] = now
# _update_container({'label': project['label']}, project, 'projects')
# if group:
# group['modified'] = now
# _update_container({'_id': group['_id']}, group, 'groups')
# return TargetAcquisition(acquisition_obj, files_)
def _update_container(query, update, cont_name):
return config.db[cont_name].find_one_and_update(
query,
{
'$set': util.mongo_dict(update)
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
def _check_hierarchy_consistency(group, project, session, acquisition):
"""this method check the consistency of the container hierarchy provided.
It is checking:
1) that each non null container has the required id field (FIXME should be removed when we enforce the metadata schema)
2) that each non null container exists
3) that the acquisition is not null
4) that each container provided (other than the acquisition) contains the acquisition
"""
if not acquisition:
raise APIStorageException('acquisition is missing')
if acquisition.get('uid') is None:
raise APIStorageException('acquisition uid is missing')
acquisition_obj = config.db.acquisitions.find_one({'uid': acquisition['uid']})
if acquisition_obj is None:
raise APIStorageException("acquisition doesn't exist")
if session and session.get('uid') is None:
raise APIStorageException('session uid is missing')
if session:
session_obj = config.db.sessions.find_one({'uid': session['uid']})
if session_obj is None:
raise APIStorageException("session doesn't exist")
if session_obj['_id'] != acquisition_obj['session']:
raise APIStorageException("session doesn't contain the acquisition")
else:
session_obj = config.db.sessions.find_one({'_id': acquisition_obj['session']})
if project and project.get('label') is None:
raise APIStorageException('project label is missing')
if project:
project_obj = config.db.projects.find_one({'label': project['label']})
if project_obj is None:
raise APIStorageException("project doesn't exist")
if project_obj['_id'] != session_obj['project']:
raise APIStorageException("project doesn't contain the acquisition")
if group and group.get('_id') is None:
raise APIStorageException('group _id is missing')
if group:
if group['_id'] != session_obj['group']:
raise APIStorageException("group doesn't contain the acquisition")
group_obj = config.db.groups.find_one({'_id': group['_id']})
if group_obj is None:
raise APIStorageException("group doesn't exist")
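To make the checks above concrete, here is a hedged sketch of calling the helper directly; the label and UIDs are made up, and whether an exception is raised depends entirely on what already exists in the database:

try:
    _check_hierarchy_consistency(
        group=None,
        project={'label': 'Untitled'},
        session={'uid': '1.2.840.113619.6.283'},
        acquisition={'uid': '1.2.840.113619.6.283.1'},
    )
except APIStorageException as e:
    log.warning(str(e))  # e.g. "acquisition doesn't exist" if the UID is not in the database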
......@@ -28,8 +28,6 @@ class HashingFile(file):
return self.hash_alg.hexdigest()
def getHashingFieldStorage(upload_dir, hash_alg):
class HashingFieldStorage(cgi.FieldStorage):
bufsize = 2**20
......@@ -68,7 +66,6 @@ class FileStore(object):
"""
def __init__(self, request, dest_path, filename=None, hash_alg='sha384'):
self.client_addr = request.client_addr
self.body = request.body_file
self.environ = request.environ.copy()
self.environ.setdefault('CONTENT_LENGTH', '0')
......@@ -76,7 +73,7 @@ class FileStore(object):
self.hash_alg = hash_alg
start_time = datetime.datetime.utcnow()
if request.content_type == 'multipart/form-data':
self._save_multipart_file(dest_path, hash_alg)
self._save_multipart_files(dest_path, hash_alg)
self.payload = request.POST.mixed()
else:
self.payload = request.POST.mixed()
......@@ -130,3 +127,31 @@ class FileStore(object):
return True
else:
return hash_ == self.hash
class MultiFileStore(object):
"""This class provides and interface for file uploads.
"""
def __init__(self, request, dest_path, filename=None, hash_alg='sha384'):
self.body = request.body_file
self.environ = request.environ.copy()
self.environ.setdefault('CONTENT_LENGTH', '0')
self.environ['QUERY_STRING'] = ''
self.hash_alg = hash_alg
self.files = {}
self._save_multipart_files(dest_path, hash_alg)
self.payload = request.POST.mixed()
def _save_multipart_files(self, dest_path, hash_alg):
form = getHashingFieldStorage(dest_path, hash_alg)(fp=self.body, environ=self.environ, keep_blank_values=True)
self.tags = json.loads(form['tags'].file.getvalue()) if 'tags' in form else None
self.metadata = json.loads(form['metadata'].file.getvalue()) if 'metadata' in form else None
for field in form:
if form[field].filename:
filename = os.path.basename(form[field].filename)
self.files[filename] = {
'file': form[field].file,
'hash': form[field].file.get_hash(),
'size': os.path.getsize(os.path.join(dest_path, filename))
}
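For orientation, a sketch of what MultiFileStore leaves behind after parsing a form containing a metadata field and a single uploaded file; the filename, hash, and size below are made up:

file_store = MultiFileStore(request, tempdir_path)
file_store.metadata  # dict parsed from the 'metadata' form field, or None
file_store.tags      # value parsed from the 'tags' form field, or None
file_store.files     # e.g. {'result.nii.gz': {'file': <HashingFile>, 'hash': '9f2c...', 'size': 104857}}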
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Acquisition",
"type": "object",
"allOf": [{"$ref": "input/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"notes": {},
"public": {},
......
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Collection",
"type": "object",
"allOf": [{"$ref": "input/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"_id": {},
"public": {},
......
......@@ -48,8 +48,8 @@
"items": {
"type": "object",
"properties": {
"tags": {"$ref": "input/download.json#/definitions/filterDefinition"},
"types": {"$ref": "input/download.json#/definitions/filterDefinition"}
"tags": {"$ref": "download.json#/definitions/filterDefinition"},
"types": {"$ref": "download.json#/definitions/filterDefinition"}
}
}
}
......
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "EngineMetadata",
"type": "object",
"properties": {
"group": {"$ref": "group.json"},
"project": {"$ref": "project.json"},
"session": {"$ref": "session.json"},
"acquisition": {"$ref": "acquisition.json"},
"files": {
"type": "array",
"items": {"$ref": "file.json"}
}
},
"required": ["acquisition"],
"additionalProperties": false
}
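A hedged example (not part of the commit) of a payload intended to satisfy this schema, validated the same way the engine handler does. Only acquisition is required at the top level; the field values, and the exact keys accepted by the referenced sub-schemas (project.json, session.json, acquisition.json), are assumptions:

example_metadata = {
    'project': {'label': 'Untitled'},                  # project keys assumed
    'session': {'uid': '1.2.840.113619.6.283'},        # session input schema defines 'uid'
    'acquisition': {'uid': '1.2.840.113619.6.283.1'},  # the only required property
}
# 'handler' stands in for any request handler exposing abort(status, message);
# the import path for the validators module shown later in this commit is assumed
metadata_validator = validators.payload_from_schema_file(handler, 'input/enginemetadata.json')
metadata_validator(example_metadata, 'POST')           # aborts with 400 on validation errors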
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Project",
"type": "object",
"allOf": [{"$ref": "input/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"_id": {},
"public": {},
......
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Session",
"type": "object",
"allOf": [{"$ref": "input/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"public": {},
"label": {},
......@@ -12,7 +12,7 @@
"uid": {"type": "string"},
"timestamp": {"type": "string"},
"timezone": {"type": "string"},
"subject": {"$ref": "input/subject.json"}
"subject": {"$ref": "subject.json"}
},
"additionalProperties": false
}
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Acquisition",
"type": "object",
"allOf": [{"$ref": "mongo/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"created": {},
"modified": {},
......
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Collection",
"type": "object",
"allOf": [{"$ref": "mongo/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"_id": {},
"created": {},
......
......@@ -21,7 +21,7 @@
},
"roles": {
"type": "array",
"items": {"$ref": "mongo/permission.json"},
"items": {"$ref": "permission.json"},
"title": "Roles",
"default": [],
"uniqueItems": true
......
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Project",
"type": "object",
"allOf": [{"$ref": "mongo/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"_id": {},
"created": {},
......
......@@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Session",
"type": "object",
"allOf": [{"$ref": "mongo/container.json"}],
"allOf": [{"$ref": "container.json"}],
"properties": {
"_id": {},
"created": {},
......@@ -20,7 +20,7 @@
"uid": {"type": "string"},
"timestamp": {"type": "string"},
"timezone": {"type": "string"},
"subject": {"$ref": "mongo/subject.json"}
"subject": {"$ref": "subject.json"}
},
"additionalProperties": false
}
......@@ -29,7 +29,7 @@
"title": "Email",
"type": "string"
},
"avatars": { "$ref": "mongo/avatars.json"},
"avatars": { "$ref": "avatars.json"},
"avatar": {
"format": "uri",
"title": "Avatar",
......
import os
import cgi
import json
import shutil
import hashlib
import zipfile
import datetime
from . import util
from . import config
log = config.log
class FileStoreException(Exception):
pass
class HashingFile(file):
def __init__(self, file_path, hash_alg):
super(HashingFile, self).__init__(file_path, "wb")
self.hash_alg = hashlib.new(hash_alg)
def write(self, data):
self.hash_alg.update(data)
return file.write(self, data)
def get_hash(self):
return self.hash_alg.hexdigest()
def getHashingFieldStorage(upload_dir, hash_alg):
class HashingFieldStorage(cgi.FieldStorage):
bufsize = 2**20
def make_file(self, binary=None):
self.open_file = HashingFile(os.path.join(upload_dir, os.path.basename(self.filename)), hash_alg)
return self.open_file
# override private method __write of superclass FieldStorage
# _FieldStorage__file is the private variable __file of the same class
def _FieldStorage__write(self, line):
if self._FieldStorage__file is not None:
# use the make_file method only if the form includes a filename
# e.g. do not create a file and a hash for the form metadata.
if self.filename:
self.file = self.make_file('')
self.file.write(self._FieldStorage__file.getvalue())
self._FieldStorage__file = None
self.file.write(line)
def get_hash(self):
return self.open_file.get_hash()
return HashingFieldStorage
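# Usage sketch (illustrative, not part of this module): the factory binds the upload
# directory and hash algorithm into a FieldStorage subclass, so each file part is
# streamed to disk and hashed while the form is parsed. 'body' and 'environ' would
# come from the incoming WSGI request; the upload directory below is made up.
#
#   FieldStorageCls = getHashingFieldStorage('/tmp/uploads', 'sha384')
#   form = FieldStorageCls(fp=body, environ=environ, keep_blank_values=True)
#   for field in form:
#       if form[field].filename:  # only parts carrying a filename are written to disk
#           print(form[field].filename, form[field].file.get_hash())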
def prepare_multipart_upload(payload, body, dest_path, environ, hash_alg='sha384'):
environ.setdefault('CONTENT_LENGTH', '0')
environ['QUERY_STRING'] = ''
form = getHashingFieldStorage(dest_path, hash_alg)(fp=body, environ=environ, keep_blank_values=True)
self.tags = json.loads(form['tags'].file.getvalue()) if 'tags' in form else None
self.metadata = json.loads(form['metadata'].file.getvalue()) if 'metadata' in form else None
for field in form:
self.received_file = form[field].file
self.filename = os.path.basename(form[field].filename)
def prepare_body_upload(payload, body, dest_path, hash_alg='sha384'):
class FileStore(object):
"""This class provides and interface for file uploads.
To perform an upload the client of the class should follow these steps:
1) initialize the request
2) save a temporary file
3) check identical
4) move the temporary file to its destination
The operations could be safely interleaved with other actions like permission checks or database updates.
"""
def __init__(self, request, dest_path, filename=None, hash_alg='sha384'):
self.body = request.body_file
self.environ = request.environ.copy()
self.environ.setdefault('CONTENT_LENGTH', '0')
self.environ['QUERY_STRING'] = ''
self.hash_alg = hash_alg
start_time = datetime.datetime.utcnow()
if request.content_type == 'multipart/form-data':
self._save_multipart_file(dest_path, hash_alg)
self.payload = request.POST.mixed()
else:
self.payload = request.POST.mixed()
self.filename = filename or self.payload.get('filename')
self._save_body_file(dest_path, filename, hash_alg)
self.path = os.path.join(dest_path, self.filename)
self.duration = datetime.datetime.utcnow() - start_time
self.mimetype = util.guess_mimetype(self.filename)
self.filetype = util.guess_filetype(self.filename, self.mimetype)
self.hash = self.received_file.get_hash()
self.size = os.path.getsize(self.path)
def _save_multipart_file(self, dest_path, hash_alg):
form = getHashingFieldStorage(dest_path, hash_alg)(fp=self.body, environ=self.environ, keep_blank_values=True)
self.received_file = form['file'].file
self.filename = os.path.basename(form['file'].filename)
self.tags = json.loads(form['tags'].file.getvalue()) if 'tags' in form else None
self.metadata = json.loads(form['metadata'].file.getvalue()) if 'metadata' in form else None
def _save_body_file(self, dest_path, filename, hash_alg):
if not filename:
raise FileStoreException('filename is required for body uploads')
self.filename = os.path.basename(filename)
self.received_file = HashingFile(os.path.join(dest_path, filename), hash_alg)
for chunk in iter(lambda: self.body.read(2**20), ''):
self.received_file.write(chunk)
self.tags = None
self.metadata = None
def move_file(self, target_path):
target_dir = os.path.dirname(target_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir)
shutil.move(self.path, target_path)
self.path = target_path
def identical(self, filepath, hash_):
if zipfile.is_zipfile(filepath) and zipfile.is_zipfile(self.path):
with zipfile.ZipFile(filepath) as zf1, zipfile.ZipFile(self.path) as zf2:
zf1_infolist = sorted(zf1.infolist(), key=lambda zi: zi.filename)
zf2_infolist = sorted(zf2.infolist(), key=lambda zi: zi.filename)
if zf1.comment != zf2.comment:
return False
if len(zf1_infolist) != len(zf2_infolist):
return False
for zii, zij in zip(zf1_infolist, zf2_infolist):
if zii.CRC != zij.CRC:
return False
else:
return True
else:
return hash_ == self.hash
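The FileStore docstring above lists a four-step flow: initialize from the request, save to a temporary location, check for an identical existing file, then move the file into place. A minimal sketch of that flow under stated assumptions (a multipart request carrying a 'file' part; data_path and existing_hash are placeholders that would come from configuration and the file's database record):

file_store = FileStore(request, tempdir_path)                 # steps 1-2: parse the request, save the temp file
target_path = os.path.join(data_path, file_store.filename)    # data_path is a placeholder
if os.path.exists(target_path) and file_store.identical(target_path, existing_hash):
    pass                                                       # step 3: an identical file is already stored
else:
    file_store.move_file(target_path)                          # step 4: move the temp file to its destination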
......@@ -10,7 +10,8 @@ log = config.log
# json schema files are expected to be in the schemas folder relative to this module
schema_path = os.path.abspath(os.path.dirname(__file__))
resolver = jsonschema.RefResolver('file://' + schema_path + '/schemas/', None)
resolver_input = jsonschema.RefResolver('file://' + schema_path + '/schemas/input/', None)
resolver_mongo = jsonschema.RefResolver('file://' + schema_path + '/schemas/mongo/', None)
expected_mongo_schemas = set([
'acquisition.json',
......@@ -41,24 +42,25 @@ expected_input_schemas = set([
'user.json',
'avatars.json',
'download.json',
'tag.json'
'tag.json',
'enginemetadata.json'
])
mongo_schemas = set()
input_schemas = set()
# validate and cache schemas at start time
for schema_file in os.listdir(schema_path + '/schemas/mongo/'):
mongo_schemas.add(schema_file)
resolver.resolve('mongo/' + schema_file)
resolver_mongo.resolve(schema_file)
assert mongo_schemas == expected_mongo_schemas, '{} is different from {}'.format(mongo_schemas, expected_schemas)
assert mongo_schemas == expected_mongo_schemas, '{} is different from {}'.format(mongo_schemas, expected_mongo_schemas)
for schema_file in os.listdir(schema_path + '/schemas/input/'):
input_schemas.add(schema_file)
resolver.resolve('input/' + schema_file)
resolver_input.resolve(schema_file)
assert input_schemas == expected_input_schemas, '{} is different from {}'.format(input_schemas, expected_schemas)
assert input_schemas == expected_input_schemas, '{} is different from {}'.format(input_schemas, expected_input_schemas)
def _validate_json(json_data, schema):
def _validate_json(json_data, schema, resolver):
jsonschema.validate(json_data, schema, resolver=resolver)
#jsonschema.Draft4Validator(schema, resolver=resolver).validate(json_data)
......@@ -68,7 +70,8 @@ def no_op(g, *args):
def mongo_from_schema_file(handler, schema_file):
if schema_file is None:
return no_op
schema = resolver.resolve(schema_file)[1]
schema_file = schema_file.split('/')[1]
schema = resolver_mongo.resolve(schema_file)[1]
def g(exec_op):
def mongo_val(method, **kwargs):
payload = kwargs['payload']
......@@ -80,7 +83,7 @@ def mongo_from_schema_file(handler, schema_file):
_schema = schema
if method in ['POST', 'PUT']:
try:
_validate_json(payload, _schema)
_validate_json(payload, _schema, resolver_mongo)
except jsonschema.ValidationError as e:
handler.abort(500, str(e))
return exec_op(method, **kwargs)
......@@ -90,7 +93,8 @@ def mongo_from_schema_file(handler, schema_file):
def payload_from_schema_file(handler, schema_file):
if schema_file is None:
return no_op
schema = resolver.resolve(schema_file)[1]
schema_file = schema_file.split('/')[1]
schema = resolver_input.resolve(schema_file)[1]
def g(payload, method):
if method == 'PUT' and schema.get('required'):
_schema = copy.copy(schema)
......@@ -99,7 +103,7 @@ def payload_from_schema_file(handler, schema_file):
_schema = schema
if method in ['POST', 'PUT']:
try:
_validate_json(payload, _schema)
_validate_json(payload, _schema, resolver_input)
except jsonschema.ValidationError as e:
handler.abort(400, str(e))
return g
......@@ -121,7 +125,12 @@ def key_check(handler, schema_file):
"""
if schema_file is None:
return no_op
schema = resolver.resolve(schema_file)[1]
path_elements = schema_file.split('/')
schema_file = path_elements[1]
if path_elements[0] == 'mongo':
schema = resolver_mongo.resolve(schema_file)[1]
else:
schema = resolver_input.resolve(schema_file)[1]
log.debug(schema)
if schema.get('key_fields') is None:
return no_op
......