Commit 5a124dc5 authored by Renzo Frigato, committed by Megan Henning

add analyses endpoint

parent 4a32bbd2
@@ -266,6 +266,23 @@ routes = [
     webapp2.Route(_format(r'/api/search/<cont_name:{cont_name_re}>'), searchhandler.SearchHandler, name='es_proxy', methods=['GET']),
     webapp2.Route(_format(r'/api/schemas/<schema:{schema_re}>'), schemahandler.SchemaHandler, name='schemas', methods=['GET']),
     webapp2.Route(r'/api/report/<report_type:site|project>', reporthandler.ReportHandler, methods=['GET']),
+    webapp2.Route(_format(r'/api/<cont_name:{cont_name_re}>/<cid:{cid_re}>/<list_name:analyses>'),
+                  listhandler.AnalysesHandler, name='analysis_post', methods=['POST']),
+    webapp2.Route(_format(r'/api/<cont_name:{cont_name_re}>/<cid:{cid_re}>/<list_name:analyses>/<_id:{cid_re}>'),
+                  listhandler.AnalysesHandler, name='analysis',
+                  methods=['GET', 'DELETE']),
+    webapp2.Route(_format(r'/api/<cont_name:{cont_name_re}>/<cid:{cid_re}>/<list_name:analyses>/<_id:{cid_re}>/files'),
+                  listhandler.AnalysesHandler, handler_method='download',
+                  methods=['GET'], name='analysis_files'),
+    webapp2.Route(_format(r'/api/<cont_name:{cont_name_re}>/<cid:{cid_re}>/<list_name:analyses>/<_id:{cid_re}>/files/<name:{filename_re}>'),
+                  listhandler.AnalysesHandler,
+                  handler_method='download', name='analysis_single_file'),
+    webapp2.Route(_format(r'/api/<cont_name:{cont_name_re}>/<cid:{cid_re}>/<list_name:analyses>/<_id:{cid_re}>/notes'),
+                  listhandler.AnalysesHandler, handler_method='add_note',
+                  methods=['POST'], name='analysis_add_note'),
+    webapp2.Route(_format(r'/api/<cont_name:{cont_name_re}>/<cid:{cid_re}>/<list_name:analyses>/<_id:{cid_re}>/notes/<note_id:{cid_re}>'),
+                  listhandler.AnalysesHandler, handler_method='delete_note',
+                  methods=['DELETE'], name='analysis_delete_note'),
 ]
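
For reference, a minimal sketch of how the routes added above could be driven from Python with the requests library. The host, container ids, and file names are hypothetical, and the multipart layout (file fields plus a JSON 'metadata' form field) is an assumption inferred from upload.process_upload and AnalysisPlacer further down, not something this hunk spells out:

    import json
    import requests

    API = 'http://localhost:8080/api'         # hypothetical deployment
    SESSION = '573c9e6a36fcad6cd3d94d4b'      # hypothetical session id

    # POST /api/<cont_name>/<cid>/analyses -- create an analysis from uploaded files
    metadata = {'label': 'demo', 'outputs': [{'name': 'out.csv'}]}
    r = requests.post(API + '/sessions/' + SESSION + '/analyses',
                      files={'file1': open('out.csv', 'rb')},
                      data={'metadata': json.dumps(metadata)})
    analysis_id = r.json()['_id']

    # GET /api/<cont_name>/<cid>/analyses/<_id> -- fetch it; DELETE removes it
    requests.get(API + '/sessions/' + SESSION + '/analyses/' + analysis_id)

    # GET .../files without a ticket returns a download ticket; replaying the
    # request with ?ticket=<id> streams the tarball (see AnalysesHandler.download).
    tik = requests.get(API + '/sessions/' + SESSION + '/analyses/' + analysis_id + '/files').json()
    tar = requests.get(API + '/sessions/' + SESSION + '/analyses/' + analysis_id + '/files',
                       params={'ticket': tik['ticket']})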
@@ -182,3 +182,49 @@ class StringListStorage(ListStorage):
         result = self.dbc.find_one(query, projection)
         if result and result.get(self.list_name):
             return result.get(self.list_name)[0]
+
+
+class AnalysesStorage(ListStorage):
+
+    def get_fileinfo(self, _id, analysis_id, filename=None):
+        _id = bson.ObjectId(_id)
+        query = [
+            {'$match': {'_id': _id}},
+            {'$unwind': '$' + self.list_name},
+            {'$match': {self.list_name + '._id': analysis_id}},
+            {'$unwind': '$' + self.list_name + '.files'}
+        ]
+        if filename:
+            query.append(
+                {'$match': {self.list_name + '.files.name': filename}}
+            )
+        log.debug(query)
+        return [cont['analyses'] for cont in self.dbc.aggregate(query)]
+
+    def add_note(self, _id, analysis_id, payload):
+        _id = bson.ObjectId(_id)
+        query = {
+            '_id': _id,
+            'analyses._id': analysis_id
+        }
+        update = {
+            '$push': {
+                'analyses.$.notes': payload
+            }
+        }
+        return self.dbc.update_one(query, update)
+
+    def delete_note(self, _id, analysis_id, note_id):
+        _id = bson.ObjectId(_id)
+        query = {
+            '_id': _id,
+            'analyses._id': analysis_id
+        }
+        update = {
+            '$pull': {
+                'analyses.$.notes': {
+                    '_id': note_id
+                }
+            }
+        }
+        return self.dbc.update_one(query, update)
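
The get_fileinfo pipeline above is easiest to read against a concrete document. A sketch with a made-up session record (ids and file names are illustrative):

    from bson import ObjectId

    session = {
        '_id': ObjectId('573c9e6a36fcad6cd3d94d4b'),
        'analyses': [{
            '_id': '573c9e6a36fcad6cd3d94d4c',
            'files': [{'name': 'a.csv', 'size': 10},
                      {'name': 'b.csv', 'size': 20}]
        }]
    }

    # $match narrows the collection to this container, the first $unwind emits
    # one document per analysis, and the second $unwind emits one document per
    # file, so the pipeline yields:
    #   {'_id': ObjectId('573c9e6a36fcad6cd3d94d4b'),
    #    'analyses': {'_id': '573c9e6a36fcad6cd3d94d4c',
    #                 'files': {'name': 'a.csv', 'size': 10}}}
    #   and a second document of the same shape for b.csv.
    # get_fileinfo keeps only the 'analyses' value of each result, so every
    # element carries a single file dict under 'files' -- which is why
    # AnalysesHandler.download below reads f['files'] per element instead of
    # iterating a list.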
@@ -47,6 +47,32 @@ def _append_targets(targets, container, prefix, total_size, total_cnt, optional,
             total_cnt += 1
     return total_size, total_cnt

+
+def symlinkarchivestream(ticket, data_path):
+    for filepath, arcpath, _ in ticket['target']:
+        t = tarfile.TarInfo(name=arcpath)
+        t.type = tarfile.SYMTYPE
+        t.linkname = os.path.relpath(filepath, data_path)
+        yield t.tobuf()
+    stream = cStringIO.StringIO()
+    with tarfile.open(mode='w|', fileobj=stream) as archive:
+        pass
+    yield stream.getvalue()  # get tar stream trailer
+    stream.close()
+
+
+def archivestream(ticket):
+    BLOCKSIZE = 512
+    CHUNKSIZE = 2**20  # stream files in 1MB chunks
+    stream = cStringIO.StringIO()
+    with tarfile.open(mode='w|', fileobj=stream) as archive:
+        for filepath, arcpath, _ in ticket['target']:
+            yield archive.gettarinfo(filepath, arcpath).tobuf()
+            with open(filepath, 'rb') as fd:
+                for chunk in iter(lambda: fd.read(CHUNKSIZE), ''):
+                    yield chunk
+            if len(chunk) % BLOCKSIZE != 0:
+                yield (BLOCKSIZE - (len(chunk) % BLOCKSIZE)) * b'\0'
+    yield stream.getvalue()  # get tar stream trailer
+    stream.close()
+
+
 class Download(base.RequestHandler):
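
Hoisted to module level, these two generators can be shared with the new AnalysesHandler (see its _send_batch further down). A small consumption sketch for archivestream, with a made-up ticket; webob drives response.app_iter the same way:

    # Hypothetical ticket: 'target' is a list of (filesystem path, archive path, size).
    ticket = {'target': [('/tmp/demo/a.csv', 'analyses/a.csv', 10)],
              'filename': 'analysis_demo.tar'}

    with open('/tmp/analysis_demo.tar', 'wb') as out:
        for chunk in archivestream(ticket):
            out.write(chunk)

    # Each member is emitted as its header buffer, the file data in 1MB chunks,
    # then zero padding up to the 512-byte tar block size; the empty tarfile
    # opened at the end contributes only the end-of-archive trailer that
    # getvalue() captures.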
@@ -125,33 +151,6 @@ class Download(base.RequestHandler):
             used_subpaths[parent_id] = used_subpaths.get(parent_id, []) + [path]
         return path

-    def _archivestream(self, ticket):
-        BLOCKSIZE = 512
-        CHUNKSIZE = 2**20  # stream files in 1MB chunks
-        stream = cStringIO.StringIO()
-        with tarfile.open(mode='w|', fileobj=stream) as archive:
-            for filepath, arcpath, _ in ticket['target']:
-                yield archive.gettarinfo(filepath, arcpath).tobuf()
-                with open(filepath, 'rb') as fd:
-                    for chunk in iter(lambda: fd.read(CHUNKSIZE), ''):
-                        yield chunk
-                if len(chunk) % BLOCKSIZE != 0:
-                    yield (BLOCKSIZE - (len(chunk) % BLOCKSIZE)) * b'\0'
-        yield stream.getvalue()  # get tar stream trailer
-        stream.close()
-
-    def _symlinkarchivestream(self, ticket, data_path):
-        for filepath, arcpath, _ in ticket['target']:
-            t = tarfile.TarInfo(name=arcpath)
-            t.type = tarfile.SYMTYPE
-            t.linkname = os.path.relpath(filepath, data_path)
-            yield t.tobuf()
-        stream = cStringIO.StringIO()
-        with tarfile.open(mode='w|', fileobj=stream) as archive:
-            pass
-        yield stream.getvalue()  # get tar stream trailer
-        stream.close()
-
     def download(self):
         """
         .. http:get:: /api/download
@@ -207,9 +206,9 @@
             if ticket['ip'] != self.request.client_addr:
                 self.abort(400, 'ticket not for this source IP')
             if self.get_param('symlinks'):
-                self.response.app_iter = self._symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
+                self.response.app_iter = symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
             else:
-                self.response.app_iter = self._archivestream(ticket)
+                self.response.app_iter = archivestream(ticket)
             self.response.headers['Content-Type'] = 'application/octet-stream'
             self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
             for project_id in ticket['projects']:
@@ -13,6 +13,7 @@ from .. import files
 from ..jobs import rules
 from .. import tempdir as tempfile
 from .. import upload
+from .. import download
 from .. import util
 from .. import validators
 from ..auth import listauth, always_ok
@@ -61,6 +62,13 @@ def initialize_list_configurations():
             'storage_schema_file': 'note.json',
             'input_schema_file': 'note.json'
         },
+        'analyses': {
+            'storage': liststorage.AnalysesStorage,
+            'permchecker': listauth.default_sublist,
+            'use_object_id': True,
+            'storage_schema_file': 'analysis.json',
+            'input_schema_file': 'analysis.json'
+        }
     }
     list_handler_configurations = {
         'groups': {
@@ -576,3 +584,132 @@ class FileListHandler(ListHandler):
         metadata = json.loads(self.request.GET.get('metadata'))
         return upload.process_upload(self.request, upload.Strategy.packfile, origin=self.origin, context={'token': token_id}, response=self.response, metadata=metadata)
+
+
+class AnalysesHandler(ListHandler):
+
+    def _check_ticket(self, ticket_id, _id, filename):
+        ticket = config.db.downloads.find_one({'_id': ticket_id})
+        if not ticket:
+            self.abort(404, 'no such ticket')
+        if ticket['ip'] != self.request.client_addr:
+            self.abort(400, 'ticket not for this source IP')
+        if not filename:
+            return self._check_ticket_for_batch(ticket)
+        if ticket.get('filename') != filename or ticket['target'] != _id:
+            self.abort(400, 'ticket not for this resource')
+        return ticket
+
+    def _check_ticket_for_batch(self, ticket):
+        if ticket.get('type') != 'batch':
+            self.abort(400, 'ticket not for this resource')
+        return ticket
+
+    def put(self, *args, **kwargs):
+        raise NotImplementedError("an analysis can't be modified")
+
+    def post(self, cont_name, list_name, **kwargs):
+        _id = kwargs.pop('cid')
+        container, permchecker, storage, mongo_validator, _, keycheck = self._initialize_request(cont_name, list_name, _id)
+        permchecker(noop)('POST', _id=_id)
+        payload = upload.process_upload(self.request, upload.Strategy.analysis, origin=self.origin)
+        payload['created'] = datetime.datetime.utcnow()
+        payload['user'] = payload.get('user', self.uid)
+        result = keycheck(mongo_validator(storage.exec_op))('POST', _id=_id, payload=payload)
+        if result.modified_count == 1:
+            return {'_id': payload['_id']}
+        else:
+            self.abort(404, 'Element not added in list {} of container {} {}'.format(storage.list_name, storage.cont_name, _id))
+
+    def download(self, cont_name, list_name, **kwargs):
+        _id = kwargs.pop('cid')
+        container, permchecker, storage, _, _, _ = self._initialize_request(cont_name, list_name, _id)
+        filename = kwargs.get('name')
+        ticket_id = self.get_param('ticket')
+        if not ticket_id:
+            permchecker(noop)('GET', _id=_id)
+        analysis_id = kwargs.get('_id')
+        fileinfo = [f['files'] for f in storage.get_fileinfo(_id, analysis_id, filename)]
+        if not ticket_id:
+            if filename:
+                total_size = fileinfo[0]['size']
+                file_cnt = 1
+                ticket = util.download_ticket(self.request.client_addr, 'file', _id, filename, total_size)
+            else:
+                targets, total_size, file_cnt = self._prepare_batch(fileinfo)
+                filename = 'analysis_' + analysis_id + '.tar'
+                ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
+            return {
+                'ticket': config.db.downloads.insert_one(ticket).inserted_id,
+                'size': total_size,
+                'file_cnt': file_cnt,
+                'filename': filename
+            }
+        else:
+            ticket = self._check_ticket(ticket_id, _id, filename)
+            if not filename:
+                self._send_batch(ticket)
+                return
+            if not fileinfo:
+                self.abort(404, "{} doesn't exist".format(filename))
+            fileinfo = fileinfo[0]
+            filepath = os.path.join(
+                config.get_item('persistent', 'data_path'),
+                util.path_from_hash(fileinfo['hash'])
+            )
+            filename = fileinfo['name']
+            self.response.app_iter = open(filepath, 'rb')
+            self.response.headers['Content-Length'] = str(fileinfo['size'])  # must be set after setting app_iter
+            if self.is_true('view'):
+                self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream'))
+            else:
+                self.response.headers['Content-Type'] = 'application/octet-stream'
+                self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(filename)
+
+    def _prepare_batch(self, fileinfo):
+        ## duplicated code from download.py
+        ## we need a way to avoid this
+        targets = []
+        total_size = total_cnt = 0
+        data_path = config.get_item('persistent', 'data_path')
+        for f in fileinfo:
+            filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
+            log.debug(filepath)
+            if os.path.exists(filepath):  # silently skip missing files
+                targets.append((filepath, 'analyses/' + f['name'], f['size']))
+                total_size += f['size']
+                total_cnt += 1
+        return targets, total_size, total_cnt
+
+    def _send_batch(self, ticket):
+        self.response.app_iter = download.archivestream(ticket)
+        self.response.headers['Content-Type'] = 'application/octet-stream'
+        self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
+
+    def delete_note(self, cont_name, list_name, **kwargs):
+        _id = kwargs.pop('cid')
+        analysis_id = kwargs.pop('_id')
+        container, permchecker, storage, _, _, _ = self._initialize_request(cont_name, list_name, _id)
+        note_id = kwargs.get('note_id')
+        permchecker(noop)('DELETE', _id=_id)
+        result = storage.delete_note(_id=_id, analysis_id=analysis_id, note_id=note_id)
+        if result.modified_count == 1:
+            return {'modified': result.modified_count}
+        else:
+            self.abort(404, 'Element not removed from list {} of container {} {}'.format(storage.list_name, storage.cont_name, _id))
+
+    def add_note(self, cont_name, list_name, **kwargs):
+        _id = kwargs.pop('cid')
+        analysis_id = kwargs.get('_id')
+        container, permchecker, storage, mongo_validator, input_validator, keycheck = self._initialize_request(cont_name, list_name, _id)
+        payload = self.request.json_body
+        input_validator(payload, 'POST')
+        payload['_id'] = str(bson.objectid.ObjectId())
+        payload['user'] = payload.get('user', self.uid)
+        payload['created'] = datetime.datetime.utcnow()
+        permchecker(noop)('POST', _id=_id)
+        result = storage.add_note(_id=_id, analysis_id=analysis_id, payload=payload)
+        if result.modified_count == 1:
+            return {'modified': result.modified_count}
+        else:
+            self.abort(404, 'Element not added in list {} of container {} {}'.format(storage.list_name, storage.cont_name, _id))
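
Continuing the hypothetical client from the routes sketch, the note endpoints look like this. Note that add_note returns only a modified count, so recovering the generated note _id takes a second GET of the analysis (assuming the GET response includes the stored notes list):

    requests.post(API + '/sessions/' + SESSION + '/analyses/' + analysis_id + '/notes',
                  json={'text': 'reviewed, looks good'})   # hypothetical payload

    analysis = requests.get(API + '/sessions/' + SESSION + '/analyses/' + analysis_id).json()
    note_id = analysis['notes'][-1]['_id']

    requests.delete(API + '/sessions/' + SESSION + '/analyses/' + analysis_id + '/notes/' + note_id)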
@@ -73,7 +73,7 @@ class Placer(object):
         if self.metadata == None:
             raise Exception('Metadata required')

-    def save_file(self, field, info):
+    def save_file(self, field=None, info=None):
         """
         Helper function that moves a file saved via a form field into our CAS.
         May trigger jobs, if applicable, so this should only be called once we're ready for that.
@@ -82,13 +82,15 @@
         """

         # Save file
-        files.move_form_file_field_into_cas(field)
+        if field is not None:
+            files.move_form_file_field_into_cas(field)

         # Update the DB
-        hierarchy.upsert_fileinfo(self.container_type, self.id, info)
+        if info is not None:
+            hierarchy.upsert_fileinfo(self.container_type, self.id, info)

-        # Queue any jobs as a result of this upload
-        rules.create_jobs(config.db, self.container, self.container_type, info)
+            # Queue any jobs as a result of this upload
+            rules.create_jobs(config.db, self.container, self.container_type, info)


 class TargetedPlacer(Placer):
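
The loosened signature exists so that AnalysisPlacer (added further down) can reuse the CAS move without touching any container document. The two call shapes, for contrast:

    placer.save_file(field, info)   # classic placers: move bytes into CAS, then update the DB
    placer.save_file(field)         # AnalysisPlacer: move bytes into CAS only, no DB write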
@@ -475,3 +477,32 @@ class PackfilePlacer(Placer):
             'event': 'result',
             'data': result,
         })
+
+
+class AnalysisPlacer(Placer):
+    def check(self):
+        self.requireMetadata()
+        #validators.validate_data(self.metadata, 'analysis.json', 'input', 'POST', optional=True)
+        self.saved = []
+
+    def process_file_field(self, field, info):
+        self.save_file(field)
+        self.saved.append(info)
+
+    def finalize(self):
+        # Merge the "hard" info recorded while processing the upload
+        # with the info supplied in the payload.
+        metadata_infos = {}
+        for info in self.metadata.pop('inputs', []):
+            info['input'] = True
+            metadata_infos[info['name']] = info
+        for info in self.metadata.pop('outputs', []):
+            info['output'] = True
+            metadata_infos[info['name']] = info
+        self.metadata['files'] = []
+        for info in self.saved:
+            metadata_info = metadata_infos.get(info['name'], {})
+            metadata_info.update(info)
+            self.metadata['files'].append(metadata_info)
+        self.metadata['_id'] = str(bson.objectid.ObjectId())
+        return self.metadata
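
The merge in finalize can be traced standalone. A sketch with made-up payload metadata and upload info; measured values win on name collisions, and payload entries with no uploaded counterpart (here 'in.nii') are dropped from 'files':

    metadata = {'inputs':  [{'name': 'in.nii', 'note': 'source scan'}],
                'outputs': [{'name': 'out.csv', 'note': 'stats'}]}
    saved = [{'name': 'out.csv', 'size': 42, 'hash': 'v0-deadbeef'}]  # as recorded by process_file_field

    infos = {}
    for info in metadata.pop('inputs', []):
        info['input'] = True
        infos[info['name']] = info
    for info in metadata.pop('outputs', []):
        info['output'] = True
        infos[info['name']] = info

    files = []
    for info in saved:
        merged = infos.get(info['name'], {})
        merged.update(info)           # upload-measured values override payload values
        files.append(merged)

    # files == [{'name': 'out.csv', 'note': 'stats', 'output': True,
    #            'size': 42, 'hash': 'v0-deadbeef'}]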
@@ -23,6 +23,7 @@ Strategy = util.Enum('Strategy', {
     'packfile'   : pl.PackfilePlacer,  # Upload N files as a new packfile to a container.
     'labelupload': pl.LabelPlacer,
     'uidupload'  : pl.UIDPlacer,
+    'analysis'   : pl.AnalysisPlacer   # Upload N files for an analysis (no DB updates)
 })

 def process_upload(request, strategy, container_type=None, id=None, origin=None, context=None, response=None, metadata=None):