Skip to content
Snippets Groups Projects
Commit b3405989 authored by Renzo Frigato's avatar Renzo Frigato
Browse files

Merge pull request #106 from scitran/bids_download_to_master

add bids download to master
parents 4c602aee d62dff06
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,38 @@ from . import tempdir as tempfile
log = config.log
def _filter_check(property_filter, property_values):
minus = set(property_filter.get('-', []))
plus = set(property_filter.get('+', []))
if not minus.isdisjoint(property_values):
return False
if plus and plus.isdisjoint(property_values):
return False
return True
def _append_targets(targets, container, prefix, total_size, total_cnt, optional, data_path, filters):
for f in container.get('files', []):
if filters:
filtered = True
for filter_ in filters:
type_as_list = [f['type']] if f.get('type') else []
if (
_filter_check(filter_.get('tags', {}), f.get('tags', [])) and
_filter_check(filter_.get('types', {}), type_as_list)
):
filtered = False
break
if filtered:
continue
if optional or not f.get('optional', False):
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
targets.append((filepath, prefix + '/' + f['name'], f['size']))
total_size += f['size']
total_cnt += 1
return total_size, total_cnt
class Config(base.RequestHandler):
......@@ -151,18 +183,6 @@ class Core(base.RequestHandler):
def _preflight_archivestream(self, req_spec):
data_path = config.get_item('persistent', 'data_path')
arc_prefix = 'sdm'
def append_targets(targets, container, prefix, total_size, total_cnt):
prefix = arc_prefix + '/' + prefix
for f in container.get('files', []):
if req_spec['optional'] or not f.get('optional', False):
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
targets.append((filepath, prefix + '/' + f['name'], f['size']))
total_size += f['size']
total_cnt += 1
return total_size, total_cnt
file_cnt = 0
total_size = 0
targets = []
......@@ -171,39 +191,92 @@ class Core(base.RequestHandler):
item_id = bson.ObjectId(item['_id'])
if item['level'] == 'project':
project = config.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files'])
prefix = project['group'] + '/' + project['label']
total_size, file_cnt = append_targets(targets, project, prefix, total_size, file_cnt)
prefix = '/'.join([arc_prefix, project['group'], project['label']])
total_size, file_cnt = _append_targets(targets, project, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
sessions = config.db.sessions.find({'project': item_id}, ['label', 'files'])
session_dict = {session['_id']: session for session in sessions}
acquisitions = config.db.acquisitions.find({'session': {'$in': session_dict.keys()}}, ['label', 'files', 'session'])
for session in session_dict.itervalues():
session_prefix = prefix + '/' + session.get('label', 'untitled')
total_size, file_cnt = append_targets(targets, session, session_prefix, total_size, file_cnt)
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
for acq in acquisitions:
session = session_dict[acq['session']]
acq_prefix = prefix + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
total_size, file_cnt = append_targets(targets, acq, acq_prefix, total_size, file_cnt)
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
elif item['level'] == 'session':
session = config.db.sessions.find_one({'_id': item_id}, ['project', 'label', 'files'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + session.get('label', 'untitled')
total_size, file_cnt = append_targets(targets, session, prefix, total_size, file_cnt)
total_size, file_cnt = _append_targets(targets, session, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acquisitions = config.db.acquisitions.find({'session': item_id}, ['label', 'files'])
for acq in acquisitions:
acq_prefix = prefix + '/' + acq.get('label', 'untitled')
total_size, file_cnt = append_targets(targets, acq, acq_prefix, total_size, file_cnt)
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
elif item['level'] == 'acquisition':
acq = config.db.acquisitions.find_one({'_id': item_id}, ['session', 'label', 'files'])
session = config.db.sessions.find_one({'_id': acq['session']}, ['project', 'label'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
total_size, file_cnt = append_targets(targets, acq, prefix, total_size, file_cnt)
total_size, file_cnt = _append_targets(targets, acq, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
config.db.downloads.insert_one(ticket)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _preflight_archivestream_bids(self, req_spec):
data_path = config.get_item('persistent', 'data_path')
file_cnt = 0
total_size = 0
targets = []
# FIXME: check permissions of everything
projects = []
prefix = 'untitled'
if len(req_spec['nodes']) != 1:
self.abort(400, 'bids downloads are limited to single dataset downloads')
for item in req_spec['nodes']:
item_id = bson.ObjectId(item['_id'])
if item['level'] == 'project':
project = self.app.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files', 'notes'])
projects.append(item_id)
prefix = project['name']
total_size, file_cnt = _append_targets(targets, project, prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
ses_or_subj_list = self.app.db.sessions.find({'project': item_id}, ['_id', 'label', 'files', 'subject.code', 'subject_code'])
subject_prefixes = {
'missing_subject': prefix + '/missing_subject'
}
sessions = {}
for ses_or_subj in ses_or_subj_list:
subj_code = ses_or_subj.get('subject', {}).get('code') or ses_or_subj.get('subject_code')
if subj_code == 'subject':
subject_prefix = prefix + '/' + ses_or_subj.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, ses_or_subj, subject_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
subject_prefixes[str(ses_or_subj.get('_id'))] = subject_prefix
elif subj_code:
sessions[subj_code] = sessions.get(subj_code, []) + [ses_or_subj]
else:
sessions['missing_subject'] = sessions.get('missing_subject', []) + [ses_or_subj]
for subj_code, ses_list in sessions.items():
subject_prefix = subject_prefixes.get(subj_code)
if not subject_prefix:
continue
for session in ses_list:
session_prefix = subject_prefix + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acquisitions = self.app.db.acquisitions.find({'session': session['_id']}, ['label', 'files'])
for acq in acquisitions:
acq_prefix = session_prefix + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = prefix + '_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size, projects)
self.app.db.downloads.insert_one(ticket)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _archivestream(self, ticket):
BLOCKSIZE = 512
CHUNKSIZE = 2**20 # stream files in 1MB chunks
......@@ -233,6 +306,34 @@ class Core(base.RequestHandler):
def download(self):
"""
In downloads we use filters in the payload to exclude/include files.
To pass a single filter, each of its conditions should be satisfied.
If a file pass at least one filter, it is included in the targets.
For example:
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
'filters':[{
'tags':{'+':['incomplete']}
},
{
'types':{'-':['dicom']}
}]
}
will download files with tag 'incomplete' OR type different from 'dicom'
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
'filters':[{
'tags':{'+':['incomplete']},
'types':{'+':['dicom']}
}]
}
will download only files with tag 'incomplete' AND type different from 'dicom'
"""
ticket_id = self.get_param('ticket')
if ticket_id:
ticket = config.db.downloads.find_one({'_id': ticket_id})
......@@ -246,12 +347,17 @@ class Core(base.RequestHandler):
self.response.app_iter = self._archivestream(ticket)
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
for project_id in ticket['projects']:
self.app.db.projects.update_one({'_id': project_id}, {'$inc': {'counter': 1}})
else:
req_spec = self.request.json_body
validator = validators.payload_from_schema_file(self, 'input/download.json')
validator(req_spec, 'POST')
log.debug(json.dumps(req_spec, sort_keys=True, indent=4, separators=(',', ': ')))
return self._preflight_archivestream(req_spec)
if self.get_param('format') == 'bids':
return self._preflight_archivestream_bids(req_spec)
else:
return self._preflight_archivestream(req_spec)
def sites(self):
"""Return local and remote sites."""
......
{
"id": "#",
"$schema": "http://json-schema.org/draft-04/schema#",
"definitions": {
"filterDefinition": {
"type": "object",
"properties": {
"+": {"$ref": "#/definitions/filterItems"},
"-": {"$ref": "#/definitions/filterItems"}
}
},
"filterItems": {
"type": "array",
"minItems": 1,
"items": {
"type": "string"
}
}
},
"title": "Download",
"type": "object",
"properties": {
......@@ -24,6 +41,17 @@
"required": ["level", "_id"],
"additionalProperties": false
}
},
"filters": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"tags": {"$ref": "input/download.json#/definitions/filterDefinition"},
"types": {"$ref": "input/download.json#/definitions/filterDefinition"}
}
}
}
},
"required": ["optional", "nodes"],
......
......@@ -52,7 +52,7 @@ def container_fileinfo(container, filename):
return None
def download_ticket(ip, type_, target, filename, size):
def download_ticket(ip, type_, target, filename, size, projects = None):
return {
'_id': str(uuid.uuid4()),
'timestamp': datetime.datetime.utcnow(),
......@@ -61,6 +61,7 @@ def download_ticket(ip, type_, target, filename, size):
'target': target,
'filename': filename,
'size': size,
'projects': projects or []
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment