Newer
Older
def _filter_check(property_filter, property_values):
minus = set(property_filter.get('-', []))
plus = set(property_filter.get('+', []))
if not minus.isdisjoint(property_values):
return False
if plus and plus.isdisjoint(property_values):
return False
return True
def _append_targets(targets, container, prefix, total_size, total_cnt, optional, data_path, filters):
if filters:
filtered = True
for filter_ in filters:
type_as_list = [f['type']] if f.get('type') else []
if (
_filter_check(filter_.get('tags', {}), f.get('tags', [])) and
_filter_check(filter_.get('types', {}), type_as_list)
):
filtered = False
break
if filtered:
continue
if optional or not f.get('optional', False):
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
targets.append((filepath, prefix + '/' + f['name'], f['size']))
total_size += f['size']
total_cnt += 1
return total_size, total_cnt
return config.get_public_config()
self.response.write(
'config = ' +
json.dumps( self.get(), sort_keys=True, indent=4, separators=(',', ': '), default=util.custom_json_serializer,) +
';'
)
class Core(base.RequestHandler):
def head(self):
"""Return 200 OK."""
def get(self):
"""Return API documentation"""
resources = """
Resource | Description
:-----------------------------------|:-----------------------
[(/sites)] | local and remote sites
/download | download
[(/users)] | list of users
[(/users/self)] | user identity
[(/users/roles)] | user roles
[(/users/*<uid>*)] | details for user *<uid>*
[(/users/*<uid>*/groups)] | groups for user *<uid>*
[(/users/*<uid>*/projects)] | projects for user *<uid>*
[(/groups)] | list of groups
/groups/*<gid>* | details for group *<gid>*
/groups/*<gid>*/projects | list of projects for group *<gid>*
/groups/*<gid>*/sessions | list of sessions for group *<gid>*
[(/projects)] | list of projects
[(/projects/groups)] | groups for projects
[(/projects/schema)] | schema for single project
/projects/*<pid>* | details for project *<pid>*
/projects/*<pid>*/sessions | list sessions for project *<pid>*
[(/sessions/schema)] | schema for single session
/sessions/*<sid>* | details for session *<sid>*
/sessions/*<sid>*/move | move session *<sid>* to a different project
/sessions/*<sid>*/acquisitions | list acquisitions for session *<sid>*
[(/acquisitions/schema)] | schema for single acquisition
/acquisitions/*<aid>* | details for acquisition *<aid>*
[(/collections)] | list of collections
[(/collections/schema)] | schema for single collection
/collections/*<cid>* | details for collection *<cid>*
/collections/*<cid>*/sessions | list of sessions for collection *<cid>*
/collections/*<cid>*/acquisitions | list of acquisitions for collection *<cid>*
[(/schema/group)] | group schema
[(/schema/user)] | user schema
resources = re.sub(r'\[\((.*)\)\]', r'[\1](/api\1?user=%s&root=%r)' % (self.uid, self.superuser_request), resources)
resources = re.sub(r'(\(.*)\*<uid>\*(.*\))', r'\1%s\2' % self.uid, resources)
resources = re.sub(r'\[\((.*)\)\]', r'[\1](/api\1)', resources)
resources = resources.replace('<', '<').replace('>', '>').strip()
self.response.headers['Content-Type'] = 'text/html; charset=utf-8'
self.response.write('<html>\n')
self.response.write('<head>\n')
self.response.write('<title>SciTran API</title>\n')
self.response.write('<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=yes">\n')
self.response.write('<style type="text/css">\n')
self.response.write('table {width:0%; border-width:1px; padding: 0;border-collapse: collapse;}\n')
self.response.write('table tr {border-top: 1px solid #b8b8b8; background-color: white; margin: 0; padding: 0;}\n')
self.response.write('table tr:nth-child(2n) {background-color: #f8f8f8;}\n')
self.response.write('table thead tr :last-child {width:100%;}\n')
self.response.write('table tr th {font-weight: bold; border: 1px solid #b8b8b8; background-color: #cdcdcd; margin: 0; padding: 6px 13px;}\n')
self.response.write('table tr th {font-weight: bold; border: 1px solid #b8b8b8; background-color: #cdcdcd; margin: 0; padding: 6px 13px;}\n')
self.response.write('table tr td {border: 1px solid #b8b8b8; margin: 0; padding: 6px 13px;}\n')
self.response.write('table tr th :first-child, table tr td :first-child {margin-top: 0;}\n')
self.response.write('table tr th :last-child, table tr td :last-child {margin-bottom: 0;}\n')
self.response.write('</style>\n')
self.response.write('</head>\n')
self.response.write('<body style="min-width:900px">\n')
if self.debug and not self.get_param('user'):
self.response.write('<form name="username" action="" method="get">\n')
self.response.write('Username: <input type="text" name="user">\n')
self.response.write('Root: <input type="checkbox" name="root" value="1">\n')
self.response.write('<input type="submit" value="Generate Custom Links">\n')
self.response.write('</form>\n')
self.response.write(markdown.markdown(resources, ['extra']))
self.response.write('</body>\n')
self.response.write('</html>\n')
def reaper(self):
"""Receive a sortable reaper upload."""
if not self.superuser_request:
self.abort(402, 'uploads must be from an authorized drone')
with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
try:
file_store = files.FileStore(self.request, tempdir_path)
except files.FileStoreException as e:
fileinfo = dict(
name=file_store.filename,
size=file_store.size,
hash=file_store.hash,
type=file_store.filetype,
tags=file_store.tags,
metadata=file_store.metadata
)
container = reaperutil.create_container_hierarchy(file_store.metadata)
f = container.find(file_store.filename)
target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
if not f:
file_store.move_file(target_path)
container.add_file(fileinfo)
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
elif not file_store.identical(util.path_from_hash(fileinfo['hash']), f['hash']):
file_store.move_file(target_path)
container.update_file(fileinfo)
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
throughput = file_store.size / file_store.duration.total_seconds()
log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr))
data_path = config.get_item('persistent', 'data_path')
targets = []
# FIXME: check permissions of everything
for item in req_spec['nodes']:
item_id = bson.ObjectId(item['_id'])
if item['level'] == 'project':
project = config.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files'])
prefix = '/'.join([arc_prefix, project['group'], project['label']])
total_size, file_cnt = _append_targets(targets, project, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
sessions = config.db.sessions.find({'project': item_id}, ['label', 'files'])
session_dict = {session['_id']: session for session in sessions}
acquisitions = config.db.acquisitions.find({'session': {'$in': session_dict.keys()}}, ['label', 'files', 'session'])
for session in session_dict.itervalues():
session_prefix = prefix + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
for acq in acquisitions:
session = session_dict[acq['session']]
acq_prefix = prefix + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
session = config.db.sessions.find_one({'_id': item_id}, ['project', 'label', 'files'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acquisitions = config.db.acquisitions.find({'session': item_id}, ['label', 'files'])
acq_prefix = prefix + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acq = config.db.acquisitions.find_one({'_id': item_id}, ['session', 'label', 'files'])
session = config.db.sessions.find_one({'_id': acq['session']}, ['project', 'label'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
config.db.downloads.insert_one(ticket)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _preflight_archivestream_bids(self, req_spec):
data_path = config.get_item('persistent', 'data_path')
file_cnt = 0
total_size = 0
targets = []
# FIXME: check permissions of everything
projects = []
prefix = 'untitled'
if len(req_spec['nodes']) != 1:
self.abort(400, 'bids downloads are limited to single dataset downloads')
for item in req_spec['nodes']:
item_id = bson.ObjectId(item['_id'])
if item['level'] == 'project':
project = self.app.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files', 'notes'])
projects.append(item_id)
prefix = project['name']
total_size, file_cnt = _append_targets(targets, project, prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
ses_or_subj_list = self.app.db.sessions.find({'project': item_id}, ['_id', 'label', 'files', 'subject.code', 'subject_code'])
subject_prefixes = {
'missing_subject': prefix + '/missing_subject'
}
sessions = {}
for ses_or_subj in ses_or_subj_list:
subj_code = ses_or_subj.get('subject', {}).get('code') or ses_or_subj.get('subject_code')
if subj_code == 'subject':
subject_prefix = prefix + '/' + ses_or_subj.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, ses_or_subj, subject_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
subject_prefixes[str(ses_or_subj.get('_id'))] = subject_prefix
elif subj_code:
sessions[subj_code] = sessions.get(subj_code, []) + [ses_or_subj]
else:
sessions['missing_subject'] = sessions.get('missing_subject', []) + [ses_or_subj]
for subj_code, ses_list in sessions.items():
subject_prefix = subject_prefixes.get(subj_code)
if not subject_prefix:
continue
for session in ses_list:
session_prefix = subject_prefix + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acquisitions = self.app.db.acquisitions.find({'session': session['_id']}, ['label', 'files'])
for acq in acquisitions:
acq_prefix = session_prefix + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = prefix + '_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size, projects)
self.app.db.downloads.insert_one(ticket)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
CHUNKSIZE = 2**20 # stream files in 1MB chunks
with tarfile.open(mode='w|', fileobj=stream) as archive:
for filepath, arcpath, _ in ticket['target']:
yield archive.gettarinfo(filepath, arcpath).tobuf()
with open(filepath, 'rb') as fd:
for chunk in iter(lambda: fd.read(CHUNKSIZE), ''):
if len(chunk) % BLOCKSIZE != 0:
yield (BLOCKSIZE - (len(chunk) % BLOCKSIZE)) * b'\0'
yield stream.getvalue() # get tar stream trailer
stream.close()
def _symlinkarchivestream(self, ticket, data_path):
for filepath, arcpath, _ in ticket['target']:
t = tarfile.TarInfo(name=arcpath)
t.type = tarfile.SYMTYPE
t.linkname = os.path.relpath(filepath, data_path)
yield t.tobuf()
stream = cStringIO.StringIO()
with tarfile.open(mode='w|', fileobj=stream) as archive:
pass
yield stream.getvalue() # get tar stream trailer
stream.close()
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
"""
In downloads we use filters in the payload to exclude/include files.
To pass a single filter, each of its conditions should be satisfied.
If a file pass at least one filter, it is included in the targets.
For example:
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
'filters':[{
'tags':{'+':['incomplete']}
},
{
'types':{'-':['dicom']}
}]
}
will download files with tag 'incomplete' OR type different from 'dicom'
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
'filters':[{
'tags':{'+':['incomplete']},
'types':{'+':['dicom']}
}]
}
will download only files with tag 'incomplete' AND type different from 'dicom'
"""
ticket_id = self.get_param('ticket')
ticket = config.db.downloads.find_one({'_id': ticket_id})
if ticket['ip'] != self.request.client_addr:
self.abort(400, 'ticket not for this source IP')
if self.get_param('symlinks'):
self.response.app_iter = self._symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
else:
self.response.app_iter = self._archivestream(ticket)
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
for project_id in ticket['projects']:
self.app.db.projects.update_one({'_id': project_id}, {'$inc': {'counter': 1}})
req_spec = self.request.json_body
validator = validators.payload_from_schema_file(self, 'input/download.json')
validator(req_spec, 'POST')
log.debug(json.dumps(req_spec, sort_keys=True, indent=4, separators=(',', ': ')))
if self.get_param('format') == 'bids':
return self._preflight_archivestream_bids(req_spec)
else:
return self._preflight_archivestream(req_spec)
def sites(self):
"""Return local and remote sites."""
projection = ['name', 'onload']
# TODO onload for local is true
site_id = config.get_item('site', 'id')
if self.public_request or self.is_true('all'):
sites = list(config.db.sites.find(None, projection))
# TODO onload based on user prefs
remotes = (config.db.users.find_one({'_id': self.uid}, ['remotes']) or {}).get('remotes', [])
remote_ids = [r['_id'] for r in remotes] + [site_id]
sites = list(config.db.sites.find({'_id': {'$in': remote_ids}}, projection))
for s in sites: # TODO: this for loop will eventually move to public case
def register(self):
if not config.get_item('site', 'registered'):
self.abort(400, 'Site not registered with central')
if not config.get_item('site', 'ssl_cert'):
self.abort(400, 'SSL cert not configured')
if not config.get_item('site', 'central_url'):
self.abort(400, 'Central URL not configured')
if not centralclient.update(config.db, config.get_item('site', 'ssl_cert'), config.get_item('site', 'central_url')):
centralclient.fail_count += 1
else:
centralclient.fail_count = 0
if centralclient.fail_count == 3:
log.warning('scitran central unreachable, purging all remotes info')
centralclient.clean_remotes(mongo.db)