Newer
Older
from . import util
from . import validators
def _filter_check(property_filter, property_values):
    """Tell whether a file's property values satisfy one filter condition.

    ``property_filter`` is a mapping with two optional keys: ``'-'`` lists
    values that must not be present and ``'+'`` lists values of which at
    least one must be present.  ``property_values`` holds the values the
    file carries for that property.

    Returns ``False`` when any value appears in the ``'-'`` list, or when a
    non-empty ``'+'`` list has no value in common with ``property_values``;
    returns ``True`` otherwise.  A ``None`` filter, ``None`` list or ``None``
    value collection is treated as empty rather than raising ``TypeError``.
    """
    property_filter = property_filter or {}
    values = set(property_values or ())
    excluded = set(property_filter.get('-') or ())
    required = set(property_filter.get('+') or ())

    if excluded & values:
        return False
    # An empty '+' list places no requirement on the file.
    if required and not (required & values):
        return False
    return True
def _append_targets(targets, container, prefix, total_size, total_cnt, optional, data_path, filters):
    """Append the downloadable files of ``container`` to ``targets``.

    Each entry of ``container['files']`` is considered in turn:

    * when ``filters`` is non-empty the file must pass at least one filter,
      where passing a filter means satisfying both its ``'tags'`` and its
      ``'types'`` condition (see ``_filter_check``);
    * files flagged ``optional`` are skipped unless ``optional`` is true;
    * files whose on-disk path (``data_path`` joined with the result of
      ``util.path_from_hash`` for the file's hash) does not exist are
      silently skipped.

    For every accepted file a ``(filepath, archive_path, size)`` tuple is
    appended to ``targets`` (mutated in place), with ``archive_path`` being
    ``prefix + '/' + name``.

    Returns the updated ``(total_size, total_cnt)`` pair.
    """
    # A container without a 'files' key simply contributes nothing.
    for f in container.get('files', []):
        if filters:
            type_as_list = [f['type']] if f.get('type') else []
            tags = f.get('tags', [])
            passes_any_filter = any(
                _filter_check(filter_.get('tags', {}), tags) and
                _filter_check(filter_.get('types', {}), type_as_list)
                for filter_ in filters
            )
            if not passes_any_filter:
                continue
        if not optional and f.get('optional', False):
            continue
        filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
        if os.path.exists(filepath):  # silently skip missing files
            targets.append((filepath, prefix + '/' + f['name'], f['size']))
            total_size += f['size']
            total_cnt += 1
    return total_size, total_cnt
def symlinkarchivestream(ticket, data_path):
    """Yield a tar stream in which every ticket target is a symbolic link.

    For each ``(filepath, arcpath, _)`` entry of ``ticket['target']`` one
    header-only member named ``arcpath`` is yielded; its link target is
    ``filepath`` expressed relative to ``data_path``.  After all members the
    bytes written by closing an empty streaming archive are yielded as the
    trailer.
    """
    for source_path, member_name, _size in ticket['target']:
        link = tarfile.TarInfo(name=member_name)
        link.type = tarfile.SYMTYPE
        link.linkname = os.path.relpath(source_path, data_path)
        yield link.tobuf()

    # Opening and immediately closing an empty streaming archive leaves only
    # the end-of-archive trailer in the buffer.
    trailer = cStringIO.StringIO()
    with tarfile.open(mode='w|', fileobj=trailer):
        pass
    yield trailer.getvalue()  # get tar stream trailer
    trailer.close()
def archivestream(ticket):
    """Yield an uncompressed tar archive of the ticket's targets, piece by piece.

    For each ``(filepath, arcpath, _)`` entry of ``ticket['target']`` the
    generator yields the tar header for ``filepath`` stored under the name
    ``arcpath``, then the file content in chunks of at most 1 MiB, then the
    NUL padding needed to reach a multiple of the 512-byte tar block size.
    After the last file the end-of-archive trailer is yielded.

    The padding is derived from the number of bytes actually streamed for
    the current file, so empty files produce no padding and never reuse the
    last chunk of a previous file.
    """
    # Local import: io.BytesIO accepts the byte strings tarfile writes on
    # both Python 2 and Python 3.
    import io

    BLOCKSIZE = 512
    CHUNKSIZE = 2**20  # stream files in 1MB chunks
    stream = io.BytesIO()
    with tarfile.open(mode='w|', fileobj=stream) as archive:
        for filepath, arcpath, _ in ticket['target']:
            yield archive.gettarinfo(filepath, arcpath).tobuf()
            streamed = 0
            with open(filepath, 'rb') as fd:
                # b'' is the EOF sentinel of a binary read on Python 2 and 3.
                for chunk in iter(lambda: fd.read(CHUNKSIZE), b''):
                    streamed += len(chunk)
                    yield chunk
            remainder = streamed % BLOCKSIZE
            if remainder:
                yield (BLOCKSIZE - remainder) * b'\0'
    # Closing the archive (end of the `with`) wrote the trailer into `stream`.
    yield stream.getvalue()  # get tar stream trailer
    stream.close()
def _bulk_preflight_archivestream(self, file_refs):
# Build a download ticket for an explicit list of file references.
#
# Each entry of `file_refs` is read for `container_name`, `container_id` and
# `filename`.  The request is aborted with 400 unless the pluralised container
# name is one of projects/sessions/acquisitions.  For every referenced file
# that exists under the configured `data_path`, an
# (on-disk path, archive path, size) tuple is collected.  If at least one
# target was collected, the ticket built by `util.download_ticket` is inserted
# into `config.db.downloads` and a dict with the ticket id, the file count and
# the total size is returned; otherwise the request is aborted with 404.
#
# NOTE(review): this listing has lost its indentation, and the statement that
# assigns `file_obj` inside the `try` appears truncated (only its trailing
# projection argument and `['files'][0]` remain).  Restore it from version
# control before relying on this block.
data_path = config.get_item('persistent', 'data_path')
arc_prefix = 'sdm'
file_cnt = 0
total_size = 0
targets = []
for fref in file_refs:
# `container_name` is pluralised by appending 's' before it is checked below.
cont_name = fref.get('container_name','')+'s'
cont_id = fref.get('container_id', '')
filename = fref.get('filename', '')
if cont_name not in ['projects', 'sessions', 'acquisitions']:
self.abort(400, 'Bulk download only supports files in projects, sessions and acquisitions')
file_obj = None
try:
# Try to find the file reference in the database (filtering on user permissions)
bid = bson.ObjectId(cont_id)
query = {'_id': bid}
# Requests that are not superuser requests additionally require the
# container's `permissions._id` to match this user's id.
if not self.superuser_request:
query['permissions._id'] = self.uid
# NOTE(review): the start of the lookup is missing here.  What remains is a
# projection selecting the `files` element whose name equals `filename`,
# followed by taking the first entry of the result's `files`.
{'files': { '$elemMatch': {
'name': filename
}}
})['files'][0]
# Bare except: any failure raised inside the `try` skips this reference.
except:
# self.abort(404, 'File {} on Container {} {} not found'.format(filename, cont_name, cont_id))
# silently skip missing files/files user does not have access to
continue
filepath = os.path.join(data_path, util.path_from_hash(file_obj['hash']))
if os.path.exists(filepath): # silently skip missing files
# Archive path is <plural container name>/<container id>/<file name>.
targets.append((filepath, cont_name+'/'+cont_id+'/'+file_obj['name'], file_obj['size']))
total_size += file_obj['size']
file_cnt += 1
if len(targets) > 0:
# NOTE(review): no '_' separates the 'sdm' prefix from the timestamp here,
# whereas _preflight_archivestream builds 'sdm_<timestamp>.tar' -- confirm
# which form is intended.
filename = arc_prefix + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
config.db.downloads.insert_one(ticket)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
else:
self.abort(404, 'No files requested could be found')
def _preflight_archivestream(self, req_spec, collection=None):
# Resolve the containers of a download request into archive targets and store
# a 'batch' download ticket for them.
#
# The visible code handles three lookups in turn -- a project, a session and
# an acquisition -- each found with `base_query` (the item id, plus
# `permissions._id == self.uid` unless this is a superuser request).  Files
# are gathered through `_append_targets` using `req_spec['optional']` and
# `req_spec.get('filters')`.  If any target was collected, a ticket is
# inserted into `config.db.downloads` and a dict with the ticket id, the file
# count and the total size is returned; otherwise the request is aborted with
# 404.
#
# NOTE(review): this listing has lost its indentation and several statements
# are not visible.  Nothing here assigns `arc_prefix`, `targets`,
# `total_size`, `file_cnt`, `used_subpaths`, `item_id`, or `acq` at the second
# `acq_prefix` assignment, and the `continue` statements have no visible
# enclosing loop.  Presumably a loop over the requested nodes, a branch per
# node level and a loop over `acquisitions` were dropped -- restore them from
# version control before relying on this block.
data_path = config.get_item('persistent', 'data_path')
base_query = {}
if not self.superuser_request:
base_query['permissions._id'] = self.uid
base_query['_id'] = item_id
# Project lookup: the project's own files, then those of its sessions and
# of the sessions' acquisitions.
project = config.db.projects.find_one(base_query, ['group', 'label', 'files'])
if not project:
# silently skip missing objects/objects user does not have access to
continue
prefix = '/'.join([arc_prefix, project['group'], project['label']])
total_size, file_cnt = _append_targets(targets, project, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
# Sessions are selected by project id only; this query carries no
# `permissions._id` condition.
sessions = config.db.sessions.find({'project': item_id}, ['label', 'files', 'uid', 'timestamp', 'timezone'])
# Index the sessions by `_id` so each acquisition can find its parent below.
session_dict = {session['_id']: session for session in sessions}
# One query fetches the acquisitions of all those sessions.
acquisitions = config.db.acquisitions.find({'session': {'$in': session_dict.keys()}}, ['label', 'files', 'session', 'uid', 'timestamp', 'timezone'])
session_prefixes = {}
for session in session_dict.itervalues():
session_prefix = prefix + '/' + self._path_from_container(session, used_subpaths, project['_id'])
# Remember each session's archive prefix for its acquisitions.
session_prefixes[session['_id']] = session_prefix
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
for acq in acquisitions:
session = session_dict[acq['session']]
acq_prefix = session_prefixes[session['_id']] + '/' + self._path_from_container(acq, used_subpaths, session['_id'])
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
# Session lookup: the session's own files, then those of its acquisitions.
session = config.db.sessions.find_one(base_query, ['project', 'label', 'files', 'uid', 'timestamp', 'timezone'])
if not session:
# silently skip missing objects/objects user does not have access to
continue
# The parent project is fetched by id only, to obtain its group and label.
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
# NOTE(review): unlike the project lookup above, this prefix does not start
# with `arc_prefix` -- confirm that the difference is intended.
prefix = project['group'] + '/' + project['label'] + '/' + self._path_from_container(session, used_subpaths, project['_id'])
total_size, file_cnt = _append_targets(targets, session, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
# If the param `collection` holding a collection id is not None, filter out acquisitions that are not in the collection
a_query = {'session': item_id}
if collection:
a_query['collections'] = bson.ObjectId(collection)
acquisitions = config.db.acquisitions.find(a_query, ['label', 'files', 'uid', 'timestamp', 'timezone'])
# NOTE(review): `acq` is used on the next two lines without a visible loop
# over `acquisitions`.
acq_prefix = prefix + '/' + self._path_from_container(acq, used_subpaths, session['_id'])
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
# Acquisition lookup: only the acquisition's own files.
acq = config.db.acquisitions.find_one(base_query, ['session', 'label', 'files', 'uid', 'timestamp', 'timezone'])
if not acq:
# silently skip missing objects/objects user does not have access to
continue
# Parent session and project are fetched by id only, to build the archive path.
session = config.db.sessions.find_one({'_id': acq['session']}, ['project', 'label', 'uid', 'timestamp', 'timezone'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
# As in the session lookup, this prefix is built without `arc_prefix`.
prefix = project['group'] + '/' + project['label'] + '/' + self._path_from_container(session, used_subpaths, project['_id']) + '/' + self._path_from_container(acq, used_subpaths, session['_id'])
total_size, file_cnt = _append_targets(targets, acq, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
if len(targets) > 0:
# Dump the collected targets as pretty-printed JSON at debug level.
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
config.db.downloads.insert_one(ticket)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
else:
self.abort(404, 'No requested containers could be found')
def _path_from_container(self, container, used_subpaths, parent_id):
    """Pick an archive sub-path for ``container`` that is unique under its parent.

    The base name is the first non-empty value among the container's
    ``label``, its ``timestamp`` formatted as ``%Y%m%d_%H%M`` (localized as
    UTC and converted to the container's ``timezone`` when one is set), and
    its ``uid``; failing all of these it is ``'untitled'``.  If that name was
    already handed out for ``parent_id``, the suffixes ``_0``, ``_1``, ... are
    tried until an unused name is found.

    ``used_subpaths`` maps a parent id to the list of names already used
    under it; the chosen name is recorded there before it is returned.
    """
    def _find_new_path(path, list_used_subpaths):
        """Return ``path``, or the first ``path_<n>`` that hasn't been used."""
        if path not in list_used_subpaths:
            return path
        i = 0
        while True:
            modified_path = path + '_' + str(i)
            if modified_path not in list_used_subpaths:
                return modified_path
            # Advance to the next suffix; without this the search would spin
            # forever as soon as '<path>_0' is taken.
            i += 1

    path = container.get('label')
    if not path and container.get('timestamp'):
        timestamp = container['timestamp']
        timezone = container.get('timezone')
        if timezone:
            # The stored timestamp is localized as UTC, then converted to the
            # container's own timezone.
            timestamp = pytz.timezone('UTC').localize(timestamp).astimezone(pytz.timezone(timezone))
        path = timestamp.strftime('%Y%m%d_%H%M')
    if not path and container.get('uid'):
        path = container['uid']
    if not path:
        path = 'untitled'

    siblings = used_subpaths.get(parent_id, [])
    path = _find_new_path(path, siblings)
    used_subpaths[parent_id] = siblings + [path]
    return path
"""
.. http:get:: /api/download
Download GET Description...
:statuscode 400: describe me
:statuscode 404: describe me
.. http:post:: /api/download
Download POST Description...
:statuscode 400: describe me
:statuscode 404: describe me
"""
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""
In downloads we use filters in the payload to exclude/include files.
To pass a single filter, a file must satisfy every one of its conditions.
If a file passes at least one filter, it is included in the targets.
For example:
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
'filters':[{
'tags':{'+':['incomplete']}
},
{
'types':{'-':['dicom']}
}]
}
will download files with tag 'incomplete' OR type different from 'dicom'
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
'filters':[{
'tags':{'+':['incomplete']},
'types':{'+':['dicom']}
}]
}
will download only files with tag 'incomplete' AND type 'dicom'
"""
# NOTE(review): the `def` line and the branch headers of this handler are not
# visible in this listing and the indentation is lost.  The first group of
# statements serves an existing download ticket; the second group, starting
# at `req_spec = ...`, creates a new one from a JSON payload.  Presumably
# these are the GET and POST paths of the same handler -- confirm against
# version control.

# Serving a ticket: look it up by the `ticket` request parameter.
ticket_id = self.get_param('ticket')
ticket = config.db.downloads.find_one({'_id': ticket_id})
# NOTE(review): no check for a missing ticket (`find_one` returning None) is
# visible before it is subscripted here.
# A ticket may only be redeemed from the client address stored in it.
if ticket['ip'] != self.request.client_addr:
self.abort(400, 'ticket not for this source IP')
# With the `symlinks` parameter set, the response body is the symlink-only
# archive; the alternative branch is not visible in this listing.
if self.get_param('symlinks'):
self.response.app_iter = symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
# Increment the project's `counter` field by one.
# NOTE(review): `project_id` is not assigned anywhere in the visible lines.
config.db.projects.update_one({'_id': project_id}, {'$inc': {'counter': 1}})
# Creating a ticket: the request's JSON body describes what to download.
req_spec = self.request.json_body
# Bulk requests pass the payload's `files` list on without schema validation.
if self.is_true('bulk'):
return self._bulk_preflight_archivestream(req_spec.get('files', []))
else:
# Other requests are validated against the 'download.json' input schema
# before the targets are resolved.
payload_schema_uri = validators.schema_uri('input', 'download.json')
validator = validators.from_schema_path(payload_schema_uri)
validator(req_spec, 'POST')
log.debug(json.dumps(req_spec, sort_keys=True, indent=4, separators=(',', ': ')))
return self._preflight_archivestream(req_spec, collection=self.get_param('collection'))