Commit 48349cc5 authored by Kevin S. Hahn

Merge branch 'upload'

parents 85adaae5 c5a0daa4
@@ -33,6 +33,7 @@ routes = [
webapp2_extras.routes.PathPrefixRoute(r'/api', [
webapp2.Route(r'/download', core.Core, handler_method='download', methods=['GET', 'POST'], name='download'),
webapp2.Route(r'/download/<fn>', core.Core, handler_method='download', methods=['GET', 'POST'], name='named_download'),
webapp2.Route(r'/incremental', core.Core, handler_method='incremental_upload', methods=['POST']),
webapp2.Route(r'/sites', core.Core, handler_method='sites', methods=['GET']),
webapp2.Route(r'/search', core.Core, handler_method='search', methods=['GET', 'POST']),
]),
@@ -135,7 +136,7 @@ if __name__ == '__main__':
arg_parser.add_argument('--port', default='8080', help='TCP port to listen on [8080]')
arg_parser.add_argument('--db_uri', help='SciTran DB URI', default='mongodb://localhost/scitran')
arg_parser.add_argument('--data_path', help='path to storage area', required=True)
arg_parser.add_argument('--apps_path', help='path to apps storage', required=True)
arg_parser.add_argument('--apps_path', help='path to apps storage')
arg_parser.add_argument('--log_level', help='log level [info]', default='info')
arg_parser.add_argument('--ssl_cert', help='path to SSL certificate file, containing private key and certificate chain', required=True)
arg_parser.add_argument('--site_id', help='site ID for Scitran Central [local]', default='local')
@@ -145,6 +146,7 @@ if __name__ == '__main__':
args = arg_parser.parse_args()
args.quarantine_path = os.path.join(args.data_path, 'quarantine')
args.upload_path = os.path.join(args.data_path, 'upload')
app.config = vars(args)
logging.getLogger('paste.httpserver').setLevel(logging.WARNING) # silence paste logging
@@ -159,6 +161,13 @@ if __name__ == '__main__':
os.makedirs(app.config['data_path'])
if not os.path.exists(app.config['quarantine_path']):
os.makedirs(app.config['quarantine_path'])
if not os.path.exists(app.config['upload_path']):
os.makedirs(app.config['upload_path'])
if not app.config['apps_path']:
log.warning('apps_path is not defined. Apps functionality disabled')
else:
if not os.path.exists(app.config['apps_path']):
os.makedirs(app.config['apps_path'])
db_client = pymongo.MongoReplicaSetClient(args.db_uri) if 'replicaSet' in args.db_uri else pymongo.MongoClient(args.db_uri)
app.db = db_client.get_default_database()
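The client class is picked by inspecting the URI string, so a replica set is opted into with the standard mongodb URI option. Two illustrative values (hostnames and replica-set name are placeholders, not from this commit):

db_uri = 'mongodb://localhost/scitran'                                       # standalone -> pymongo.MongoClient
db_uri = 'mongodb://db1.example.com,db2.example.com/scitran?replicaSet=rs0'  # replica set -> pymongo.MongoReplicaSetClient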
......
@@ -6,6 +6,7 @@ import time
import logging
import pymongo
import argparse
import datetime
import uwsgidecorators
import api
@@ -17,7 +18,7 @@ os.umask(0o022)
ap = argparse.ArgumentParser()
ap.add_argument('--db_uri', help='SciTran DB URI', required=True)
ap.add_argument('--data_path', help='path to storage area', required=True)
ap.add_argument('--apps_path', help='path to apps storage', required=True)
ap.add_argument('--apps_path', help='path to apps storage')
ap.add_argument('--ssl_cert', help='path to SSL certificate file, containing private key and certificate chain', required=True)
ap.add_argument('--api_uri', help='api uri, with https:// prefix')
ap.add_argument('--site_id', help='site ID for Scitran Central [local]', default='local')
@@ -33,6 +34,7 @@ args = ap.parse_args()
# --site_name 'Example Site' or --site_name "Example Site"
args.site_name = ' '.join(args.site_name).strip('"\'')
args.quarantine_path = os.path.join(args.data_path, 'quarantine')
args.upload_path = os.path.join(args.data_path, 'upload')
logging.basicConfig(level=getattr(logging, args.log_level.upper())) #FIXME probably not necessary, because done in api.py
log = logging.getLogger('nimsapi')
@@ -45,6 +47,13 @@ if not os.path.exists(application.config['data_path']):
os.makedirs(application.config['data_path'])
if not os.path.exists(application.config['quarantine_path']):
os.makedirs(application.config['quarantine_path'])
if not os.path.exists(application.config['upload_path']):
os.makedirs(application.config['upload_path'])
if not application.config['apps_path']:
log.warning('apps_path is not defined. Apps functionality disabled')
else:
if not os.path.exists(application.config['apps_path']):
os.makedirs(application.config['apps_path'])
# connect to db
application.db = None
@@ -63,6 +72,16 @@ else:
# TODO: make api_uri a required arg?
application.db.sites.update({'_id': args.site_id}, {'_id': args.site_id, 'name': args.site_name, 'api_uri': args.api_uri}, upsert=True)
@uwsgidecorators.cron(0, -1, -1, -1, -1) # top of every hour
def upload_storage_cleaning(num):
upload_path = application.config['upload_path']
for f in os.listdir(upload_path):
fp = os.path.join(upload_path, f)
timestamp = datetime.datetime.utcfromtimestamp(int(os.stat(fp).st_mtime))
if timestamp < (datetime.datetime.utcnow() - datetime.timedelta(hours=1)):
log.debug('upload %s was last modified %s' % (fp, str(timestamp)))
os.remove(fp)
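The cron decorator's arguments are uWSGI's (minute, hour, day, month, weekday), with -1 as a wildcard, so (0, -1, -1, -1, -1) fires at minute 0 of every hour as the comment says. A minimal sketch of the same staleness sweep that can be run outside uWSGI for testing, assuming the same one-hour threshold and a flat upload directory (the function name is illustrative):

import os
import datetime

def clean_stale_uploads(upload_path, max_age=datetime.timedelta(hours=1)):
    """Delete staged uploads whose mtime is older than max_age."""
    cutoff = datetime.datetime.utcnow() - max_age
    for f in os.listdir(upload_path):
        fp = os.path.join(upload_path, f)
        mtime = datetime.datetime.utcfromtimestamp(int(os.stat(fp).st_mtime))
        if mtime < cutoff:
            os.remove(fp)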
if not args.ssl_cert:
log.warning('SSL certificate not specified, Scitran Central functionality disabled')
elif not args.api_uri:
......
@@ -20,6 +20,7 @@ log = logging.getLogger('nimsapi.jobs')
import tempdir as tempfile
import base
import util
# TODO: create schemas to verify various json payloads
APP_SCHEMA = {
@@ -64,18 +65,27 @@ class Apps(base.RequestHandler):
"""Return information about all the apps."""
def get(self):
apps_path = self.app.config.get('apps_path')
if not apps_path:
self.abort(503, 'GET api/apps unavailable. apps_path not defined')
return list(self.app.db.apps.find())
def count(self):
apps_path = self.app.config.get('apps_path')
if not apps_path:
self.abort(503, 'GET api/apps unavailable. apps_path not defined')
return self.app.db.apps.count()
def post(self):
"""Create a new App."""
# this handles receiving and writing the file,
# but the json validation and database insert are handled by util.
apps_path = self.app.config['apps_path']
if not apps_path:
self.abort(503, 'POST api/apps unavailable. apps_path not defined')
if self.public_request: # TODO: how to handle auth during bootstrap?
self.abort(403, 'must be logged in to upload apps')
apps_path = self.app.config['apps_path']
app_meta = None
with tempfile.TemporaryDirectory(prefix='.tmp', dir=apps_path) as tempdir_path:
hash_ = hashlib.sha1()
@@ -100,21 +110,27 @@ class Apps(base.RequestHandler):
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
util.insert_app(self.app.db, app_temp, apps_path, app_meta=app_meta) # pass meta info, prevent re-reading
log.debug('Received App: %s' % app_info.get('_id'))
log.debug('Received App: %s' % app_meta.get('_id'))
class App(base.RequestHandler):
def get(self, _id):
# TODO: auth? should viewing apps be restricted?
apps_path = self.app.config.get('apps_path')
if not apps_path:
self.abort(503, 'GET api/apps/<id> unavailable. apps_path not defined')
return self.app.db.apps.find_one({'_id': _id})
def get_file(self, _id):
apps_path = self.app.config.get('apps_path')
if not apps_path:
self.abort(503, 'GET api/apps/<id> unavailable. apps_path not defined')
if self.public_request: # this will most often be a drone request
self.abort(403, 'must be logged in to download apps')
name, version = _id.split(':')
fn = '%s-%s.tar' % (name, version)
fp = os.path.join(self.app.config['apps_path'], name, fn)
fp = os.path.join(apps_path, name, fn)
self.response.app_iter = open(fp, 'rb')
self.response.headers['Content-Length'] = str(os.path.getsize(fp)) # must be set after setting app_iter
self.response.headers['Content-Type'] = 'application/octet-stream'
......
@@ -14,6 +14,8 @@ import datetime
import util
import scitran.data as scidata
def connect_db(db_uri, **kwargs):
for x in range(0, 30):
try:
@@ -122,72 +124,16 @@ def jobsinit(args):
if args.force:
db.drop_collection('jobs')
counter = db.jobs.count() + 1 # where to start creating jobs
dbc = db.jobs
for a in db.acquisitions.find({'files.state': ['orig']}, {'files.$': 1, 'session': 1, 'series': 1, 'acquisition': 1}):
if a.get('files')[0].get('kinds')[0] == 'screenshot':
print 'no default app set for screenshots. skipping...'
# find all "orig" files, and create jobs for them
for a in db.acquisitions.find({'files': {'$elemMatch': {'state': ['orig']}}}, {'files.$': 1}):
aid = str(a['_id'])
print aid
fp = os.path.join(args.data_path, aid[-3:], aid, a['files'][0]['name'] + a['files'][0]['ext'])
if not os.path.exists(fp):
print ('%s does not exist. no job created.' % fp)
continue
else:
app_id = 'scitran/dcm2nii:latest'
app_outputs = [
{
'fext': '.nii.gz',
'state': ['derived', ],
'type': 'nifti',
'kinds': a.get('files')[0].get('kinds'), # there should be someway to indicate 'from parent file'
},
{
'fext': '.bvec',
'state': ['derived', ],
'type': 'text',
'kinds': ['bvec', ],
},
{
'fext': '.bval',
'state': ['derived', ],
'type': 'text',
'kinds': ['bval', ],
},
]
aid = a.get('_id')
session = db.sessions.find_one({'_id': bson.ObjectId(a.get('session'))})
project = db.projects.find_one({'_id': bson.ObjectId(session.get('project'))})
output_url = '%s/%s/%s' % ('acquisitions', aid, 'file')
db.jobs.insert({
'_id': counter,
'group': project.get('group_id'),
'project': {
'_id': project.get('_id'),
'name': project.get('name'),
},
'exam': session.get('exam'),
'app': {
'_id': 'scitran/dcm2nii:latest',
'type': 'docker',
},
'inputs': [
{
'url': '%s/%s/%s' % ('acquisitions', aid, 'file'),
'payload': {
'type': a['files'][0]['type'],
'state': a['files'][0]['state'],
'kinds': a['files'][0]['kinds'],
},
}
],
'outputs': [{'url': output_url, 'payload': i} for i in app_outputs],
'status': 'pending', # queued
'activity': None,
'added': datetime.datetime.now(),
'timestamp': datetime.datetime.now(),
})
print 'created job %d, group: %s, project %s, exam %s, %s.%s' % (counter, project.get('group_id'), project.get('_id'), session.get('exam'), a.get('series'), a.get('acquisition'))
counter += 1
dataset = scidata.parse(fp)
util.create_job(db.jobs, dataset)
jobinit_desc = """
example:
@@ -231,15 +177,6 @@ example:
./scripts/bootstrap.py sort mongodb://localhost/nims /tmp/data /tmp/sorted
"""
def dbinitsort(args):
logging.basicConfig(level=logging.WARNING)
dbinit(args)
upload(args)
dbinitsort_desc = """
example:
./scripts/bootstrap.py dbinitsort mongodb://localhost/nims -j bootstrap.json /tmp/data https://example.com/api/upload
"""
def upload(args):
import util
@@ -335,6 +272,7 @@ jobsinit_parser = subparsers.add_parser(
)
jobsinit_parser.add_argument('-f', '--force', action='store_true', help='wipe out any existing jobs')
jobsinit_parser.add_argument('db_uri', help='DB URI')
jobsinit_parser.add_argument('data_path', help='filesystem path to sorted data')
jobsinit_parser.set_defaults(func=jobsinit)
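With the new data_path positional, a jobsinit invocation would look like the following (DB name and path are placeholders, in the style of the other example commands in this script):

./scripts/bootstrap.py jobsinit mongodb://localhost/scitran /tmp/sorted
./scripts/bootstrap.py jobsinit -f mongodb://localhost/scitran /tmp/sorted   # wipe any existing jobs first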
sort_parser = subparsers.add_parser(
@@ -349,20 +287,6 @@ sort_parser.add_argument('path', help='filesystem path to data')
sort_parser.add_argument('sort_path', help='filesystem path to sorted data')
sort_parser.set_defaults(func=sort)
dbinitsort_parser = subparsers.add_parser(
name='dbinitsort',
help='initialize database, then sort all files in a directory tree',
description=dbinitsort_desc,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
dbinitsort_parser.add_argument('db_uri', help='database URI')
dbinitsort_parser.add_argument('path', help='filesystem path to data')
dbinitsort_parser.add_argument('url', help='upload URL')
dbinitsort_parser.add_argument('-j', '--json', help='JSON file container users and groups')
dbinitsort_parser.add_argument('-f', '--force', action='store_true', help='wipe out any existing db data')
dbinitsort_parser.add_argument('-n', '--no_verify', help='disable SSL verification', action='store_true')
dbinitsort_parser.set_defaults(func=dbinitsort)
upload_parser = subparsers.add_parser(
name='upload',
help='upload all files in a directory tree',
......
@@ -8,11 +8,13 @@ import re
import json
import hashlib
import tarfile
import zipfile
import datetime
import lockfile
import markdown
import jsonschema
import collections
import bson.json_util
import bson
import base
import util
@@ -162,6 +164,156 @@ class Core(base.RequestHandler):
if status != 200:
self.abort(status, detail)
def incremental_upload(self):
"""
Receive an incremental upload into a staging area.
3 phases:
1 - upload metadata
2 - upload dicoms, one at a time
3 - send a 'complete' message
"""
USER_UPLOAD_SCHEMA = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': 'metadata',
'type': 'object',
'properties': {
'filetype': {
'title': 'Filetype',
'type': 'string',
},
'overwrite': {
'title': 'Header Overwrites',
'type': 'object',
'properties': {
'group_name': {
'title': 'Group Name',
'type': 'string',
},
'project_name': {
'title': 'Project Name',
'type': 'string',
},
'series_uid': {
'title': 'Series UID',
'type': 'string',
},
'acq_no': {
'title': 'Acquisition Number',
'type': 'integer',
},
'manufacturer': {
'title': 'Instrument Manufacturer',
'type': 'string',
},
},
'required': ['group_name', 'project_name', 'series_uid', 'acq_no', 'manufacturer'],
},
},
'required': ['filetype', 'overwrite'],
'additionalProperties': True,
}
upload_id = self.request.get('_id')
complete = self.request.get('complete').lower() in ['1', 'true']
filename = self.request.get('filename')
content_md5 = self.request.headers.get('Content-MD5')
upload_path = self.app.config.get('upload_path')
def write_to_tar(fp, mode, fn, fobj, content_md5, arcname=None):
"""
Write fn to tarfile fp, with mode, in the inner dir arcname.
fp = path to tarfile
mode = 'w' or 'a'
fn = filename
fobj = opened file object, must be able to read()
content_md5 = expected content digest (a sha1 hexdigest, despite the Content-MD5 header name)
arcname = tarfile internal directory
"""
if mode == 'a' and not os.path.exists(fp):
return 400, '%s does not exist' % fp
if not arcname: # get the arcname from the first entry in the archive
with tarfile.open(fp, 'r') as tf:
arcname = os.path.dirname(tf.getnames()[0])
with lockfile.LockFile(fp):
with tarfile.open(fp, mode) as tf:
with tempfile.TemporaryDirectory() as tempdir_path:
hash_ = hashlib.sha1()
tempfn = os.path.join(tempdir_path, fn)
with open(tempfn, 'wb') as fd:
for chunk in iter(lambda: fobj.read(2**20), ''):
hash_.update(chunk)
fd.write(chunk)
if hash_.hexdigest() != content_md5:
status = 400
detail = 'Content-MD5 mismatch.'
else:
tf.add(tempfn, arcname=os.path.join(arcname, fn))
status = 200
detail = 'OK'
return status, detail
if not upload_id and filename.lower() == 'metadata.json': # create a new temporary file for staging
try:
metadata = self.request.json_body
jsonschema.validate(metadata, USER_UPLOAD_SCHEMA)
except jsonschema.ValidationError as e:
self.abort(400, str(e))
filetype = metadata['filetype']
overwrite = metadata['overwrite']
# check project and group permissions before proceeding
perms = self.app.db.projects.find_one({
'name': overwrite.get('project_name'),
'group_id': overwrite.get('group_name'),
'permissions': {'$elemMatch': {'_id': self.uid}},
}, ['permissions'])
if not perms and not self.superuser_request:
self.abort(403)
# give the interior directory the same name the reaper would give
acq_no = overwrite.get('acq_no', 1) if overwrite.get('manufacturer', '').upper() != 'SIEMENS' else None
arcname = overwrite.get('series_uid', '') + ('_' + str(acq_no) if acq_no is not None else '') + '_' + filetype
upload_id = str(bson.ObjectId())
log.debug('creating new temporary file %s' % upload_id)
fp = os.path.join(upload_path, upload_id + '.tar')
status, detail = write_to_tar(fp, 'w', 'METADATA.json', self.request.body_file, content_md5, arcname)
if status != 200:
self.abort(status, detail)
else:
return upload_id
elif upload_id and filename and not complete:
log.debug('appending to %s' % upload_id)
fp = os.path.join(upload_path, upload_id + '.tar')
status, detail = write_to_tar(fp, 'a', filename, self.request.body_file, content_md5) # don't know arcname anymore...
if status != 200:
self.abort(status, detail)
elif upload_id and complete:
fp = os.path.join(upload_path, upload_id + '.tar')
log.debug('completing %s' % fp)
with tempfile.TemporaryDirectory() as tempdir_path:
log.debug('working in tempdir %s' % tempdir_path)
zip_fp = os.path.join(tempdir_path, upload_id + '.tgz')
with tarfile.open(zip_fp, 'w:gz', compresslevel=6) as zf: # gzipped tar; a zip of the tarfile was not recognized by scitran.data
with tarfile.open(fp, 'r') as tf:
for ti in tf.getmembers():
zf.addfile(ti, tf.extractfile(ti))
hash_ = hashlib.sha1()
with open(zip_fp, 'rb') as fd:
while True:
chunk = fd.read(2**20)
if not chunk:
break
hash_.update(chunk)
log.debug('inserting')
status, detail = util.insert_file(self.app.db.acquisitions, None, None, zip_fp, hash_.hexdigest(), self.app.config['data_path'], self.app.config['quarantine_path'])
if status != 200:
self.abort(status, detail)
os.remove(fp) # always remove the original tar upon 'complete'; the completed file has been sorted or quarantined
else:
self.abort(400, 'Expected _id (str), filename (str), and/or complete (bool) parameters and binary file content as body')
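For reference, a minimal client sketch of the three phases against the new /api/incremental route. The endpoint host, session auth, and the example metadata values are assumptions, not part of this commit; note that the server checks the Content-MD5 header against a SHA-1 hexdigest (see write_to_tar above), so the digest sent below is SHA-1.

import json
import hashlib
import requests

API = 'https://example.com/api/incremental'    # assumed host; route added in api.py above
session = requests.Session()                   # auth / client-cert setup omitted

def sha1_hex(payload):
    # the server compares the Content-MD5 header against a sha1 hexdigest
    return hashlib.sha1(payload).hexdigest()

# phase 1: upload metadata, receive the staging upload _id
metadata = json.dumps({
    'filetype': 'dicom',                       # example values satisfying USER_UPLOAD_SCHEMA
    'overwrite': {
        'group_name': 'unknown',
        'project_name': 'untitled',
        'series_uid': '1.2.840.113619.2.283.4120.7575399.260.1',
        'acq_no': 1,
        'manufacturer': 'GE MEDICAL SYSTEMS',
    },
}).encode('utf-8')
r = session.post(API, params={'filename': 'metadata.json'},
                 data=metadata, headers={'Content-MD5': sha1_hex(metadata)})
upload_id = r.json()                           # assuming the handler's return value comes back as the JSON body

# phase 2: upload DICOM files one at a time; each is appended to the staged tar
for fn in ('001.dcm', '002.dcm'):
    with open(fn, 'rb') as fd:
        payload = fd.read()
    session.post(API, params={'_id': upload_id, 'filename': fn},
                 data=payload, headers={'Content-MD5': sha1_hex(payload)})

# phase 3: signal completion; the server repackages the tar and sorts or quarantines it
session.post(API, params={'_id': upload_id, 'complete': 'true'})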
def _preflight_archivestream(self, req_spec):
data_path = self.app.config['data_path']
arc_prefix = 'sdm'
......
@@ -108,6 +108,7 @@ class Projects(containers.ContainerList):
self.abort(400, 'must be group admin to create project')
json_body['files'] = []
json_body['permissions'] = group['roles']
json_body['public'] = json_body.get('public', False)
return {'_id': str(self.dbc.insert(json_body))}
def get(self):
......
@@ -53,6 +53,8 @@ def insert_file(dbc, _id, file_info, filepath, digest, data_path, quarantine_pat
log.info('Sorting %s' % filename)
_id = _update_db(dbc.database, dataset)
if not _id:
return 400, 'Session exists in a different project'
file_spec = dict(
_id=_id,
files={'$elemMatch': {
@@ -103,7 +105,12 @@ def _update_db(db, dataset):
session_spec = {'uid': dataset.nims_session_id}
session = db.sessions.find_one(session_spec, ['project'])
if session: # skip project creation, if session exists
project = db.projects.find_one({'_id': session['project']}, fields=PROJECTION_FIELDS)
project = db.projects.find_one({'_id': session['project']}, fields=PROJECTION_FIELDS + ['name'])
# TODO: the session must belong to the specified group/project, or not exist at all.
# if the session exists for a different group/project, reject the hell out of it:
# a single session cannot be split between two different projects
if project['name'] != dataset.nims_project:
return None
else:
existing_group_ids = [g['_id'] for g in db.groups.find(None, ['_id'])]
group_id_matches = difflib.get_close_matches(dataset.nims_group_id, existing_group_ids, cutoff=0.8)
......