import os
import re
import bson
import json
import tarfile
import datetime
import markdown
import cStringIO
import validators
from . import base
from . import util
from . import files
from . import rules
from . import config
from . import centralclient
from .dao import reaperutil
from . import tempdir as tempfile
log = config.log
def _filter_check(property_filter, property_values):
minus = set(property_filter.get('-', []))
plus = set(property_filter.get('+', []))
if not minus.isdisjoint(property_values):
return False
if plus and plus.isdisjoint(property_values):
return False
return True
def _append_targets(targets, container, prefix, total_size, total_cnt, optional, data_path, filters):
for f in container.get('files', []):
if filters:
filtered = True
for filter_ in filters:
type_as_list = [f['type']] if f.get('type') else []
if (
_filter_check(filter_.get('tags', {}), f.get('tags', [])) and
_filter_check(filter_.get('types', {}), type_as_list)
filtered = False
if filtered:
if optional or not f.get('optional', False):
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files
targets.append((filepath, prefix + '/' + f['name'], f['size']))
total_size += f['size']
total_cnt += 1
return total_size, total_cnt
class Config(base.RequestHandler):
def get(self):
return config.get_public_config()
def get_js(self):
'config = ' +
json.dumps( self.get(), sort_keys=True, indent=4, separators=(',', ': '), default=util.custom_json_serializer,) +
class Core(base.RequestHandler):
"""/api """
def head(self):
"""Return 200 OK."""
def get(self):
"""Return API documentation"""
resources = """
Resource | Description
[(/sites)] | local and remote sites
/download | download
[(/users)] | list of users
[(/users/self)] | user identity
[(/users/roles)] | user roles
[(/users/*<uid>*)] | details for user *<uid>*
[(/users/*<uid>*/groups)] | groups for user *<uid>*
[(/users/*<uid>*/projects)] | projects for user *<uid>*
[(/groups)] | list of groups
/groups/*<gid>* | details for group *<gid>*
/groups/*<gid>*/projects | list of projects for group *<gid>*
/groups/*<gid>*/sessions | list of sessions for group *<gid>*
[(/projects)] | list of projects
[(/projects/groups)] | groups for projects
[(/projects/schema)] | schema for single project
/projects/*<pid>* | details for project *<pid>*
/projects/*<pid>*/sessions | list sessions for project *<pid>*
[(/sessions)] | list of sessions
[(/sessions/schema)] | schema for single session
/sessions/*<sid>* | details for session *<sid>*
/sessions/*<sid>*/move | move session *<sid>* to a different project
/sessions/*<sid>*/acquisitions | list acquisitions for session *<sid>*
[(/acquisitions/schema)] | schema for single acquisition
/acquisitions/*<aid>* | details for acquisition *<aid>*
[(/collections)] | list of collections
[(/collections/schema)] | schema for single collection
/collections/*<cid>* | details for collection *<cid>*
/collections/*<cid>*/sessions | list of sessions for collection *<cid>*
/collections/*<cid>*/acquisitions | list of acquisitions for collection *<cid>*
[(/schema/group)] | group schema
[(/schema/user)] | user schema
if self.debug and self.uid:
resources = re.sub(r'\[\((.*)\)\]', r'[\1](/api\1?user=%s&root=%r)' % (self.uid, self.superuser_request), resources)
resources = re.sub(r'(\(.*)\*<uid>\*(.*\))', r'\1%s\2' % self.uid, resources)
resources = re.sub(r'\[\((.*)\)\]', r'[\1](/api\1)', resources)
resources = resources.replace('<', '<').replace('>', '>').strip()
self.response.headers['Content-Type'] = 'text/html; charset=utf-8'
self.response.write('<title>SciTran API</title>\n')
self.response.write('<meta name="viewport" content="width=device-width, initial-scale=1.0, user-scalable=yes">\n')
self.response.write('<style type="text/css">\n')
self.response.write('table {width:0%; border-width:1px; padding: 0;border-collapse: collapse;}\n')
self.response.write('table tr {border-top: 1px solid #b8b8b8; background-color: white; margin: 0; padding: 0;}\n')
self.response.write('table tr:nth-child(2n) {background-color: #f8f8f8;}\n')
self.response.write('table thead tr :last-child {width:100%;}\n')
self.response.write('table tr th {font-weight: bold; border: 1px solid #b8b8b8; background-color: #cdcdcd; margin: 0; padding: 6px 13px;}\n')
self.response.write('table tr td {border: 1px solid #b8b8b8; margin: 0; padding: 6px 13px;}\n')
self.response.write('table tr th :first-child, table tr td :first-child {margin-top: 0;}\n')
self.response.write('table tr th :last-child, table tr td :last-child {margin-bottom: 0;}\n')
self.response.write('<body style="min-width:900px">\n')
if self.debug and not self.get_param('user'):
self.response.write('<form name="username" action="" method="get">\n')
self.response.write('Username: <input type="text" name="user">\n')
self.response.write('Root: <input type="checkbox" name="root" value="1">\n')
self.response.write('<input type="submit" value="Generate Custom Links">\n')
self.response.write(markdown.markdown(resources, ['extra']))
def reaper(self):
"""Receive a sortable reaper upload."""
if not self.superuser_request:
self.abort(402, 'uploads must be from an authorized drone')
with tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) as tempdir_path:
file_store = files.FileStore(self.request, tempdir_path)
except files.FileStoreException as e:
self.abort(400, str(e))
now = datetime.datetime.now()
fileinfo = dict(
container = reaperutil.create_container_hierarchy(file_store.metadata)
f = container.find(file_store.filename)
target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
if not f:
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
elif not file_store.identical(util.path_from_hash(fileinfo['hash']), f['hash']):
rules.create_jobs(config.db, container.acquisition, 'acquisition', fileinfo)
throughput = file_store.size / file_store.duration.total_seconds()
log.info('Received %s [%s, %s/s] from %s' % (file_store.filename, util.hrsize(file_store.size), util.hrsize(throughput), self.request.client_addr))
def _preflight_archivestream(self, req_spec):
data_path = config.get_item('persistent', 'data_path')
arc_prefix = 'sdm'
file_cnt = 0
total_size = 0
targets = []
# FIXME: check permissions of everything
for item in req_spec['nodes']:
item_id = bson.ObjectId(item['_id'])
if item['level'] == 'project':
project = config.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files'])
prefix = '/'.join([arc_prefix, project['group'], project['label']])
total_size, file_cnt = _append_targets(targets, project, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
sessions = config.db.sessions.find({'project': item_id}, ['label', 'files'])
session_dict = {session['_id']: session for session in sessions}
acquisitions = config.db.acquisitions.find({'session': {'$in': session_dict.keys()}}, ['label', 'files', 'session'])
for session in session_dict.itervalues():
session_prefix = prefix + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
for acq in acquisitions:
session = session_dict[acq['session']]
acq_prefix = prefix + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
elif item['level'] == 'session':
session = config.db.sessions.find_one({'_id': item_id}, ['project', 'label', 'files'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acquisitions = config.db.acquisitions.find({'session': item_id}, ['label', 'files'])
for acq in acquisitions:
acq_prefix = prefix + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
elif item['level'] == 'acquisition':
acq = config.db.acquisitions.find_one({'_id': item_id}, ['session', 'label', 'files'])
session = config.db.sessions.find_one({'_id': acq['session']}, ['project', 'label'])
project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = project['group'] + '/' + project['label'] + '/' + session.get('label', 'untitled') + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, prefix, total_size, file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _preflight_archivestream_bids(self, req_spec):
data_path = config.get_item('persistent', 'data_path')
file_cnt = 0
total_size = 0
targets = []
# FIXME: check permissions of everything
projects = []
prefix = 'untitled'
if len(req_spec['nodes']) != 1:
self.abort(400, 'bids downloads are limited to single dataset downloads')
for item in req_spec['nodes']:
item_id = bson.ObjectId(item['_id'])
if item['level'] == 'project':
project = self.app.db.projects.find_one({'_id': item_id}, ['group', 'label', 'files', 'notes'])
prefix = project['name']
total_size, file_cnt = _append_targets(targets, project, prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
ses_or_subj_list = self.app.db.sessions.find({'project': item_id}, ['_id', 'label', 'files', 'subject.code', 'subject_code'])
subject_prefixes = {
'missing_subject': prefix + '/missing_subject'
sessions = {}
for ses_or_subj in ses_or_subj_list:
subj_code = ses_or_subj.get('subject', {}).get('code') or ses_or_subj.get('subject_code')
if subj_code == 'subject':
subject_prefix = prefix + '/' + ses_or_subj.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, ses_or_subj, subject_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
subject_prefixes[str(ses_or_subj.get('_id'))] = subject_prefix
elif subj_code:
sessions[subj_code] = sessions.get(subj_code, []) + [ses_or_subj]
sessions['missing_subject'] = sessions.get('missing_subject', []) + [ses_or_subj]
for subj_code, ses_list in sessions.items():
subject_prefix = subject_prefixes.get(subj_code)
if not subject_prefix:
for session in ses_list:
session_prefix = subject_prefix + '/' + session.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, session, session_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
acquisitions = self.app.db.acquisitions.find({'session': session['_id']}, ['label', 'files'])
for acq in acquisitions:
acq_prefix = session_prefix + '/' + acq.get('label', 'untitled')
total_size, file_cnt = _append_targets(targets, acq, acq_prefix, total_size,
file_cnt, req_spec['optional'], data_path, req_spec.get('filters'))
log.debug(json.dumps(targets, sort_keys=True, indent=4, separators=(',', ': ')))
filename = prefix + '_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.tar'
ticket = util.download_ticket(self.request.client_addr, 'batch', targets, filename, total_size, projects)
return {'ticket': ticket['_id'], 'file_cnt': file_cnt, 'size': total_size}
def _archivestream(self, ticket):
CHUNKSIZE = 2**20 # stream files in 1MB chunks
stream = cStringIO.StringIO()
with tarfile.open(mode='w|', fileobj=stream) as archive:
for filepath, arcpath, _ in ticket['target']:
yield archive.gettarinfo(filepath, arcpath).tobuf()
with open(filepath, 'rb') as fd:
for chunk in iter(lambda: fd.read(CHUNKSIZE), ''):
yield chunk
if len(chunk) % BLOCKSIZE != 0:
yield (BLOCKSIZE - (len(chunk) % BLOCKSIZE)) * b'\0'
yield stream.getvalue() # get tar stream trailer
def _symlinkarchivestream(self, ticket, data_path):
for filepath, arcpath, _ in ticket['target']:
t = tarfile.TarInfo(name=arcpath)
t.type = tarfile.SYMTYPE
t.linkname = os.path.relpath(filepath, data_path)
yield t.tobuf()
stream = cStringIO.StringIO()
with tarfile.open(mode='w|', fileobj=stream) as archive:
yield stream.getvalue() # get tar stream trailer
def download(self):
In downloads we use filters in the payload to exclude/include files.
To pass a single filter, each of its conditions should be satisfied.
If a file pass at least one filter, it is included in the targets.
For example:
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
will download files with tag 'incomplete' OR type different from 'dicom'
download_payload = {
'optional': True,
'nodes': [{'level':'project', '_id':project_id}],
will download only files with tag 'incomplete' AND type different from 'dicom'
ticket_id = self.get_param('ticket')
if ticket_id:
ticket = config.db.downloads.find_one({'_id': ticket_id})
if not ticket:
self.abort(404, 'no such ticket')
if ticket['ip'] != self.request.client_addr:
self.abort(400, 'ticket not for this source IP')
if self.get_param('symlinks'):
self.response.app_iter = self._symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
self.response.app_iter = self._archivestream(ticket)
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
for project_id in ticket['projects']:
self.app.db.projects.update_one({'_id': project_id}, {'$inc': {'counter': 1}})
req_spec = self.request.json_body
validator = validators.payload_from_schema_file(self, 'input/download.json')
validator(req_spec, 'POST')
log.debug(json.dumps(req_spec, sort_keys=True, indent=4, separators=(',', ': ')))
if self.get_param('format') == 'bids':
return self._preflight_archivestream_bids(req_spec)
return self._preflight_archivestream(req_spec)
def sites(self):
"""Return local and remote sites."""
projection = ['name', 'onload']
# TODO onload for local is true
site_id = config.get_item('site', 'id')
if self.public_request or self.is_true('all'):
sites = list(config.db.sites.find(None, projection))
# TODO onload based on user prefs
remotes = (config.db.users.find_one({'_id': self.uid}, ['remotes']) or {}).get('remotes', [])
remote_ids = [r['_id'] for r in remotes] + [site_id]
sites = list(config.db.sites.find({'_id': {'$in': remote_ids}}, projection))
for s in sites: # TODO: this for loop will eventually move to public case
if s['_id'] == site_id:
s['onload'] = True
return sites
def register(self):
if not config.get_item('site', 'registered'):
self.abort(400, 'Site not registered with central')
if not config.get_item('site', 'ssl_cert'):
self.abort(400, 'SSL cert not configured')
if not config.get_item('site', 'central_url'):
self.abort(400, 'Central URL not configured')
if not centralclient.update(config.db, config.get_item('site', 'ssl_cert'), config.get_item('site', 'central_url')):
centralclient.fail_count += 1
centralclient.fail_count = 0
if centralclient.fail_count == 3:
log.warning('scitran central unreachable, purging all remotes info')