Skip to content
Snippets Groups Projects
Commit d220990b authored by Renzo Frigato's avatar Renzo Frigato
Browse files

add bids download to master

parent 4c602aee
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,19 @@ from . import tempdir as tempfile
log = config.log
def _append_targets(targets, container, prefix, total_size, total_cnt, optional, data_path, attachments=True):
for f in container.get('files', []):
if (not attachments or (type(attachments) == list and f['name'] not in attachments)) and 'attachment' in f.get('tags', []):
continue
if optional or not f.get('optional', False):
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
targets.append((filepath, prefix + '/' + f['name'], f['size']))
total_size += f['size']
total_cnt += 1
return total_size, total_cnt
class Config(base.RequestHandler):
def get(self):
......@@ -151,18 +164,6 @@ class Core(base.RequestHandler):
def _preflight_archivestream(self, req_spec):
    """Resolve a download request into a ticketed archive manifest.

    Walks the requested project/session/acquisition nodes, collects every
    downloadable file via _append_targets(), persists a download ticket and
    returns its id with the file count and total byte size.

    NOTE(review): this span was diff residue containing both the old nested
    append_targets helper and the new module-level calls; this is the clean
    post-diff version. The node-loop header was hidden by the diff and is
    restored to match _preflight_archivestream_bids — confirm against the
    full file.
    """
    data_path = config.get_item('persistent', 'data_path')
    arc_prefix = 'sdm'
    file_cnt = 0
    total_size = 0
    targets = []
    for item in req_spec['nodes']:
        item_id = bson.ObjectId(item['_id'])
        if item['level'] == 'project':
            project = config.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files'])
            prefix = '/'.join([arc_prefix, project['group'], project['label']])
            total_size, file_cnt = _append_targets(targets, project, prefix, total_size, file_cnt, req_spec['optional'], data_path)
            sessions = config.db.sessions.find({'project': item_id}, ['label', 'files'])
            session_dict = {session['_id']: session for session in sessions}
            acquisitions = config.db.acquisitions.find({'session': {'$in': session_dict.keys()}}, ['label', 'files', 'session'])
            for session in session_dict.itervalues():
                session_prefix = prefix + '/' + session.get('label', 'untitled')
                total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size, file_cnt, req_spec['optional'], data_path)
            for acq in acquisitions:
                session = session_dict[acq['session']]
                acq_prefix = prefix + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
                total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path)
        elif item['level'] == 'session':
            session = config.db.sessions.find_one({'_id': item_id}, ['project', 'label', 'files'])
            project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
            # BUGFIX: _append_targets() no longer prepends arc_prefix (the old
            # nested helper did), so session-level prefixes must include it to
            # match the project-level archive layout above.
            prefix = '/'.join([arc_prefix, project['group'], project['label'], session.get('label', 'untitled')])
            total_size, file_cnt = _append_targets(targets, session, prefix, total_size, file_cnt, req_spec['optional'], data_path)
            acquisitions = config.db.acquisitions.find({'session': item_id}, ['label', 'files'])
            for acq in acquisitions:
                acq_prefix = prefix + '/' + acq.get('label', 'untitled')
                total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path)
        elif item['level'] == 'acquisition':
            acq = config.db.acquisitions.find_one({'_id': item_id}, ['session', 'label', 'files'])
            session = config.db.sessions.find_one({'_id': acq['session']}, ['project', 'label'])
            project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
            # BUGFIX: same arc_prefix omission as the session branch.
            prefix = '/'.join([arc_prefix, project['group'], project['label'], session.get('label', 'untitled'), acq.get('label', 'untitled')])
            total_size, file_cnt = _append_targets(targets, acq, prefix, total_size, file_cnt, req_spec['optional'], data_path)
    log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
    filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
    ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
    config.db.downloads.insert_one(ticket)
    return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _preflight_archivestream_bids(self, req_spec):
    """Resolve a BIDS-layout download request into a ticketed archive.

    Restricted to a single project node. Of the project's attachments only
    'README' and 'dataset_description.json' are included; all attachments
    on sessions and acquisitions are skipped. Sessions are grouped under
    per-subject prefixes inside the archive.
    """
    data_path = config.get_item('persistent', 'data_path')
    file_cnt = 0
    total_size = 0
    targets = []
    # FIXME: check permissions of everything
    projects = []
    prefix = 'untitled'
    if len(req_spec['nodes']) != 1:
        self.abort(400, 'bids downloads are limited to single dataset downloads')
    for item in req_spec['nodes']:
        item_id = bson.ObjectId(item['_id'])
        if item['level'] == 'project':
            # NOTE(review): switched self.app.db -> config.db for consistency
            # with the rest of this module (see _preflight_archivestream).
            project = config.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files', 'notes'])
            projects.append(item_id)
            # BUGFIX: the projection above fetches 'label', never 'name', so
            # project['name'] always raised KeyError; use the label instead.
            prefix = project.get('label', 'untitled')
            total_size, file_cnt = _append_targets(targets, project, prefix, total_size,
                                                   file_cnt, req_spec['optional'], data_path,
                                                   ['README', 'dataset_description.json'])
            ses_or_subj_list = config.db.sessions.find({'project': item_id}, ['_id', 'label', 'files', 'subject.code', 'subject_code'])
            subject_prefixes = {
                'missing_subject': prefix + '/missing_subject'
            }
            sessions = {}
            for ses_or_subj in ses_or_subj_list:
                subj_code = ses_or_subj.get('subject', {}).get('code') or ses_or_subj.get('subject_code')
                if subj_code == 'subject':
                    # Document acts as the subject container itself; its files
                    # go directly under the subject prefix, and later sessions
                    # keyed by this document's id are nested beneath it.
                    subject_prefix = prefix + '/' + ses_or_subj.get('label', 'untitled')
                    total_size, file_cnt = _append_targets(targets, ses_or_subj, subject_prefix, total_size,
                                                           file_cnt, req_spec['optional'], data_path, False)
                    subject_prefixes[str(ses_or_subj.get('_id'))] = subject_prefix
                elif subj_code:
                    sessions.setdefault(subj_code, []).append(ses_or_subj)
                else:
                    sessions.setdefault('missing_subject', []).append(ses_or_subj)
            for subj_code, ses_list in sessions.items():
                subject_prefix = subject_prefixes.get(subj_code)
                if not subject_prefix:
                    # Session references a subject container we never saw.
                    continue
                for session in ses_list:
                    session_prefix = subject_prefix + '/' + session.get('label', 'untitled')
                    total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size,
                                                           file_cnt, req_spec['optional'], data_path, False)
                    acquisitions = config.db.acquisitions.find({'session': session['_id']}, ['label', 'files'])
                    for acq in acquisitions:
                        acq_prefix = session_prefix + '/' + acq.get('label', 'untitled')
                        total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size,
                                                               file_cnt, req_spec['optional'], data_path, False)
    log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
    filename = prefix + '_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
    ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size, projects)
    config.db.downloads.insert_one(ticket)
    return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _archivestream(self, ticket):
BLOCKSIZE = 512
CHUNKSIZE = 2**20 # stream files in 1MB chunks
......@@ -246,12 +300,17 @@ class Core(base.RequestHandler):
self.response.app_iter = self._archivestream(ticket)
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
for project_id in ticket['projects']:
self.app.db.projects.update_one({'_id': project_id}, {'$inc': {'counter': 1}})
else:
req_spec = self.request.json_body
validator = validators.payload_from_schema_file(self, 'input/download.json')
validator(req_spec, 'POST')
log.debug(json.dumps(req_spec, sort_keys=True, indent=4, separators=(',', ': ')))
return self._preflight_archivestream(req_spec)
if self.request.GET.get('format') == 'bids':
return self._preflight_archivestream_bids(req_spec)
else:
return self._preflight_archivestream(req_spec)
def sites(self):
"""Return local and remote sites."""
......
......@@ -52,7 +52,7 @@ def container_fileinfo(container, filename):
return None
def download_ticket(ip, type_, target, filename, size):
def download_ticket(ip, type_, target, filename, size, projects = None):
return {
'_id': str(uuid.uuid4()),
'timestamp': datetime.datetime.utcnow(),
......@@ -61,6 +61,7 @@ def download_ticket(ip, type_, target, filename, size):
'target': target,
'filename': filename,
'size': size,
'projects': projects or []
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment