Commit 5d453b78 authored by Kevin S. Hahn

Merge pull request #17 from scitran/jobs_apps

Jobs apps
parents 1173e88b 86c90657
......@@ -18,7 +18,9 @@ import webapp2
import bson.json_util
import webapp2_extras.routes
import apps
import core
import jobs
import users
import projects
import sessions
......@@ -30,9 +32,22 @@ routes = [
webapp2.Route(r'/api', core.Core),
webapp2_extras.routes.PathPrefixRoute(r'/api', [
webapp2.Route(r'/download', core.Core, handler_method='download', methods=['GET', 'POST'], name='download'),
webapp2.Route(r'/download/<fn>', core.Core, handler_method='download', methods=['GET', 'POST'], name='named_download'),
webapp2.Route(r'/sites', core.Core, handler_method='sites', methods=['GET']),
webapp2.Route(r'/search', core.Core, handler_method='search', methods=['GET', 'POST']),
]),
webapp2.Route(r'/api/jobs', jobs.Jobs),
webapp2_extras.routes.PathPrefixRoute(r'/api/jobs', [
webapp2.Route(r'/next', jobs.Jobs, handler_method='next', methods=['GET']),
webapp2.Route(r'/count', jobs.Jobs, handler_method='count', methods=['GET']),
webapp2.Route(r'/<_id>', jobs.Job, name='job'),
]),
webapp2.Route(r'/api/apps', apps.Apps),
webapp2_extras.routes.PathPrefixRoute(r'/api/apps', [
webapp2.Route(r'/count', apps.Apps, handler_method='count', methods=['GET']),
webapp2.Route(r'/<_id>', apps.App, name='app'),
webapp2.Route(r'/<_id>/file', apps.App, handler_method='get_file'),
]),
webapp2.Route(r'/api/users', users.Users),
webapp2_extras.routes.PathPrefixRoute(r'/api/users', [
webapp2.Route(r'/count', users.Users, handler_method='count', methods=['GET']),
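The new /api/jobs and /api/apps routes above can be exercised with any HTTP client; a minimal sketch, assuming a local instance on port 8080 and the third-party requests library (ids and auth details are illustrative):

import requests

api = 'https://localhost:8080/api'
# list all jobs and get the total count (auth/SSL options omitted for brevity)
all_jobs = requests.get(api + '/jobs', verify=False).json()
job_count = requests.get(api + '/jobs/count', verify=False).json()
# fetch a single job document by its integer id
job = requests.get(api + '/jobs/1', verify=False).json()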
......@@ -92,6 +107,7 @@ routes = [
webapp2.Route(r'/<:[0-9a-f]{24}>', acquisitions.Acquisition, name='acquisition'),
webapp2.Route(r'/<:[0-9a-f]{24}>/file', acquisitions.Acquisition, handler_method='get_file', methods=['GET', 'POST']),
webapp2.Route(r'/<:[0-9a-f]{24}>/file', acquisitions.Acquisition, handler_method='put_file', methods=['PUT']),
webapp2.Route(r'/<:[0-9a-f]{24}>/tile', acquisitions.Acquisition, handler_method='get_tile', methods=['GET']),
webapp2.Route(r'/<:[0-9a-f]{24}>/attachment', acquisitions.Acquisition, handler_method='delete_attachment', methods=['DELETE']),
webapp2.Route(r'/<:[0-9a-f]{24}>/attachment', acquisitions.Acquisition, handler_method='get_attachment', methods=['GET', 'POST']),
webapp2.Route(r'/<:[0-9a-f]{24}>/attachment', acquisitions.Acquisition, handler_method='put_attachment', methods=['PUT']),
......@@ -118,6 +134,7 @@ if __name__ == '__main__':
arg_parser.add_argument('--port', default='8080', help='TCP port to listen on [8080]')
arg_parser.add_argument('--db_uri', help='SciTran DB URI', default='mongodb://localhost/scitran')
arg_parser.add_argument('--data_path', help='path to storage area', required=True)
arg_parser.add_argument('--apps_path', help='path to apps storage', required=True)
arg_parser.add_argument('--log_level', help='log level [info]', default='info')
arg_parser.add_argument('--ssl_cert', help='path to SSL certificate file, containing private key and certificate chain', required=True)
arg_parser.add_argument('--site_id', help='site ID for Scitran Central [local]', default='local')
......
......@@ -17,6 +17,7 @@ os.umask(0o022)
ap = argparse.ArgumentParser()
ap.add_argument('--db_uri', help='SciTran DB URI', required=True)
ap.add_argument('--data_path', help='path to storage area', required=True)
ap.add_argument('--apps_path', help='path to apps storage', required=True)
ap.add_argument('--ssl_cert', help='path to SSL certificate file, containing private key and certificate chain', required=True)
ap.add_argument('--api_uri', help='api uri, with https:// prefix')
ap.add_argument('--site_id', help='site ID for Scitran Central [local]', default='local')
......
apps.py 0 → 100644
# @author: Kevin S Hahn
"""
API request handlers for Apps.
represents the /nimsapi/apps route
"""
import os
import json
import bson
import shutil
import hashlib
import logging
import tarfile
import jsonschema
log = logging.getLogger('nimsapi.apps')
import tempdir as tempfile
import base
import util
# TODO: create schemas to verify various json payloads
APP_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': 'App',
'type': 'object',
'properties': {
'_id': {
'title': 'ID',
'type': 'string',
},
'entrypoint': { # MR SPECIFIC!!!
'title': 'Entrypoint',
'type': 'string',
},
'outputs': {
'title': 'Outputs',
'type': 'array',
},
'default': { # MR SPECIFIC!!!
'title': 'Default Application',
'type': 'boolean',
},
'app_type': {
'title': 'App Type',
'type': 'string',
},
'inputs': {
'title': 'Inputs',
'type': 'array',
},
},
'required': ['_id', 'entrypoint', 'outputs', 'default', 'app_type', 'inputs'],
'additionalProperties': True
}
# TODO: apps should be stored separately from the datasets
# possible in something similar to 'quarantine', or at a whole different
# location. this should also be configurable.
class Apps(base.RequestHandler):
"""Return information about the all the apps."""
def get(self):
return list(self.app.db.apps.find())
def count(self):
return self.app.db.apps.count()
def post(self):
"""Create a new App."""
# this handles receiving and writing the file,
# but the json validation and database insert are handled by util.
if self.public_request: # TODO: how to handle auth during bootstrap?
self.abort(403, 'must be logged in to upload apps')
apps_path = self.app.config['apps_path']
app_meta = None
with tempfile.TemporaryDirectory(prefix='.tmp', dir=apps_path) as tempdir_path:
hash_ = hashlib.sha1()
app_temp = os.path.join(tempdir_path, 'temp')
with open(app_temp, 'wb') as fd:
for chunk in iter(lambda: self.request.body_file.read(2**20), ''):
hash_.update(chunk)
fd.write(chunk)
if hash_.hexdigest() != self.request.headers['Content-MD5']:
self.abort(400, 'Content-MD5 mismatch.') # sha1
if not tarfile.is_tarfile(app_temp):
self.abort(415, 'Only tar files are accepted.')
with tarfile.open(app_temp) as tf:
for ti in tf:
if ti.name.endswith('description.json'):
app_meta = json.load(tf.extractfile(ti))
break
if not app_meta:
self.abort(415, 'application tar does not contain description.json')
try:
jsonschema.validate(app_meta, APP_SCHEMA)
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
util.insert_app(self.app.db, app_temp, apps_path, app_meta=app_meta) # pass meta info, prevent re-reading
log.debug('Received app: %s' % app_meta.get('_id'))
class App(base.RequestHandler):
def get(self, _id):
# TODO: auth? should viewing apps be restricted?
return self.app.db.apps.find_one({'_id': _id})
def get_file(self, _id):
if self.public_request: # this will most often be a drone request
self.abort(403, 'must be logged in to download apps')
name, version = _id.split(':')
fn = '%s-%s.tar' % (name, version)
fp = os.path.join(self.app.config['apps_path'], name, fn)
self.response.app_iter = open(fp, 'rb')
self.response.headers['Content-Length'] = str(os.path.getsize(fp)) # must be set after setting app_iter
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=%s' % fn
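A sketch of the client side of Apps.post() above, assuming an app already packed as a tar archive; the handler expects the raw tar as the request body with its SHA-1 hex digest in the Content-MD5 header (URL and file name are placeholders):

import hashlib
import requests

with open('dcm2nii-app.tar', 'rb') as f:
    body = f.read()
digest = hashlib.sha1(body).hexdigest()  # compared against the Content-MD5 header by the handler
# authentication (user token or drone User-Agent) omitted for brevity
requests.post('https://localhost:8080/api/apps',
              data=body,
              headers={'Content-MD5': digest},
              verify=False)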
......@@ -53,11 +53,16 @@ class RequestHandler(webapp2.RequestHandler):
if not self.app.db.sites.find_one({'_id': remote_instance}):
self.abort(402, remote_instance + ' is not an authorized remote instance')
else:
if not self.app.db.drones.find_one({'_id': remote_instance}):
self.abort(402, remote_instance + ' is not an authorized drone')
drone_type, drone_id = self.request.user_agent.replace('SciTran', '').strip().split()
if not self.app.db.drones.find_one({'_id': drone_id}):
self.abort(402, drone_id + ' is not an authorized drone')
self.drone_request = True
self.public_request = not bool(self.uid)
if self.public_request or self.source_site:
log.debug('public request: %s' % str(self.public_request))
if self.drone_request and not self.source_site: # engine request
self.public_request = False
self.superuser_request = True
elif self.public_request or self.source_site:
self.superuser_request = False
else:
user = self.app.db.users.find_one({'_id': self.uid}, ['root', 'wheel'])
......
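The drone branch above derives the drone id from the User-Agent header, which is expected to look like 'SciTran <type> <id>'; a small illustration of the parsing with a made-up header value:

user_agent = 'SciTran Engine local'   # hypothetical drone User-Agent
drone_type, drone_id = user_agent.replace('SciTran', '').strip().split()
# drone_type == 'Engine', drone_id == 'local'; drone_id is then looked up in db.drones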
......@@ -19,5 +19,11 @@
"lastname": "User",
"wheel": true
}
],
"drones": [
{
"_id": "local",
"type": "engine"
}
]
}
......@@ -3,13 +3,16 @@
# @author: Gunnar Schaefer
import os
import bson
import json
import time
import pymongo
import hashlib
import logging
import argparse
import datetime
import util
def connect_db(db_uri, **kwargs):
for x in range(0, 30):
......@@ -72,6 +75,7 @@ def dbinit(args):
db.acquisitions.create_index('collections')
db.tokens.create_index('timestamp', expireAfterSeconds=600)
db.downloads.create_index('timestamp', expireAfterSeconds=60)
# TODO: apps and jobs indexes (indices?)
if args.json:
with open(args.json) as json_dump:
......@@ -80,6 +84,8 @@ def dbinit(args):
db.users.insert(input_data['users'])
if 'groups' in input_data:
db.groups.insert(input_data['groups'])
if 'drones' in input_data:
db.drones.insert(input_data['drones'])
for u in db.users.find():
db.users.update({'_id': u['_id']}, {'$set': {'email_hash': hashlib.md5(u['email']).hexdigest()}})
......@@ -90,10 +96,107 @@ example:
./scripts/bootstrap.py dbinit mongodb://cnifs.stanford.edu/nims?replicaSet=cni -j nims_users_and_groups.json
"""
def appsinit(args):
"""Upload an app."""
import tarfile
db_client = connect_db(args.db_uri)
db = db_client.get_default_database()
app_tgz = args.app_tgz + '.tgz'
if not os.path.exists(app_tgz):
with tarfile.open(app_tgz, 'w:gz', compresslevel=6) as tf:
tf.add(args.app_tgz, arcname=os.path.basename(args.app_tgz))
util.insert_app(db, app_tgz, args.apps_path)
appsinit_desc = """
example:
./scripts/bootstrap.py appsinit mongodb://cnifs.stanford.edu/nims?replicaSet=cni /path/to/app/dir /path/to/apps/storage
"""
# TODO: this should use util.create_job to eliminate duplicate code
# TODO: update util.create_job to be usable from this bootstrap script.
def jobsinit(args):
"""Create a job entry for every acquisition's orig dataset."""
db_client = connect_db(args.db_uri)
db = db_client.get_default_database()
if args.force:
db.drop_collection('jobs')
counter = db.jobs.count() + 1 # where to start creating jobs
dbc = db.jobs
for a in db.acquisitions.find({'files.state': ['orig']}, {'files.$': 1, 'session': 1, 'series': 1, 'acquisition': 1}):
if a.get('files')[0].get('kinds')[0] == 'screenshot':
print 'no default app set for screenshots. skipping...'
continue
else:
app_id = 'scitran/dcm2nii:latest'
app_outputs = [
{
'fext': '.nii.gz',
'state': ['derived', ],
'type': 'nifti',
'kinds': a.get('files')[0].get('kinds'), # there should be some way to indicate 'from parent file'
},
{
'fext': '.bvec',
'state': ['derived', ],
'type': 'text',
'kinds': ['bvec', ],
},
{
'fext': '.bval',
'state': ['derived', ],
'type': 'text',
'kinds': ['bval', ],
},
]
aid = a.get('_id')
session = db.sessions.find_one({'_id': bson.ObjectId(a.get('session'))})
project = db.projects.find_one({'_id': bson.ObjectId(session.get('project'))})
output_url = '%s/%s/%s' % ('acquisitions', aid, 'file')
db.jobs.insert({
'_id': counter,
'group': project.get('group_id'),
'project': {
'_id': project.get('_id'),
'name': project.get('name'),
},
'exam': session.get('exam'),
'app': {
'_id': 'scitran/dcm2nii:latest',
'type': 'docker',
},
'inputs': [
{
'url': '%s/%s/%s' % ('acquisitions', aid, 'file'),
'payload': {
'type': a['files'][0]['type'],
'state': a['files'][0]['state'],
'kinds': a['files'][0]['kinds'],
},
}
],
'outputs': [{'url': output_url, 'payload': i} for i in app_outputs],
'status': 'pending', # queued
'activity': None,
'added': datetime.datetime.now(),
'timestamp': datetime.datetime.now(),
})
print 'created job %d, group: %s, project %s, exam %s, %s.%s' % (counter, project.get('group_id'), project.get('_id'), session.get('exam'), a.get('series'), a.get('acquisition'))
counter += 1
jobsinit_desc = """
example:
./scripts/bootstrap.py jobsinit mongodb://cnifs.stanford.edu/nims?replicaSet=cni
"""
def sort(args):
logging.basicConfig(level=logging.WARNING)
import util
quarantine_path = os.path.join(args.sort_path, 'quarantine')
if not os.path.exists(args.sort_path):
os.makedirs(args.sort_path)
......@@ -213,6 +316,27 @@ dbinit_parser.add_argument('-j', '--json', help='JSON file containing users and
dbinit_parser.add_argument('db_uri', help='DB URI')
dbinit_parser.set_defaults(func=dbinit)
appsinit_parser = subparsers.add_parser(
name='appsinit',
help='load an app',
description=appsinit_desc,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
appsinit_parser.add_argument('db_uri', help='DB URI')
appsinit_parser.add_argument('app_tgz', help='filesystem path to tgz of app build context')
appsinit_parser.add_argument('apps_path', help='filesystem path to loaded apps')
appsinit_parser.set_defaults(func=appsinit)
jobsinit_parser = subparsers.add_parser(
name='jobsinit',
help='initialize jobs collection from existing acquisitions',
description=jobsinit_desc,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
jobsinit_parser.add_argument('-f', '--force', action='store_true', help='wipe out any existing jobs')
jobsinit_parser.add_argument('db_uri', help='DB URI')
jobsinit_parser.set_defaults(func=jobsinit)
sort_parser = subparsers.add_parser(
name='sort',
help='sort all files in a directory tree',
......
......@@ -10,6 +10,8 @@ import datetime
import jsonschema
import bson.json_util
import tempdir as tempfile
import base
import util
import users
......@@ -199,23 +201,31 @@ class Container(base.RequestHandler):
self.response.app_iter = open(filepath, 'rb')
self.response.headers['Content-Length'] = str(file_info['size']) # must be set after setting app_iter
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=%s' % str(filename)
else:
ticket = util.download_ticket('single', filepath, filename, file_info['size'])
tkt_id = self.app.db.downloads.insert(ticket)
return {'url': self.uri_for('download', _full=True, ticket=tkt_id)}
return {'url': self.uri_for('named_download', fn=filename, _scheme='https', ticket=tkt_id)}
def put_file(self, cid=None):
def _put(self, cid=None, flavor='file'):
"""
Receive a targeted processor or user upload.
Receive a targeted processor or user upload for an attachment or file.
Accepts a multipart request that contains json in first part, and data in second part.
This POST route is used to add a file to an existing container, not for creating new containers.
This upload is different from the main PUT route, because this does not update the primary
metadata, nor does it try to determine where to place the file. It always gets placed in
the current container.
This PUT route is used to add a file to an existing container, not for creating new containers.
This upload is different from the main PUT route, because this does not update the main container
metadata, nor does it try to parse the file to determine sorting information. The uploaded file(s)
will always get uploaded to the specified container.
Accepts a multipart request that contains the following form fields:
- 'metadata': list of dicts, each dict contains metadata for a file
- filename: file object
- 'sha': list of dicts, each dict contains 'name' and 'sha1'.
"""
# TODO: revise how engines upload their data to be compatible with the put_attachment fxn
# TODO read self.request.body, using '------WebKitFormBoundary' as divider
# first line is 'content-disposition' line, extract filename
# second line is content-type, determine how to write to a file, as bytes or as string
# third line is just a separator, useless
def receive_stream_and_validate(stream, digest, filename):
# FIXME pull this out to also be used from core.Core.put() and also replace the duplicated code below
hash_ = hashlib.sha1()
......@@ -228,63 +238,6 @@ class Container(base.RequestHandler):
self.abort(400, 'Content-MD5 mismatch.')
return filepath
if cid is None: # sortable user upload
pass
else: # targeted upload
pass
if self.request.content_type != 'multipart/form-data':
self.abort(400, 'content-type must be "multipart/form-data"')
try:
metadata = json.loads(self.request.get('metadata'))
jsonschema.validate(metadata, FILE_UPLOAD_SCHEMA)
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
if self.public_request: # processor upload
_id = None
# FIXME check that processor is legit
elif cid is not None: # targeted user upload
_id = bson.ObjectId(cid)
container, _ = self._get(_id, 'rw')
else: # sortable user upload
pass
# FIXME: pre-parse file, reject if unparsable
data_path = self.app.config['data_path']
quarantine_path = self.app.config['quarantine_path']
with tempfile.TemporaryDirectory(prefix='.tmp', dir=data_path) as tempdir_path:
filepaths = []
for file_info in metadata['files']:
hash_ = hashlib.sha1()
filename = file_info['name'] + file_info['ext']
filepaths.append(os.path.join(tempdir_path, filename))
field_storage_obj = self.request.POST.get(filename)
with open(filepaths[-1], 'wb') as fd:
for chunk in iter(lambda: field_storage_obj.file.read(2**20), ''):
hash_.update(chunk)
fd.write(chunk)
if hash_.hexdigest() != file_info['sha1']:
self.abort(400, 'Content-MD5 mismatch.')
log.info('Received %s [%s] from %s' % (filename, util.hrsize(file_info['size']), self.request.user_agent)) # FIXME: user_agent or uid
for filepath in filepaths:
status, detail = util.insert_file(self.dbc, _id, file_info, filepath, file_info['sha1'], data_path, quarantine_path)
if status != 200:
self.abort(status, detail)
def put_attachment(self, cid):
"""
Receive a targeted user upload of an attachment.
Attachments are different from files, in that they are not 'research ready'. Attachments
represent other documents that are generally not usable by the engine; documents like
consent forms, pen/paper questionnaires, study recruiting materials, etc.
Internally, attachments are distinguished from files because of what metadata is
required. Attachments really only need a 'kinds' and 'type'. We don't expect iteration over
an attachment in a way that would require tracking 'state'.
"""
# TODO read self.request.body, using '------WebKitFormBoundary' as divider
# first line is 'content-disposition' line, extract filename
# second line is content-type, determine how to write to a file, as bytes or as string
# third line is just a separator, useless
if self.request.content_type != 'multipart/form-data':
self.abort(400, 'content-type must be "multipart/form-data"')
# TODO: metadata validation
......@@ -317,13 +270,49 @@ class Container(base.RequestHandler):
self.abort(400, 'Content-MD5 mismatch %s vs %s' % (fhash.hexdigest(), s.get('sha1')))
else:
finfo['sha1'] = s.get('sha1')
status, detail = util.insert_file(self.dbc, _id, finfo, filepath, s.get('sha1'), data_path, quarantine_path, flavor='attachment')
status, detail = util.insert_file(self.dbc, _id, finfo, filepath, s.get('sha1'), data_path, quarantine_path, flavor=flavor)
if status != 200:
self.abort(400, 'upload failed')
break
else:
self.abort(400, '%s is not listed in the sha1s' % fname)
def put_file(self, cid=None):
"""Receive a targeted upload of a dataset file."""
self._put(cid, flavor='file')
def put_attachment(self, cid):
"""Recieve a targetted upload of an attachment file."""
self._put(cid, flavor='attachment')
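An illustrative client for the put_file/put_attachment routes above, assuming the requests library; per the _put() docstring, 'metadata' and 'sha' are JSON-encoded form fields and each file is attached under its own filename (the metadata fields and container id shown are invented):

import json
import hashlib
import requests

fname = 'consent.pdf'                                  # hypothetical attachment file
payload = open(fname, 'rb').read()
form = {
    'metadata': json.dumps([{'name': 'consent', 'ext': '.pdf', 'type': 'pdf', 'kinds': ['consent']}]),
    'sha': json.dumps([{'name': fname, 'sha1': hashlib.sha1(payload).hexdigest()}]),
}
requests.put('https://localhost:8080/api/acquisitions/<cid>/attachment',
             data=form, files={fname: (fname, payload)}, verify=False)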
def get_tile(self, cid):
"""fetch info about a tiled tiff, or retrieve a specific tile."""
_id = bson.ObjectId(cid)
montage_info = self.dbc.find_one(
{
'_id': _id,
'$and': [
{'files.kinds': ['montage']},
{'files.ext': '.tiff'},
],
},
['files.$'],
)
if not montage_info:
self.abort(404, 'montage tiff not found')
fn = montage_info['files'][0]['name'] + montage_info['files'][0]['ext']
fp = os.path.join(self.app.config['data_path'], cid[-3:], cid, fn)
z = self.request.get('z')
x = self.request.get('x')
y = self.request.get('y')
if not (z and x and y):
return util.get_info(fp)
else:
self.response.content_type = 'image/png'
tile = util.get_tile(fp, int(z), int(x), int(y))
if tile:
self.response.write(tile)
def get_attachment(self, cid):
"""Download one attachment."""
fname = self.request.get('name')
......@@ -342,7 +331,7 @@ class Container(base.RequestHandler):
else:
ticket = util.download_ticket('single', fpath, fname, a_info['size'])
tkt_id = self.app.db.downloads.insert(ticket)
return {'url': self.uri_for('download', _full=True, ticket=tkt_id)}
return {'url': self.uri_for('named_download', fn=fname, _scheme='https', ticket=tkt_id)}
def delete_attachment(self, cid):
"""Delete one attachment."""
......
......@@ -214,7 +214,7 @@ class Core(base.RequestHandler):
filename = 'sdm_' + datetime.datetime.utcnow().strftime('%Y%m%d_%H%M%S') + '.zip'
ticket = util.download_ticket('batch', targets, filename, total_size)
tkt_id = self.app.db.downloads.insert(ticket)
return {'url': self.uri_for('download', _full=True, ticket=tkt_id), 'file_cnt': file_cnt, 'size': total_size}
return {'url': self.uri_for('named_download', fn=filename, _scheme='https', ticket=tkt_id), 'file_cnt': file_cnt, 'size': total_size}
def _archivestream(self, ticket):
length = None # FIXME compute actual length
......@@ -224,8 +224,12 @@ class Core(base.RequestHandler):
z.write(filepath, acrpath)
return z, length
def download(self):
def download(self, fn=None):
# discard the filename, fn. the backend doesn't need it, but the
# front end makes use of the filename in the url.
ticket_id = self.request.get('ticket')
# TODO: what's the default? stream or attach?
attach = self.request.get('attach').lower() in ['1', 'true']
if ticket_id:
ticket = self.app.db.downloads.find_one({'_id': ticket_id})
if not ticket:
......@@ -236,8 +240,11 @@ class Core(base.RequestHandler):
else:
self.response.app_iter, length = self._archivestream(ticket)
self.response.headers['Content-Length'] = str(length) # must be set after setting app_iter
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
if attach:
self.response.headers['Content-Type'] = 'application/octet-stream'
self.response.headers['Content-Disposition'] = 'attachment; filename=' + str(ticket['filename'])
else:
self.response.headers['Content-Type'] = util.guess_mime(ticket['filename'])
else:
try:
req_spec = self.request.json_body
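Sketch of the two-step download flow implied by the changes above: a container/file route hands back a ticket whose URL points at the new named_download route, and attach=1 asks for a Content-Disposition: attachment response instead of an inline mime type (URLs are illustrative):

import requests

# step 1: ask a container route for a download ticket; it returns {'url': '.../api/download/<fn>?ticket=<id>'}
ticket = requests.get('https://localhost:8080/api/acquisitions/<id>/file', verify=False).json()
# step 2: follow the ticket URL; attach=1 forces an attachment download instead of inline display
content = requests.get(ticket['url'], params={'attach': '1'}, verify=False).content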
......@@ -360,7 +367,151 @@ class Core(base.RequestHandler):
def search(self):
"""Search."""
SEARCH_POST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': 'File',
'type': 'object',
'properties': {
'subj_code': {
'title': 'Subject Code',
'type': 'string',
},
'subj_firstname': {
'title': 'Subject First Name', # hash
'type': 'string',
},
'subj_lastname': {
'title': 'Subject Last Name',
'type': 'string',
},
'scan_type': { # MR SPECIFIC!!!
'title': 'Scan Type',
'enum': self.app.db.acquisitions.distinct('types.kind')
},
'date_from': {
'title': 'Date From',
'type': 'string',
},
'date_to': {
'title': 'Date To',
'type': 'string',
},
'psd': { # MR SPECIFIC!!!
'title': 'PSD Name',
'type': 'string', # 'enum': self.app.db.acquisitions.distinct('psd'),
},
'subj_age_max': { # age in years
'title': 'Subject Age Max',
'type': 'integer',
},
'subj_age_min': { # age in years
'title': 'Subject Age Min',
'type': 'integer',
},
'exam': {
'title': 'Exam Number',
'type': 'integer',
},
'description': {
'title': 'Description',
'type': 'string',
},
},
# 'required': ['subj_code', 'scan_type', 'date_from', 'date_to', 'psd_name', 'operator', 'subj_age_max', 'subj_age_min', 'exam'],
# 'additionalProperties': False
}
if self.request.method == 'GET':
return self.search_schema
else:
return SEARCH_POST_SCHEMA
try:
json_body = self.request.json_body
jsonschema.validate(json_body, SEARCH_POST_SCHEMA)
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
# TODO: search needs to include operator details? do types of datasets have an 'operator'?
# TODO: sessions need to have more 'subject' information to be able to do age searching
# construct the queries based on the information available
# TODO: provide a schema that allows directly using the request data, rather than
# requiring construction of the queries....
session_query = {}
exam = json_body.get('exam')
subj_code = json_body.get('subj_code')
age_max = json_body.get('subj_age_max')
age_min = json_body.get('subj_age_min')
if exam:
session_query.update({'exam': exam})
if subj_code:
session_query.update({'subject.code': subj_code})
if age_min and age_max:
session_query.update({'subject.age': {'$gte': age_min, '$lte': age_max}})
elif age_max:
session_query.update({'subject.age': {'$lte': age_max}})
elif age_min:
session_query.update({'subject.age': {'$gte': age_min}})
# TODO: don't build these; we want to get as close as possible to dumping the data straight from the request
acq_query = {}
psd = json_body.get('psd')
types_kind = json_body.get('scan_type')
time_fmt = '%Y-%m-%d' # assume that dates will come in as "2014-01-01"
description = json_body.get('description')
date_to = json_body.get('date_to') # need to do some datetime conversion
if date_to:
date_to = datetime.datetime.strptime(date_to, time_fmt)
date_from = json_body.get('date_from') # need to do some datetime conversion
if date_from:
date_from = datetime.datetime.strptime(date_from, time_fmt)
if psd:
acq_query.update({'psd': psd})
if types_kind:
acq_query.update({'types.kind': types_kind})
if date_to and date_from:
acq_query.update({'timestamp': {'$gte': date_from, '$lte': date_to}})
elif date_to:
acq_query.update({'timestamp': {'$lte': date_to}})
elif date_from:
acq_query.update({'timestamp': {'$gte': date_from}})
if description:
# glob style matching, whole word must exist within description
pass
# also query sessions
sessions = list(self.app.db.sessions.find(session_query))
session_ids = [s['_id'] for s in sessions]
log.debug(session_ids)
# first find the acquisitions that meet the acquisition level query params
aquery = {'session': {'$in': session_ids}}
aquery.update(acq_query)
log.debug(aquery)
# build a more complex response, and clean out database specifics
groups = []
projects = []
sessions = []
acqs = list(self.app.db.acquisitions.find(aquery))
for acq in acqs:
session = self.app.db.sessions.find_one({'_id': acq['session']})
project = self.app.db.projects.find_one({'_id': session['project']})
group = project['group_id']
del project['group_id']
project['group'] = group
acq['_id'] = str(acq['_id'])
acq['session'] = str(acq['session'])
session['_id'] = str(session['_id'])
session['project'] = str(session['project'])
project['_id'] = str(project['_id'])
if session not in sessions:
sessions.append(session)
if project not in projects:
projects.append(project)
if group not in groups:
groups.append(group)
results = {
'groups': groups,
'projects': projects,
'sessions': sessions,
'acquisitions': acqs,
}
return results
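An example request body that would pass SEARCH_POST_SCHEMA and drive the session and acquisition queries built above (all values are invented; scan_type must be one of the kinds present in the database):

search_request = {
    'subj_code': 'ex4242',        # matched against sessions.subject.code
    'subj_age_min': 18,           # years, $gte on sessions.subject.age
    'subj_age_max': 65,           # years, $lte on sessions.subject.age
    'scan_type': 'anatomical',    # matched against acquisitions.types.kind
    'date_from': '2014-01-01',    # parsed with '%Y-%m-%d'
    'date_to': '2014-06-30',
}
# POSTed as JSON to /api/search; the handler validates it before querying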
jobs.py 0 → 100644
# @author: Kevin S Hahn
"""
API request handlers for process-job-handling.
represents the /nimsapi/jobs route
"""
import logging
import datetime
log = logging.getLogger('nimsapi.jobs')
import base
# TODO: what should this whitelist contain? protocol + FQDN?
# ex. https://coronal.stanford.edu
PROCESSOR_WHITELIST = [
'dockerhost',
]
JOB_STATES = [
'pending', # created but not started
'queued', # job claimed by a processor
'running', # job running on a processor
'done', # job completed successfully
'failed', # some error occurred,
'paused', # job paused. can't think when this would be useful...
]
# Jobs must know how they affect the various components of a file description
# some "special" case things will reset state from 'orig' to 'pending'
# but the usual case will be to append an item to the state list.
# TODO: create job function should live here
# where it can be edited alongside the routes that consume and modify the jobs
# GET /jobs full list of jobs, allow specifiers, status=
# POST /jobs creates a new job. this will be used by webapp to add new jobs
# GET /jobs/<_id> get information about one job
# PUT /jobs/<_id> update information about one job
# GET /jobs/next, special route to get the 'next job'
class Jobs(base.RequestHandler):
"""Provide /jobs API routes."""
def get(self):
"""
Return a list of all jobs.
TODO: allow querying for group
TODO: allow querying for project
TODO: allow querying by other meta data. can this be generalized?
"""
# TODO: auth
return list(self.app.db.jobs.find())
def count(self):
"""Return the total number of jobs."""
# no auth?
return self.app.db.jobs.count()
def counts(self):
"""Return more information about the jobs."""
counts = {
'total': self.app.db.jobs.count(),
'failed': self.app.db.jobs.find({'status': 'failed'}).count(),
'pending': self.app.db.jobs.find({'status': 'pending'}).count(),
'done': self.app.db.jobs.find({'status': 'done'}).count(),
}
return counts
def next(self):
"""Return the next job in the queue that matches the query parameters."""
# TODO: add ability to query on things like psd type or psd name
try:
query_params = self.request.json
except ValueError as e:
self.abort(400, str(e))
project_query = query_params.get('project')
group_query = query_params.get('group')
query = {'status': 'pending'}
if project_query:
query.update({'project': project_query})
if group_query:
query.update({'group': group_query})
# TODO: how to guarantee the oldest pending jobs are given out first
job_spec = self.app.db.jobs.find_and_modify(
query,
{'$set': {'status': 'queued', 'modified': datetime.datetime.now()}},
sort=[('modified', -1)],
new=True
)
return job_spec
class Job(base.RequestHandler):
"""Provides /Jobs/<jid> routes."""
# TODO flesh out the job schema
json_schema = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': 'Job',
'type': 'object',
'properties': {
'_id': {
'title': 'Job ID',
'type': 'string',
},
},
'required': ['_id'],
'additionalProperties': True,
}
def get(self, _id):
return self.app.db.jobs.find_one({'_id': int(_id)})
def put(self, _id):
"""Update a single job."""
payload = self.request.json
# TODO: validate the json before updating the db
self.app.db.jobs.update({'_id': int(_id)}, {'$set': {'status': payload.get('status'), 'activity': payload.get('activity')}})
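A sketch of the engine-side loop these routes support: claim a pending job via /jobs/next, run it, then report the result back with a PUT (endpoint, group name, and activity text are illustrative):

import requests

api = 'https://localhost:8080/api'
# claim a pending job; the request body carries optional group/project filters
job = requests.get(api + '/jobs/next', json={'group': 'unknown'}, verify=False).json()
# ... run the app and upload its outputs ...
requests.put(api + '/jobs/%d' % job['_id'],
             json={'status': 'done', 'activity': 'completed without error'},
             verify=False)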
......@@ -4,21 +4,44 @@ import logging
log = logging.getLogger('scitran.api')
import os
import json
import bson
import copy
import shutil
import difflib
import tarfile
import datetime
import mimetypes
import tempdir as tempfile
import scitran.data
import scitran.data.medimg.montage
mimetypes.types_map.update({'.bvec': 'text/plain'})
mimetypes.types_map.update({'.bval': 'text/plain'})
get_info = scitran.data.medimg.montage.get_info
get_tile = scitran.data.medimg.montage.get_tile
PROJECTION_FIELDS = ['timestamp', 'permissions', 'public']
def guess_mime(fn):
"""Guess mimetype based on filename."""
# TODO: could move mime types to scitran.data, but that would only work well if ALL files
# went through scitran.data. We can guarantee that all files go through the API during upload
# or download. The API seems the right place to determine mime information.
mime, enc = mimetypes.guess_type(fn)
if not mime:
mime = 'application/octet-stream'
return mime
def insert_file(dbc, _id, file_info, filepath, digest, data_path, quarantine_path, flavor='file'):
"""Insert a file as an attachment or as a file."""
filename = os.path.basename(filepath)
flavor += 's'
dataset = None
if _id is None:
try:
log.info('Parsing %s' % filename)
......@@ -67,6 +90,8 @@ def insert_file(dbc, _id, file_info, filepath, digest, data_path, quarantine_pat
if not success['updatedExisting']:
dbc.update({'_id': _id}, {'$push': {flavor: file_info}})
shutil.move(filepath, container_path + '/' + filename)
if dataset: # only create jobs if dataset is parseable
create_job(dbc, dataset)
log.debug('Done %s' % os.path.basename(filepath)) # must use filepath, since filename is updated for sorted files
return 200, 'Success'
......@@ -123,8 +148,105 @@ def _update_db(db, dataset):
if dataset.nims_timestamp:
db.projects.update({'_id': project['_id']}, {'$max': dict(timestamp=dataset.nims_timestamp)})
db.sessions.update({'_id': session['_id']}, {'$min': dict(timestamp=dataset.nims_timestamp), '$set': dict(timezone=dataset.nims_timezone)})
# create a job, if necessary
return acquisition['_id']
# TODO: create job should be usable from bootstrap.py with only database information
def create_job(dbc, dataset):
db = dbc.database
type_ = dataset.nims_file_type
kinds_ = dataset.nims_file_kinds
state_ = dataset.nims_file_state
app = None
# TODO: check if there are 'default apps' set for this project/session/acquisition
acquisition = db.acquisitions.find_one({'uid': dataset.nims_acquisition_id})
session = db.sessions.find_one({'_id': bson.ObjectId(acquisition.get('session'))})
project = db.projects.find_one({'_id': bson.ObjectId(session.get('project'))})
aid = acquisition.get('_id')
# XXX: if an input's kinds is None, then that app is meant to work on any file kinds
app = db.apps.find_one({
'$or': [
{'inputs': {'$elemMatch': {'type': type_, 'state': state_, 'kinds': kinds_}}, 'default': True},
{'inputs': {'$elemMatch': {'type': type_, 'state': state_, 'kinds': None}}, 'default': True},
],
})
# TODO: this has to move...
# force acquisition dicom file to be marked as 'optional = True'
db.acquisitions.find_and_modify(
{'uid': dataset.nims_acquisition_id, 'files.type': 'dicom'},
{'$set': {'files.$.optional': True}},
)
if not app:
log.info('no app for type=%s, state=%s, kinds=%s, default=True. no job created.' % (type_, state_, kinds_))
else:
# XXX: outputs can specify to __INHERIT__ a value from the parent input file, for ex: kinds
for output in app['outputs']:
if output['kinds'] == '__INHERIT__':
output['kinds'] = kinds_
# TODO: job description needs more metadata to be searchable in a useful way
output_url = '%s/%s/%s' % ('acquisitions', aid, 'file')
job = db.jobs.find_and_modify(
{
'_id': db.jobs.count() + 1,
},
{
'_id': db.jobs.count() + 1,
'group': project.get('group_id'),
'project': {
'_id': project.get('_id'),
'name': project.get('name'),
},
'exam': session.get('exam'),
'app': {
'_id': app['_id'],
'type': 'docker',
},
'inputs': [
{
'filename': dataset.nims_file_name + dataset.nims_file_ext,
'url': '%s/%s/%s' % ('acquisitions', aid, 'file'),
'payload': {
'type': dataset.nims_file_type,
'state': dataset.nims_file_state,
'kinds': dataset.nims_file_kinds,
},
}
],
'outputs': [{'url': output_url, 'payload': i} for i in app['outputs']],
'status': 'pending',
'activity': None,
'added': datetime.datetime.now(),
'timestamp': datetime.datetime.now(),
},
upsert=True,
new=True,
)
log.info('created job %d, group: %s, project %s' % (job['_id'], job['group'], job['project']))
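For reference, an app document shaped to satisfy the $elemMatch query above; an output kinds of '__INHERIT__' is replaced with the input file's kinds before the job is written (all values are illustrative):

example_app = {
    '_id': 'scitran/dcm2nii:latest',
    'default': True,
    'app_type': 'docker',
    'entrypoint': 'dcm2nii.py',
    'inputs': [{'type': 'dicom', 'state': ['orig'], 'kinds': None}],  # kinds=None matches any file kinds
    'outputs': [{'fext': '.nii.gz', 'type': 'nifti', 'state': ['derived'], 'kinds': '__INHERIT__'}],
}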
def insert_app(db, fp, apps_path, app_meta=None):
"""Validate and insert an application tar into the filesystem and database."""
# download, md-5 check, and json validation are handled elsewhere
if not app_meta:
with tarfile.open(fp) as tf:
for ti in tf:
if ti.name.endswith('description.json'):
app_meta = json.load(tf.extractfile(ti))
break
name, version = app_meta.get('_id').split(':')
app_dir = os.path.join(apps_path, name)
if not os.path.exists(app_dir):
os.makedirs(app_dir)
app_tar = os.path.join(app_dir, '%s-%s.tar' % (name, version))
app_meta.update({'asset_url': 'apps/%s' % app_meta.get('_id')})
db.apps.update({'_id': app_meta.get('_id')}, app_meta, new=True, upsert=True)
shutil.move(fp, app_tar)
def _entity_metadata(dataset, properties, metadata={}, parent_key=''):
metadata = copy.deepcopy(metadata)
......